diff options
Diffstat (limited to 'dex')
-rw-r--r-- | dex/backend.go | 15 | ||||
-rw-r--r-- | dex/blockproposer.go | 17 | ||||
-rw-r--r-- | dex/handler.go | 72 | ||||
-rw-r--r-- | dex/protocol_test.go | 18 |
4 files changed, 78 insertions, 44 deletions
diff --git a/dex/backend.go b/dex/backend.go index f1b0012a2..29ac42906 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -253,7 +253,20 @@ func (s *Dexon) Start(srvr *p2p.Server) error { s.protocolManager.Start(srvr, maxPeers) if s.config.BlockProposerEnabled { - s.bp.Start() + go func() { + // Since we might be in fast sync mode when started. wait for + // ChainHeadEvent before starting blockproposer, or else we will trigger + // watchcat. + if s.config.SyncMode == downloader.FastSync && + s.blockchain.CurrentBlock().NumberU64() == 0 { + ch := make(chan core.ChainHeadEvent) + sub := s.blockchain.SubscribeChainHeadEvent(ch) + defer sub.Unsubscribe() + + <-ch + } + s.bp.Start() + }() } return nil } diff --git a/dex/blockproposer.go b/dex/blockproposer.go index 24a277335..dc2b22e16 100644 --- a/dex/blockproposer.go +++ b/dex/blockproposer.go @@ -50,7 +50,7 @@ func (b *blockProposer) Start() error { if !atomic.CompareAndSwapInt32(&b.running, 0, 1) { return fmt.Errorf("block proposer is already running") } - log.Info("Block proposer started") + log.Info("Started block proposer") b.stopCh = make(chan struct{}) b.wg.Add(1) @@ -61,6 +61,9 @@ func (b *blockProposer) Start() error { var err error var c *dexCore.Consensus if b.dMoment.After(time.Now()) { + // Start receiving core messages. + b.dex.protocolManager.SetReceiveCoreMessage(true) + c = b.initConsensus() } else { c, err = b.syncConsensus() @@ -91,7 +94,7 @@ func (b *blockProposer) Stop() { defer b.mu.Unlock() if atomic.LoadInt32(&b.running) == 1 { - atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0) + b.dex.protocolManager.SetReceiveCoreMessage(false) close(b.stopCh) b.wg.Wait() atomic.StoreInt32(&b.proposing, 0) @@ -124,10 +127,9 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { db, b.dex.network, privkey, log.Root()) // Start the watchCat. - log.Info("Starting sync watchCat ...") - b.watchCat.Start() defer b.watchCat.Stop() + log.Info("Started sync watchCat") // Feed the current block we have in local blockchain. cb := b.dex.blockchain.CurrentBlock() @@ -156,9 +158,6 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { // Sync all blocks in compaction chain to core. _, coreHeight := db.GetCompactionChainTipInfo() - // Stop receiving block proposer message when syncing. - atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0) - Loop: for { currentBlock := b.dex.blockchain.CurrentBlock() @@ -211,7 +210,7 @@ ListenLoop: log.Error("SyncBlocks fail", "err", err) return nil, err } - atomic.CompareAndSwapInt32(&b.dex.protocolManager.receiveEnabled, 0, 1) + b.dex.protocolManager.SetReceiveCoreMessage(true) if synced { log.Debug("Consensus core synced") break ListenLoop @@ -249,7 +248,7 @@ ListenLoop: log.Info("Sleeping until next starting time", "time", nextDMoment) time.Sleep(time.Duration(nextDMoment-time.Now().Unix()) * time.Second) - atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 1) + b.dex.protocolManager.SetReceiveCoreMessage(true) consensusSync.ForceSync(true) break ListenLoop } diff --git a/dex/handler.go b/dex/handler.go index 245b31807..84170e54b 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -136,8 +136,8 @@ type ProtocolManager struct { chainHeadSub event.Subscription // channels for dexon consensus core - receiveCh chan interface{} - receiveEnabled int32 + receiveCh chan interface{} + receiveCoreMessage int32 srvr p2pServer @@ -166,29 +166,25 @@ func NewProtocolManager( tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - nodeTable: tab, - gov: gov, - blockchain: blockchain, - cache: newCache(5120, dexDB.NewDatabase(chaindb)), - nextPullVote: &sync.Map{}, - chainconfig: config, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - recordsyncCh: make(chan *recordsync), - quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), - receiveEnabled: 0, - isBlockProposer: isBlockProposer, - app: app, - blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil), - } - - if isBlockProposer { - manager.receiveEnabled = 1 + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + cache: newCache(5120, dexDB.NewDatabase(chaindb)), + nextPullVote: &sync.Map{}, + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + recordsyncCh: make(chan *recordsync), + quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), + receiveCoreMessage: 0, + isBlockProposer: isBlockProposer, + app: app, + blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil), } // Figure out whether to allow fast sync or not @@ -839,7 +835,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Block proposer-only messages. case msg.Code == CoreBlockMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } var blocks []*coreTypes.Block @@ -851,7 +847,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- block } case msg.Code == VoteMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } var votes []*coreTypes.Vote @@ -865,7 +861,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- vote } case msg.Code == AgreementMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } // DKG set is receiver @@ -876,7 +872,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkAgreement(rlpHash(agreement)) pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } // Broadcast this to all peer @@ -889,7 +885,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.receiveCh <- randomness } case msg.Code == DKGPrivateShareMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } // Do not relay this msg @@ -900,7 +896,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkDKGPrivateShares(rlpHash(ps)) pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } // broadcast in DKG set @@ -910,7 +906,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } var hashes coreCommon.Hashes @@ -921,7 +917,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Push blocks", "blocks", blocks) return p.SendCoreBlocks(blocks) case msg.Code == PullVotesMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } next, ok := pm.nextPullVote.Load(p.ID()) @@ -940,7 +936,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Push votes", "votes", votes) return p.SendVotes(votes) case msg.Code == PullRandomnessMsg: - if atomic.LoadInt32(&pm.receiveEnabled) == 0 { + if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break } var hashes coreCommon.Hashes @@ -1287,6 +1283,14 @@ func (pm *ProtocolManager) recordBroadcastLoop() { } } +func (pm *ProtocolManager) SetReceiveCoreMessage(enabled bool) { + if enabled { + atomic.StoreInt32(&pm.receiveCoreMessage, 1) + } else { + atomic.StoreInt32(&pm.receiveCoreMessage, 0) + } +} + // a loop keep building and maintaining peers in notary set. // TODO: finish this func (pm *ProtocolManager) peerSetLoop() { diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 23b2c4248..517df97d9 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -314,6 +314,8 @@ func TestSendNodeRecords(t *testing.T) { func TestRecvCoreBlocks(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() @@ -365,6 +367,8 @@ func TestRecvCoreBlocks(t *testing.T) { func TestSendCoreBlocks(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() @@ -419,6 +423,8 @@ func TestSendCoreBlocks(t *testing.T) { func TestRecvVotes(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() @@ -561,6 +567,8 @@ func (p *mockPublicKey) Bytes() []byte { func TestRecvDKGPrivateShare(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer1", dex64, pm, true) defer pm.Stop() defer p.close() @@ -596,6 +604,8 @@ func TestRecvDKGPrivateShare(t *testing.T) { func TestSendDKGPrivateShare(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p1, _ := newTestPeer("peer1", dex64, pm, true) p2, _ := newTestPeer("peer2", dex64, pm, true) defer pm.Stop() @@ -644,6 +654,8 @@ func TestSendDKGPrivateShare(t *testing.T) { func TestRecvAgreement(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() @@ -687,6 +699,8 @@ func TestRecvAgreement(t *testing.T) { func TestSendAgreement(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() @@ -733,6 +747,8 @@ func TestSendAgreement(t *testing.T) { func TestRecvRandomnesses(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() @@ -764,6 +780,8 @@ func TestRecvRandomnesses(t *testing.T) { func TestSendRandomnesses(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + pm.SetReceiveCoreMessage(true) + p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() |