path: root/core/consensus.go
author     Mission Liao <mission.liao@dexon.org>   2019-01-11 12:58:30 +0800
committer  GitHub <noreply@github.com>             2019-01-11 12:58:30 +0800
commit     809e8def862fdfa792061a448f952747f1af4d3c (patch)
tree       bd038971e65a09bc9bb399f03a37b194ce67ae3c /core/consensus.go
parent     fa25817354d5b7d40f5911004232392acfe7fe53 (diff)
syncer: fix issues when switching to core.Consensus (#418)
- When the confirmed blocks passed to core.Consensus aren't continuous in
  position on some chain, the pulling mechanism would skip the missing blocks.
- Fix: when a block is missing, avoid adding it and every block after it to
  core.Consensus.
- We need to keep the receive channel of the network module from filling up.
- Fix: while switching to core.Consensus, launch a dummy receiver to drain the
  receive channel of the network module.
- Fix: between the moment core.Consensus is created and the moment it starts
  running, a dummy receiver is also required to drain the receive channel of
  the network module.
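The first two bullets describe how confirmed blocks are handed over: a block is
only given to core.Consensus while its parent is already known, and everything
from the first gap onward is left for the pulling mechanism. A simplified,
self-contained sketch of that filtering step follows; takeContiguous,
existsInLattice, and the Block type here are stand-ins chosen for illustration,
not the repository's lattice API or types.Block.

package main

import "fmt"

// Block is a stand-in for types.Block, keeping only what the sketch needs.
type Block struct {
	Hash       string
	ParentHash string
}

// takeContiguous returns the prefix of blocks whose parents are already known
// (either in the lattice or added earlier in this prefix), plus the remainder
// that must be left for pulling. It mirrors the per-chain skipping logic in
// the hunk further below.
func takeContiguous(blocks []*Block, existsInLattice func(hash string) bool) (
	addable, remaining []*Block) {
	known := map[string]bool{}
	for idx, b := range blocks {
		if !existsInLattice(b.ParentHash) && !known[b.ParentHash] {
			// First gap found: this block and everything after it is skipped.
			return blocks[:idx], blocks[idx:]
		}
		known[b.Hash] = true
	}
	return blocks, nil
}

func main() {
	chain := []*Block{
		{Hash: "b1", ParentHash: "b0"},
		{Hash: "b2", ParentHash: "b1"},
		{Hash: "b4", ParentHash: "b3"}, // b3 is missing: the gap starts here
	}
	inLattice := func(hash string) bool { return hash == "b0" }
	addable, remaining := takeContiguous(chain, inLattice)
	fmt.Println("addable:", len(addable), "remaining:", len(remaining))
	// Output: addable: 2 remaining: 1
}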
Diffstat (limited to 'core/consensus.go')
-rw-r--r--   core/consensus.go   129
1 file changed, 119 insertions(+), 10 deletions(-)
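The remaining bullets rely on a helper that keeps draining the network module's
receive channel and buffers every message it sees, so the channel cannot fill
up while core.Consensus isn't consuming it yet; once the real consumer starts,
the receiver is cancelled and the buffered messages are replayed. The sketch
below only illustrates that pattern: launchDummyReceiver, its exact signature,
and the main usage are assumptions for illustration, not the repository's
utils.LaunchDummyReceiver (only its call site appears in the diff below).

package main

import (
	"context"
	"fmt"
	"time"
)

// launchDummyReceiver keeps draining recv so the channel never fills up,
// handing every message to handler (e.g. to buffer it for later replay),
// until the returned cancel function is called. The second return value is
// closed once the receiver goroutine has exited.
func launchDummyReceiver(
	ctx context.Context,
	recv <-chan interface{},
	handler func(interface{}),
) (context.CancelFunc, <-chan struct{}) {
	dummyCtx, dummyCancel := context.WithCancel(ctx)
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		for {
			select {
			case msg := <-recv:
				if handler != nil {
					handler(msg)
				}
			case <-dummyCtx.Done():
				return
			}
		}
	}()
	return dummyCancel, finished
}

func main() {
	recv := make(chan interface{}, 16)
	var buffered []interface{}
	cancel, finished := launchDummyReceiver(
		context.Background(), recv, func(msg interface{}) {
			buffered = append(buffered, msg)
		})
	recv <- "some network message" // drained instead of clogging the channel
	time.Sleep(10 * time.Millisecond)
	cancel()
	<-finished // receiver goroutine done; reading buffered is now race-free
	fmt.Println("messages buffered for replay:", len(buffered))
}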
diff --git a/core/consensus.go b/core/consensus.go
index ad1efeb..2770018 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -415,6 +415,13 @@ type Consensus struct {
nonFinalizedBlockDelivered bool
resetRandomnessTicker chan struct{}
resetDeliveryGuardTicker chan struct{}
+ msgChan chan interface{}
+ waitGroup sync.WaitGroup
+
+ // Context of Dummy receiver during switching from syncer.
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus constructs a Consensus instance.
@@ -450,6 +457,9 @@ func NewConsensusForSimulation(
// You need to provide the initial block for this newly created Consensus
// instance to bootstrap with. A proper choice is the last finalized block you
// delivered to syncer.
+//
+// NOTE: the confirmed blocks should be grouped by chainID and sorted by
+// their positions in ascending order.
func NewConsensusFromSyncer(
initBlock *types.Block,
initRoundBeginTime time.Time,
@@ -459,17 +469,43 @@ func NewConsensusFromSyncer(
networkModule Network,
prv crypto.PrivateKey,
latticeModule *Lattice,
- blocks []*types.Block,
+ confirmedBlocks [][]*types.Block,
randomnessResults []*types.BlockRandomnessResult,
+ cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
networkModule, prv, logger, latticeModule, true)
- // Dump all BA-confirmed blocks to the consensus instance.
- for _, b := range blocks {
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
+ // Launch a dummy receiver before we start receiving from network module.
+ con.dummyMsgBuffer = cachedMessages
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ // Dump all BA-confirmed blocks to the consensus instance, making sure the
+ // added blocks form a DAG.
+ for {
+ updated := false
+ for idx, bs := range confirmedBlocks {
+ for bIdx, b := range bs {
+ // A block can only be added after its parent block has been added to the
+ // lattice. If not, our pulling mechanism would stop at the block we added
+ // and lose its parent block forever.
+ if !latticeModule.Exist(b.ParentHash) {
+ logger.Debug("Skip discontinuous confirmed block",
+ "from", b,
+ "until", bs[len(bs)-1])
+ confirmedBlocks[idx] = bs[bIdx:]
+ break
+ }
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ }
+ if !updated {
+ break
}
}
// Dump all randomness result to the consensus instance.
@@ -549,6 +585,7 @@ func newConsensusForRound(
logger: logger,
resetRandomnessTicker: make(chan struct{}),
resetDeliveryGuardTicker: make(chan struct{}),
+ msgChan: make(chan interface{}, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -614,12 +651,39 @@ func (con *Consensus) Run() {
con.baMgr.run()
// Launch network handler.
con.logger.Debug("Calling Network.ReceiveChan")
- go con.processMsg(con.network.ReceiveChan())
+ con.waitGroup.Add(1)
+ go con.deliverNetworkMsg()
+ con.waitGroup.Add(1)
+ go con.processMsg()
// Sleep until dMoment comes.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Take some time to bootstrap.
time.Sleep(3 * time.Second)
+ con.waitGroup.Add(1)
go con.pullRandomness()
+ // Stop dummy receiver if launched.
+ if con.dummyCancel != nil {
+ con.logger.Trace("Stop dummy receiver")
+ con.dummyCancel()
+ <-con.dummyFinished
+ // Replay those cached messages.
+ con.logger.Trace("Dummy receiver stoped, start dumping cached messages",
+ "count", len(con.dummyMsgBuffer))
+ for _, msg := range con.dummyMsgBuffer {
+ loop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break loop
+ case <-time.After(50 * time.Millisecond):
+ con.logger.Debug(
+ "internal message channel is full when syncing")
+ }
+ }
+ }
+ con.logger.Trace("Finish dumping cached messages")
+ }
+ con.waitGroup.Add(1)
go con.deliveryGuard()
// Block until done.
select {
@@ -815,18 +879,51 @@ func (con *Consensus) Stop() {
con.ctxCancel()
con.baMgr.stop()
con.event.Reset()
+ con.waitGroup.Wait()
+}
+
+func (con *Consensus) deliverNetworkMsg() {
+ defer con.waitGroup.Done()
+ recv := con.network.ReceiveChan()
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case msg := <-recv:
+ innerLoop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break innerLoop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug("internal message channel is full",
+ "pending", msg)
+ }
+ }
+ case <-con.ctx.Done():
+ return
+ }
+ }
}
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) processMsg() {
+ defer con.waitGroup.Done()
MessageLoop:
for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
var msg interface{}
select {
- case msg = <-msgChan:
+ case msg = <-con.msgChan:
case <-con.ctx.Done():
return
}
-
switch val := msg.(type) {
case *types.Block:
if ch, exist := func() (chan<- *types.Block, bool) {
@@ -1037,10 +1134,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
}
func (con *Consensus) pullRandomness() {
+ defer con.waitGroup.Done()
for {
select {
case <-con.ctx.Done():
return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
case <-con.resetRandomnessTicker:
case <-time.After(1500 * time.Millisecond):
// TODO(jimmy): pulling period should be related to lambdaBA.
@@ -1055,6 +1158,7 @@ func (con *Consensus) pullRandomness() {
}
func (con *Consensus) deliveryGuard() {
+ defer con.waitGroup.Done()
time.Sleep(con.dMoment.Sub(time.Now()))
// Node takes time to start.
time.Sleep(60 * time.Second)
@@ -1062,6 +1166,11 @@ func (con *Consensus) deliveryGuard() {
select {
case <-con.ctx.Done():
return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
case <-con.resetDeliveryGuardTicker:
case <-time.After(60 * time.Second):
con.logger.Error("no blocks delivered for too long", "ID", con.ID)