From 9cba523a12858576bc2d45fd52c5530b5c1e7de6 Mon Sep 17 00:00:00 2001 From: Sonic Date: Tue, 16 Oct 2018 17:01:19 +0800 Subject: dex: implement peerSetLoop --- dex/governance.go | 5 +++++ dex/handler.go | 54 +++++++++++++++++++++++++++++++++++++++++++----------- dex/helper_test.go | 5 +++++ dex/nodetable.go | 3 +++ dex/peer.go | 16 ++++++++++++++++ dex/protocol.go | 2 ++ 6 files changed, 74 insertions(+), 11 deletions(-) diff --git a/dex/governance.go b/dex/governance.go index 22452dea3..a78cf2d4f 100644 --- a/dex/governance.go +++ b/dex/governance.go @@ -132,6 +132,11 @@ func (d *DexconGovernance) CRS(round uint64) coreCommon.Hash { return coreCommon.Hash(s.CRS(big.NewInt(int64(round)))) } +func (d *DexconGovernance) LenCRS() uint64 { + s := d.getGovState() + return s.LenCRS().Uint64() +} + // ProposeCRS send proposals of a new CRS func (d *DexconGovernance) ProposeCRS(signedCRS []byte) { method := vm.GovernanceContractName2Method["proposeCRS"] diff --git a/dex/handler.go b/dex/handler.go index 276dd433c..b605c907a 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -98,8 +98,8 @@ type ProtocolManager struct { noMorePeers chan struct{} // channels for peerSetLoop - crsCh chan core.NewCRSEvent - crsSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription // channels for dexon consensus core receiveCh chan interface{} @@ -239,8 +239,8 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { go pm.minedBroadcastLoop() // run the peer set loop - pm.crsCh = make(chan core.NewCRSEvent) - // pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh) + pm.chainHeadCh = make(chan core.ChainHeadEvent) + pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) go pm.peerSetLoop() // start sync handlers @@ -284,6 +284,7 @@ func (pm *ProtocolManager) Stop() { pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + pm.chainHeadSub.Unsubscribe() // Quit the sync loop. // After this send has completed, no new peers will be accepted. @@ -918,16 +919,47 @@ func (pm *ProtocolManager) metaBroadcastLoop() { // a loop keep building and maintaining peers in notary set. // TODO: finish this func (pm *ProtocolManager) peerSetLoop() { - log.Debug("start peer set loop") + round := pm.gov.LenCRS() - 1 + log.Trace("first len crs", "len", round+1, "round", round) + if round >= 1 { + pm.peers.BuildNotaryConn(round - 1) + pm.peers.BuildDKGConn(round - 1) + } + pm.peers.BuildNotaryConn(round) + pm.peers.BuildDKGConn(round) + for { select { - case event := <-pm.crsCh: - pm.peers.BuildNotaryConn(event.Round) - pm.peers.BuildDKGConn(event.Round) - pm.peers.ForgetNotaryConn(event.Round - 1) - pm.peers.ForgetDKGConn(event.Round - 1) - case <-pm.quitSync: + case <-pm.chainHeadCh: + newRound := pm.gov.LenCRS() - 1 + log.Trace("new round", "round", newRound) + if newRound == round { + break + } + if newRound == round+1 { + pm.peers.BuildNotaryConn(newRound) + pm.peers.BuildDKGConn(newRound) + pm.peers.ForgetNotaryConn(round - 1) + pm.peers.ForgetDKGConn(round - 1) + } else { + // just forget all network connection and rebuild. + pm.peers.ForgetNotaryConn(round) + pm.peers.ForgetDKGConn(round) + + if newRound >= 1 { + pm.peers.BuildNotaryConn(newRound - 1) + pm.peers.BuildDKGConn(newRound - 1) + } + pm.peers.BuildNotaryConn(newRound) + pm.peers.BuildDKGConn(newRound) + } + round = newRound + case <-time.After(5 * time.Second): + pm.peers.lock.Lock() + pm.peers.dumpPeerLabel("ticker") + pm.peers.lock.Unlock() + case <-pm.chainHeadSub.Err(): return } } diff --git a/dex/helper_test.go b/dex/helper_test.go index 09d15d42f..5b73c4afa 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -183,6 +183,7 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ // testGovernance is a fake, helper governance for testing purposes type testGovernance struct { numChainsFunc func(uint64) uint32 + lenCRSFunc func() uint64 notarySetFunc func(uint64, uint32) (map[string]struct{}, error) dkgSetFunc func(uint64) (map[string]struct{}, error) } @@ -191,6 +192,10 @@ func (g *testGovernance) GetNumChains(round uint64) uint32 { return g.numChainsFunc(round) } +func (g *testGovernance) LenCRS() uint64 { + return g.lenCRSFunc() +} + func (g *testGovernance) NotarySet( round uint64, chainID uint32) (map[string]struct{}, error) { return g.notarySetFunc(round, chainID) diff --git a/dex/nodetable.go b/dex/nodetable.go index 929b168a8..f1291b4fd 100644 --- a/dex/nodetable.go +++ b/dex/nodetable.go @@ -7,6 +7,7 @@ import ( "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/crypto/sha3" "github.com/dexon-foundation/dexon/event" + "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/rlp" ) @@ -59,6 +60,8 @@ func (t *nodeTable) Add(metas []*NodeMeta) { } t.entry[meta.ID] = meta newMetas = append(newMetas, meta) + log.Trace("add new node meta", "id", meta.ID[:8], + "ip", meta.IP, "udp", meta.UDP, "tcp", meta.TCP) } t.feed.Send(newMetasEvent{newMetas}) } diff --git a/dex/peer.go b/dex/peer.go index 342d0f033..1f2b9518d 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -778,6 +778,7 @@ func (ps *peerSet) Close() { func (ps *peerSet) BuildNotaryConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() + defer ps.dumpPeerLabel(fmt.Sprintf("BuildNotaryConn: %d", round)) if _, ok := ps.notaryHistory[round]; ok { return @@ -816,9 +817,21 @@ func (ps *peerSet) BuildNotaryConn(round uint64) { } } +func (ps *peerSet) dumpPeerLabel(s string) { + log.Trace(s, "peer num", len(ps.peers)) + for id, labels := range ps.peer2Labels { + _, ok := ps.peers[id] + for label := range labels { + log.Trace(s, "connected", ok, "id", id[:16], + "round", label.round, "cid", label.chainID, "set", label.set) + } + } +} + func (ps *peerSet) ForgetNotaryConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() + defer ps.dumpPeerLabel(fmt.Sprintf("ForgetNotaryConn: %d", round)) // forget all the rounds before the given round for r := range ps.notaryHistory { @@ -862,6 +875,7 @@ func notarySetName(chainID uint32, round uint64) string { func (ps *peerSet) BuildDKGConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() + defer ps.dumpPeerLabel(fmt.Sprintf("BuildDKGConn: %d", round)) selfID := ps.srvr.Self().ID.String() s, err := ps.gov.DKGSet(round) if err != nil { @@ -886,6 +900,7 @@ func (ps *peerSet) BuildDKGConn(round uint64) { func (ps *peerSet) ForgetDKGConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() + defer ps.dumpPeerLabel(fmt.Sprintf("ForgetDKGConn: %d", round)) // forget all the rounds before the given round for r := range ps.dkgHistory { @@ -919,6 +934,7 @@ func (ps *peerSet) forgetDKGConn(round uint64) { // make sure the ps.lock is hold func (ps *peerSet) addDirectPeer(id string, label peerLabel) { + log.Trace("peerSet addDirectPeer", "id", id[:8], "round", label.round, "cid", label.chainID) // if the peer exists add the label if p, ok := ps.peers[id]; ok { p.addLabel(label) diff --git a/dex/protocol.go b/dex/protocol.go index 8e1db583d..c17398ffb 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -127,6 +127,8 @@ type txPool interface { type governance interface { GetNumChains(uint64) uint32 + LenCRS() uint64 + NotarySet(uint64, uint32) (map[string]struct{}, error) DKGSet(uint64) (map[string]struct{}, error) -- cgit v1.2.3