aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dex/backend.go15
-rw-r--r--dex/blockproposer.go17
-rw-r--r--dex/handler.go72
-rw-r--r--dex/protocol_test.go18
4 files changed, 78 insertions, 44 deletions
diff --git a/dex/backend.go b/dex/backend.go
index f1b0012a2..29ac42906 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -253,7 +253,20 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
s.protocolManager.Start(srvr, maxPeers)
if s.config.BlockProposerEnabled {
- s.bp.Start()
+ go func() {
+ // Since we might be in fast sync mode when started. wait for
+ // ChainHeadEvent before starting blockproposer, or else we will trigger
+ // watchcat.
+ if s.config.SyncMode == downloader.FastSync &&
+ s.blockchain.CurrentBlock().NumberU64() == 0 {
+ ch := make(chan core.ChainHeadEvent)
+ sub := s.blockchain.SubscribeChainHeadEvent(ch)
+ defer sub.Unsubscribe()
+
+ <-ch
+ }
+ s.bp.Start()
+ }()
}
return nil
}
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
index 24a277335..dc2b22e16 100644
--- a/dex/blockproposer.go
+++ b/dex/blockproposer.go
@@ -50,7 +50,7 @@ func (b *blockProposer) Start() error {
if !atomic.CompareAndSwapInt32(&b.running, 0, 1) {
return fmt.Errorf("block proposer is already running")
}
- log.Info("Block proposer started")
+ log.Info("Started block proposer")
b.stopCh = make(chan struct{})
b.wg.Add(1)
@@ -61,6 +61,9 @@ func (b *blockProposer) Start() error {
var err error
var c *dexCore.Consensus
if b.dMoment.After(time.Now()) {
+ // Start receiving core messages.
+ b.dex.protocolManager.SetReceiveCoreMessage(true)
+
c = b.initConsensus()
} else {
c, err = b.syncConsensus()
@@ -91,7 +94,7 @@ func (b *blockProposer) Stop() {
defer b.mu.Unlock()
if atomic.LoadInt32(&b.running) == 1 {
- atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0)
+ b.dex.protocolManager.SetReceiveCoreMessage(false)
close(b.stopCh)
b.wg.Wait()
atomic.StoreInt32(&b.proposing, 0)
@@ -124,10 +127,9 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
db, b.dex.network, privkey, log.Root())
// Start the watchCat.
- log.Info("Starting sync watchCat ...")
-
b.watchCat.Start()
defer b.watchCat.Stop()
+ log.Info("Started sync watchCat")
// Feed the current block we have in local blockchain.
cb := b.dex.blockchain.CurrentBlock()
@@ -156,9 +158,6 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
// Sync all blocks in compaction chain to core.
_, coreHeight := db.GetCompactionChainTipInfo()
- // Stop receiving block proposer message when syncing.
- atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0)
-
Loop:
for {
currentBlock := b.dex.blockchain.CurrentBlock()
@@ -211,7 +210,7 @@ ListenLoop:
log.Error("SyncBlocks fail", "err", err)
return nil, err
}
- atomic.CompareAndSwapInt32(&b.dex.protocolManager.receiveEnabled, 0, 1)
+ b.dex.protocolManager.SetReceiveCoreMessage(true)
if synced {
log.Debug("Consensus core synced")
break ListenLoop
@@ -249,7 +248,7 @@ ListenLoop:
log.Info("Sleeping until next starting time", "time", nextDMoment)
time.Sleep(time.Duration(nextDMoment-time.Now().Unix()) * time.Second)
- atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 1)
+ b.dex.protocolManager.SetReceiveCoreMessage(true)
consensusSync.ForceSync(true)
break ListenLoop
}
diff --git a/dex/handler.go b/dex/handler.go
index 245b31807..84170e54b 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -136,8 +136,8 @@ type ProtocolManager struct {
chainHeadSub event.Subscription
// channels for dexon consensus core
- receiveCh chan interface{}
- receiveEnabled int32
+ receiveCh chan interface{}
+ receiveCoreMessage int32
srvr p2pServer
@@ -166,29 +166,25 @@ func NewProtocolManager(
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- nodeTable: tab,
- gov: gov,
- blockchain: blockchain,
- cache: newCache(5120, dexDB.NewDatabase(chaindb)),
- nextPullVote: &sync.Map{},
- chainconfig: config,
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- recordsyncCh: make(chan *recordsync),
- quitSync: make(chan struct{}),
- receiveCh: make(chan interface{}, 1024),
- receiveEnabled: 0,
- isBlockProposer: isBlockProposer,
- app: app,
- blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil),
- }
-
- if isBlockProposer {
- manager.receiveEnabled = 1
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ cache: newCache(5120, dexDB.NewDatabase(chaindb)),
+ nextPullVote: &sync.Map{},
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ recordsyncCh: make(chan *recordsync),
+ quitSync: make(chan struct{}),
+ receiveCh: make(chan interface{}, 1024),
+ receiveCoreMessage: 0,
+ isBlockProposer: isBlockProposer,
+ app: app,
+ blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil),
}
// Figure out whether to allow fast sync or not
@@ -839,7 +835,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Block proposer-only messages.
case msg.Code == CoreBlockMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
var blocks []*coreTypes.Block
@@ -851,7 +847,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- block
}
case msg.Code == VoteMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
var votes []*coreTypes.Vote
@@ -865,7 +861,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- vote
}
case msg.Code == AgreementMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
// DKG set is receiver
@@ -876,7 +872,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkAgreement(rlpHash(agreement))
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
// Broadcast this to all peer
@@ -889,7 +885,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- randomness
}
case msg.Code == DKGPrivateShareMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
// Do not relay this msg
@@ -900,7 +896,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkDKGPrivateShares(rlpHash(ps))
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
// broadcast in DKG set
@@ -910,7 +906,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &psig
case msg.Code == PullBlocksMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
var hashes coreCommon.Hashes
@@ -921,7 +917,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Push blocks", "blocks", blocks)
return p.SendCoreBlocks(blocks)
case msg.Code == PullVotesMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
next, ok := pm.nextPullVote.Load(p.ID())
@@ -940,7 +936,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Push votes", "votes", votes)
return p.SendVotes(votes)
case msg.Code == PullRandomnessMsg:
- if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
+ if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
}
var hashes coreCommon.Hashes
@@ -1287,6 +1283,14 @@ func (pm *ProtocolManager) recordBroadcastLoop() {
}
}
+func (pm *ProtocolManager) SetReceiveCoreMessage(enabled bool) {
+ if enabled {
+ atomic.StoreInt32(&pm.receiveCoreMessage, 1)
+ } else {
+ atomic.StoreInt32(&pm.receiveCoreMessage, 0)
+ }
+}
+
// a loop keep building and maintaining peers in notary set.
// TODO: finish this
func (pm *ProtocolManager) peerSetLoop() {
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 23b2c4248..517df97d9 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -314,6 +314,8 @@ func TestSendNodeRecords(t *testing.T) {
func TestRecvCoreBlocks(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -365,6 +367,8 @@ func TestRecvCoreBlocks(t *testing.T) {
func TestSendCoreBlocks(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -419,6 +423,8 @@ func TestSendCoreBlocks(t *testing.T) {
func TestRecvVotes(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -561,6 +567,8 @@ func (p *mockPublicKey) Bytes() []byte {
func TestRecvDKGPrivateShare(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer1", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -596,6 +604,8 @@ func TestRecvDKGPrivateShare(t *testing.T) {
func TestSendDKGPrivateShare(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p1, _ := newTestPeer("peer1", dex64, pm, true)
p2, _ := newTestPeer("peer2", dex64, pm, true)
defer pm.Stop()
@@ -644,6 +654,8 @@ func TestSendDKGPrivateShare(t *testing.T) {
func TestRecvAgreement(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -687,6 +699,8 @@ func TestRecvAgreement(t *testing.T) {
func TestSendAgreement(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -733,6 +747,8 @@ func TestSendAgreement(t *testing.T) {
func TestRecvRandomnesses(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
@@ -764,6 +780,8 @@ func TestRecvRandomnesses(t *testing.T) {
func TestSendRandomnesses(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ pm.SetReceiveCoreMessage(true)
+
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()