diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 16:53:56 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:21:31 +0800 |
commit | 94bb940818d9b5c6d654da14f13918e65cf84623 (patch) | |
tree | 27c40976f1a1ee43ea717673e771b7eef9e93729 /dex/handler.go | |
parent | 42e9b9b25d21f40f7d4ebfbb0e7cbca0f1ade6d4 (diff) | |
download | go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.tar go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.tar.gz go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.tar.bz2 go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.tar.lz go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.tar.xz go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.tar.zst go-tangerine-94bb940818d9b5c6d654da14f13918e65cf84623.zip |
dex: implement notary node info propagation and management mechanism
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 164 |
1 files changed, 150 insertions, 14 deletions
diff --git a/dex/handler.go b/dex/handler.go index 21609a561..96d20b02b 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -59,6 +59,11 @@ var ( // not compatible (low protocol version restrictions and high requirements). var errIncompatibleConfig = errors.New("incompatible configuration") +type newNotarySetEvent struct { + Round uint64 + Set map[string]struct{} +} + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -74,9 +79,10 @@ type ProtocolManager struct { chainconfig *params.ChainConfig maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet + notarySetManager *notarySetManager SubProtocols []p2p.Protocol @@ -91,6 +97,13 @@ type ProtocolManager struct { quitSync chan struct{} noMorePeers chan struct{} + // channels for notarySetLoop + newNotarySetCh chan newNotarySetEvent + newRoundCh chan uint64 + newNotaryNodeInfoCh chan *notaryNodeInfo + + srvr p2pServer + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -101,17 +114,22 @@ type ProtocolManager struct { func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: networkID, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + chainconfig: config, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + newNotarySetCh: make(chan newNotarySetEvent), + newRoundCh: make(chan uint64), + newNotaryNodeInfoCh: make(chan *notaryNodeInfo), } + manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh) + // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { log.Warn("Blockchain not empty, fast sync disabled") @@ -195,8 +213,9 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) Start(maxPeers int) { +func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers + pm.srvr = srvr // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) @@ -207,6 +226,9 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() + // run the notary set loop + go pm.notarySetLoop() + // start sync handlers go pm.syncer() go pm.txsyncLoop() @@ -677,6 +699,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txpool.AddRemotes(txs) + case msg.Code == NotaryNodeInfoMsg: + var info notaryNodeInfo + if err := msg.Decode(&info); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + + // TODO(sonic): validate signature + p.MarkNotaryNodeInfo(info.Hash()) + // Broadcast to peers + pm.BroadcastNotaryNodeInfo(&info) + + pm.notarySetManager.TryAddInfo(&info) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -735,6 +769,14 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } +// BroadcastNotaryNodeInfo will propagate notary node info to its peers. +func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) { + peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash()) + for _, peer := range peers { + peer.AsyncSendNotaryNodeInfo(info) + } +} + // Mined broadcast loop func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe @@ -759,6 +801,100 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } +// a loop keep building and maintaining peers in notary set. +func (pm *ProtocolManager) notarySetLoop() { + // TODO: + // * pm need to know current round, and whether we are in notary set. + // * (done) able to handle node info in the near future round. + // * check peer limit. + // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?) + + // If we are in notary set and we are synced with network. + // events: + // * new notary set is determined, and we are in notary set. + // (TBD: subscribe the event or polling govornance) + // - advance round + // - build new notary set + // - remove old notary set, remove old notary set from static + + // * current round node info changed. + + self := pm.srvr.Self() + var round uint64 + + for { + select { + case event := <-pm.newNotarySetCh: + // new notary set is determined. + if _, ok := event.Set[self.ID.String()]; ok { + // initialize the new notary set and add it to pm.notarySetManager + s := newNotarySet(event.Round, event.Set) + pm.notarySetManager.Register(event.Round, s) + + // TODO: handle signature + pm.BroadcastNotaryNodeInfo(¬aryNodeInfo{ + ID: self.ID, + IP: self.IP, + UDP: self.UDP, + TCP: self.TCP, + Round: event.Round, + Timestamp: time.Now().Unix(), + }) + } + + case r := <-pm.newRoundCh: + // move to new round. + round = r + + // TODO: revisit this to make sure how many previous round's + // notary set we need to keep. + + // rmove notary set before current round, they are useless + for _, s := range pm.notarySetManager.Before(round) { + pm.removeNotarySetFromStatic(s) + } + + // prepare notary set connections for new round. + if pm.isInNotarySet(round + 1) { + // we assume the notary set for round + 1 is added to + // pm.notarySetManager before this step. + if n, ok := pm.notarySetManager.Round(round + 1); ok { + pm.addNotarySetToStatic(n) + } + } + + case <-pm.newNotaryNodeInfoCh: + // some node info in "current round" is updated. + // try to connect them by the new info. + if pm.isInNotarySet(round) { + if n, ok := pm.notarySetManager.Round(round); ok { + pm.addNotarySetToStatic(n) + } + } + + case <-pm.quitSync: + return + } + } +} + +func (pm *ProtocolManager) isInNotarySet(r uint64) bool { + return false +} + +func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) { + for _, node := range n.NodesToAdd() { + pm.srvr.AddNotaryPeer(node) + n.MarkAdded(node.ID.String()) + } +} + +func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) { + for _, node := range n.Nodes() { + pm.srvr.RemoveNotaryPeer(node) + } +} + // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { |