From 9cba523a12858576bc2d45fd52c5530b5c1e7de6 Mon Sep 17 00:00:00 2001 From: Sonic Date: Tue, 16 Oct 2018 17:01:19 +0800 Subject: dex: implement peerSetLoop --- dex/handler.go | 54 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 11 deletions(-) (limited to 'dex/handler.go') diff --git a/dex/handler.go b/dex/handler.go index 276dd433c..b605c907a 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -98,8 +98,8 @@ type ProtocolManager struct { noMorePeers chan struct{} // channels for peerSetLoop - crsCh chan core.NewCRSEvent - crsSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription // channels for dexon consensus core receiveCh chan interface{} @@ -239,8 +239,8 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { go pm.minedBroadcastLoop() // run the peer set loop - pm.crsCh = make(chan core.NewCRSEvent) - // pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh) + pm.chainHeadCh = make(chan core.ChainHeadEvent) + pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) go pm.peerSetLoop() // start sync handlers @@ -284,6 +284,7 @@ func (pm *ProtocolManager) Stop() { pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + pm.chainHeadSub.Unsubscribe() // Quit the sync loop. // After this send has completed, no new peers will be accepted. @@ -918,16 +919,47 @@ func (pm *ProtocolManager) metaBroadcastLoop() { // a loop keep building and maintaining peers in notary set. // TODO: finish this func (pm *ProtocolManager) peerSetLoop() { - log.Debug("start peer set loop") + round := pm.gov.LenCRS() - 1 + log.Trace("first len crs", "len", round+1, "round", round) + if round >= 1 { + pm.peers.BuildNotaryConn(round - 1) + pm.peers.BuildDKGConn(round - 1) + } + pm.peers.BuildNotaryConn(round) + pm.peers.BuildDKGConn(round) + for { select { - case event := <-pm.crsCh: - pm.peers.BuildNotaryConn(event.Round) - pm.peers.BuildDKGConn(event.Round) - pm.peers.ForgetNotaryConn(event.Round - 1) - pm.peers.ForgetDKGConn(event.Round - 1) - case <-pm.quitSync: + case <-pm.chainHeadCh: + newRound := pm.gov.LenCRS() - 1 + log.Trace("new round", "round", newRound) + if newRound == round { + break + } + if newRound == round+1 { + pm.peers.BuildNotaryConn(newRound) + pm.peers.BuildDKGConn(newRound) + pm.peers.ForgetNotaryConn(round - 1) + pm.peers.ForgetDKGConn(round - 1) + } else { + // just forget all network connection and rebuild. + pm.peers.ForgetNotaryConn(round) + pm.peers.ForgetDKGConn(round) + + if newRound >= 1 { + pm.peers.BuildNotaryConn(newRound - 1) + pm.peers.BuildDKGConn(newRound - 1) + } + pm.peers.BuildNotaryConn(newRound) + pm.peers.BuildDKGConn(newRound) + } + round = newRound + case <-time.After(5 * time.Second): + pm.peers.lock.Lock() + pm.peers.dumpPeerLabel("ticker") + pm.peers.lock.Unlock() + case <-pm.chainHeadSub.Err(): return } } -- cgit v1.2.3