-rw-r--r--   core/consensus.go                    | 129
-rw-r--r--   core/consensus_test.go               |   1
-rw-r--r--   core/lattice-data.go                 |   2
-rw-r--r--   core/syncer/agreement.go             |   2
-rw-r--r--   core/syncer/consensus.go             |  87
-rw-r--r--   core/test/utils.go                   |  19
-rw-r--r--   core/utils/utils.go                  |  34
-rw-r--r--   core/utils/utils_test.go             |  60
-rw-r--r--   integration_test/consensus_test.go   |   8
9 files changed, 274 insertions, 68 deletions
diff --git a/core/consensus.go b/core/consensus.go
index ad1efeb..2770018 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -415,6 +415,13 @@ type Consensus struct {
     nonFinalizedBlockDelivered bool
     resetRandomnessTicker      chan struct{}
     resetDeliveryGuardTicker   chan struct{}
+    msgChan                    chan interface{}
+    waitGroup                  sync.WaitGroup
+
+    // Context of Dummy receiver during switching from syncer.
+    dummyCancel    context.CancelFunc
+    dummyFinished  <-chan struct{}
+    dummyMsgBuffer []interface{}
 }
 
 // NewConsensus construct an Consensus instance.
@@ -450,6 +457,9 @@ func NewConsensusForSimulation(
 // You need to provide the initial block for this newly created Consensus
 // instance to bootstrap with. A proper choice is the last finalized block you
 // delivered to syncer.
+//
+// NOTE: those confirmed blocks should be organized by chainID and sorted by
+// their positions, in ascending order.
 func NewConsensusFromSyncer(
     initBlock *types.Block,
     initRoundBeginTime time.Time,
@@ -459,17 +469,43 @@ func NewConsensusFromSyncer(
     networkModule Network,
     prv crypto.PrivateKey,
     latticeModule *Lattice,
-    blocks []*types.Block,
+    confirmedBlocks [][]*types.Block,
     randomnessResults []*types.BlockRandomnessResult,
+    cachedMessages []interface{},
     logger common.Logger) (*Consensus, error) {
     // Setup Consensus instance.
     con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
         networkModule, prv, logger, latticeModule, true)
-    // Dump all BA-confirmed blocks to the consensus instance.
-    for _, b := range blocks {
-        con.ccModule.registerBlock(b)
-        if err := con.processBlock(b); err != nil {
-            return nil, err
+    // Launch a dummy receiver before we start receiving from network module.
+    con.dummyMsgBuffer = cachedMessages
+    con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+        con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
+            con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+        })
+    // Dump all BA-confirmed blocks to the consensus instance, making sure
+    // these added blocks form a DAG.
+    for {
+        updated := false
+        for idx, bs := range confirmedBlocks {
+            for bIdx, b := range bs {
+                // Only when its parent block is already added to the lattice
+                // can we then add this block. If not, our pulling mechanism
+                // would stop at the block we added and lose its parent forever.
+                if !latticeModule.Exist(b.ParentHash) {
+                    logger.Debug("Skip discontinuous confirmed block",
+                        "from", b,
+                        "until", bs[len(bs)-1])
+                    confirmedBlocks[idx] = bs[bIdx:]
+                    break
+                }
+                con.ccModule.registerBlock(b)
+                if err := con.processBlock(b); err != nil {
+                    return nil, err
+                }
+            }
+        }
+        if !updated {
+            break
         }
     }
     // Dump all randomness result to the consensus instance.
@@ -549,6 +585,7 @@ func newConsensusForRound(
         logger:                   logger,
         resetRandomnessTicker:    make(chan struct{}),
         resetDeliveryGuardTicker: make(chan struct{}),
+        msgChan:                  make(chan interface{}, 1024),
     }
     con.ctx, con.ctxCancel = context.WithCancel(context.Background())
     con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -614,12 +651,39 @@ func (con *Consensus) Run() {
     con.baMgr.run()
     // Launch network handler.
     con.logger.Debug("Calling Network.ReceiveChan")
-    go con.processMsg(con.network.ReceiveChan())
+    con.waitGroup.Add(1)
+    go con.deliverNetworkMsg()
+    con.waitGroup.Add(1)
+    go con.processMsg()
     // Sleep until dMoment come.
     time.Sleep(con.dMoment.Sub(time.Now().UTC()))
     // Take some time to bootstrap.
     time.Sleep(3 * time.Second)
+    con.waitGroup.Add(1)
     go con.pullRandomness()
+    // Stop dummy receiver if launched.
+    if con.dummyCancel != nil {
+        con.logger.Trace("Stop dummy receiver")
+        con.dummyCancel()
+        <-con.dummyFinished
+        // Replay those cached messages.
+        con.logger.Trace("Dummy receiver stopped, start dumping cached messages",
+            "count", len(con.dummyMsgBuffer))
+        for _, msg := range con.dummyMsgBuffer {
+        loop:
+            for {
+                select {
+                case con.msgChan <- msg:
+                    break loop
+                case <-time.After(50 * time.Millisecond):
+                    con.logger.Debug(
+                        "internal message channel is full when syncing")
+                }
+            }
+        }
+        con.logger.Trace("Finish dumping cached messages")
+    }
+    con.waitGroup.Add(1)
     go con.deliveryGuard()
     // Block until done.
     select {
@@ -815,18 +879,51 @@ func (con *Consensus) Stop() {
     con.ctxCancel()
     con.baMgr.stop()
     con.event.Reset()
+    con.waitGroup.Wait()
+}
+
+func (con *Consensus) deliverNetworkMsg() {
+    defer con.waitGroup.Done()
+    recv := con.network.ReceiveChan()
+    for {
+        select {
+        case <-con.ctx.Done():
+            return
+        default:
+        }
+        select {
+        case msg := <-recv:
+        innerLoop:
+            for {
+                select {
+                case con.msgChan <- msg:
+                    break innerLoop
+                case <-time.After(500 * time.Millisecond):
+                    con.logger.Debug("internal message channel is full",
+                        "pending", msg)
+                }
+            }
+        case <-con.ctx.Done():
+            return
+        }
+    }
 }
 
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) processMsg() {
+    defer con.waitGroup.Done()
 MessageLoop:
     for {
+        select {
+        case <-con.ctx.Done():
+            return
+        default:
+        }
         var msg interface{}
         select {
-        case msg = <-msgChan:
+        case msg = <-con.msgChan:
         case <-con.ctx.Done():
             return
         }
-
         switch val := msg.(type) {
         case *types.Block:
             if ch, exist := func() (chan<- *types.Block, bool) {
@@ -1037,10 +1134,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
 }
 
 func (con *Consensus) pullRandomness() {
+    defer con.waitGroup.Done()
     for {
         select {
         case <-con.ctx.Done():
             return
+        default:
+        }
+        select {
+        case <-con.ctx.Done():
+            return
         case <-con.resetRandomnessTicker:
         case <-time.After(1500 * time.Millisecond):
             // TODO(jimmy): pulling period should be related to lambdaBA.
@@ -1055,6 +1158,7 @@
 }
 
 func (con *Consensus) deliveryGuard() {
+    defer con.waitGroup.Done()
     time.Sleep(con.dMoment.Sub(time.Now()))
     // Node takes time to start.
     time.Sleep(60 * time.Second)
@@ -1062,6 +1166,11 @@ func (con *Consensus) deliveryGuard() {
         select {
         case <-con.ctx.Done():
             return
+        default:
+        }
+        select {
+        case <-con.ctx.Done():
+            return
         case <-con.resetDeliveryGuardTicker:
         case <-time.After(60 * time.Second):
             con.logger.Error("no blocks delivered for too long", "ID", con.ID)
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 938b56d..b53af8a 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -593,6 +593,7 @@ func (s *ConsensusTestSuite) TestSyncBA() {
     prvKey := prvKeys[0]
     _, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn)
     go con.Run()
+    defer con.Stop()
     hash := common.NewRandomHash()
     signers := make([]*utils.Signer, 0, len(prvKeys))
     for _, prvKey := range prvKeys {
diff --git a/core/lattice-data.go b/core/lattice-data.go
index 998fb1f..8367539 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -55,7 +55,7 @@ type ErrAckingBlockNotExists struct {
 }
 
 func (e ErrAckingBlockNotExists) Error() string {
-    return fmt.Sprintf("acking block %s not exists", e.hash)
+    return fmt.Sprintf("acking block %s not exists", e.hash.String()[:6])
 }
 
 // Errors for method usage
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index eaad860..acc4f1c 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The dexon-consensus-core Authors
+// Copyright 2018 The dexon-consensus Authors
 // This file is part of the dexon-consensus-core library.
 //
 // The dexon-consensus-core library is free software: you can redistribute it
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 306017f..75c1067 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -39,7 +39,8 @@ var (
     ErrNotSynced = fmt.Errorf("not synced yet")
     // ErrGenesisBlockReached is reported when genesis block reached.
     ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
-    // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+    // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
+    // blocks.
     ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
     // ErrMismatchBlockHashSequence means the delivering sequence is not
     // correct, compared to finalized blocks.
@@ -83,6 +84,9 @@ type Consensus struct {
     ctxCancel       context.CancelFunc
     syncedLastBlock *types.Block
     syncedConsensus *core.Consensus
+    dummyCancel     context.CancelFunc
+    dummyFinished   <-chan struct{}
+    dummyMsgBuffer  []interface{}
 }
 
 // NewConsensus creates an instance for Consensus (syncer consensus).
@@ -286,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock(
     blocks []*types.Block) (*types.Block, error) {
     lastBlock := blocks[len(blocks)-1]
     round := lastBlock.Position.Round
+    isConfigChanged := func(prev, cur *types.Config) bool {
+        return prev.K != cur.K ||
+            prev.NumChains != cur.NumChains ||
+            prev.PhiRatio != cur.PhiRatio
+    }
     for {
         // Find round r which r-1, r, r+1 are all in same total ordering config.
         for {
-            sameAsPrevRound := round == 0 || !con.isConfigChanged(
+            sameAsPrevRound := round == 0 || !isConfigChanged(
                 con.configs[round-1], con.configs[round])
-            sameAsNextRound := !con.isConfigChanged(
+            sameAsNextRound := !isConfigChanged(
                 con.configs[round], con.configs[round+1])
             if sameAsPrevRound && sameAsNextRound {
                 break
@@ -314,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock(
         lastBlock = &b
     }
     // Find the deliver set by hash for two times. Blocks in a deliver set
-    // returned by total ordering is sorted by hash. If a block's parent hash
-    // is greater than its hash means there is a cut between deliver sets.
+    // returned by total ordering is sorted by hash. If a block's parent
+    // hash is greater than its hash means there is a cut between deliver
+    // sets.
     var curBlock, prevBlock *types.Block
     var deliverSetFirstBlock, deliverSetLastBlock *types.Block
     curBlock = lastBlock
@@ -464,8 +474,8 @@ func (con *Consensus) SyncBlocks(
         }
     }
     if latest && con.lattice == nil {
-        // New Lattice and find the deliver set of total ordering when "latest" is
-        // true for first time. Deliver set is found by block hashes.
+        // New Lattice and find the deliver set of total ordering when "latest"
+        // is true for first time. Deliver set is found by block hashes.
         var syncBlock *types.Block
         syncBlock, err = con.findLatticeSyncBlock(blocks)
         if err != nil {
@@ -482,7 +492,8 @@ func (con *Consensus) SyncBlocks(
         con.setupConfigs(blocks)
         // Process blocks from syncBlock to blocks' last block.
         b := blocks[len(blocks)-1]
-        blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+        blocksCount :=
+            b.Finalization.Height - syncBlock.Finalization.Height + 1
         blocksToProcess := make([]*types.Block, blocksCount)
         for {
             blocksToProcess[blocksCount-1] = b
@@ -512,8 +523,13 @@ func (con *Consensus) SyncBlocks(
         if err = con.Stop(); err != nil {
             return
         }
+        con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+            context.Background(), con.network.ReceiveChan(),
+            func(msg interface{}) {
+                con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+            })
         con.syncedLastBlock = blocks[len(blocks)-1]
-        synced = con.syncedLastBlock != nil
+        synced = true
     }
 }
 return
@@ -521,6 +537,8 @@ func (con *Consensus) SyncBlocks(
 
 // GetSyncedConsensus returns the core.Consensus instance after synced.
 func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+    con.lock.Lock()
+    defer con.lock.Unlock()
     if con.syncedConsensus != nil {
         return con.syncedConsensus, nil
     }
@@ -529,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
     }
     // flush all blocks in con.blocks into core.Consensus, and build
     // core.Consensus from syncer.
-    confirmedBlocks := []*types.Block{}
+    confirmedBlocks := make([][]*types.Block, len(con.blocks))
+    for i, bs := range con.blocks {
+        confirmedBlocks[i] = []*types.Block(bs)
+    }
     randomnessResults := []*types.BlockRandomnessResult{}
-    func() {
-        con.lock.Lock()
-        defer con.lock.Unlock()
-        for _, bs := range con.blocks {
-            confirmedBlocks = append(confirmedBlocks, bs...)
-        }
-        for _, r := range con.randomnessResults {
-            randomnessResults = append(randomnessResults, r)
-        }
-    }()
+    for _, r := range con.randomnessResults {
+        randomnessResults = append(randomnessResults, r)
+    }
+    con.dummyCancel()
+    <-con.dummyFinished
     var err error
     con.syncedConsensus, err = core.NewConsensusFromSyncer(
         con.syncedLastBlock,
@@ -553,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
         con.lattice,
         confirmedBlocks,
         randomnessResults,
+        con.dummyMsgBuffer,
         con.logger)
     return con.syncedConsensus, err
 }
@@ -560,7 +577,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
 // Stop the syncer.
 //
 // This method is mainly for caller to stop the syncer before synced, the syncer
-// would call this method automatically after synced.
+// would call this method automatically after being synced.
 func (con *Consensus) Stop() error {
     con.logger.Trace("syncer is about to stop")
     // Stop network and CRS routines, wait until they are all stoped.
@@ -654,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
         // Resize the pool of blocks.
         con.blocks = append(con.blocks, types.ByPosition{})
         // Resize agreement modules.
-        a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+        a := newAgreement(
+            con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
         con.agreements = append(con.agreements, a)
         con.agreementWaitGroup.Add(1)
         go func() {
@@ -679,8 +697,8 @@ func (con *Consensus) startAgreement() {
             func() {
                 con.lock.Lock()
                 defer con.lock.Unlock()
-                // If round is cut in agreements, do not add blocks with round less
-                // then cut round.
+                // If round is cut in agreements, do not add blocks with
+                // round less than the cut round.
                 if b.Position.Round < con.agreementRoundCut {
                     return
                 }
@@ -764,15 +782,22 @@ func (con *Consensus) startNetwork() {
             default:
                 continue Loop
             }
-            func() {
+            if func() bool {
                 con.lock.RLock()
                 defer con.lock.RUnlock()
                 if pos.ChainID >= uint32(len(con.agreements)) {
-                    con.logger.Error("Unknown chainID message received (syncer)",
+                    // This error might be easily encountered when the
+                    // "latest" parameter of SyncBlocks is turned on too
+                    // early.
+                    con.logger.Error(
+                        "Unknown chainID message received (syncer)",
                         "position", &pos)
+                    return false
                 }
-            }()
-            con.agreements[pos.ChainID].inputChan <- val
+                return true
+            }() {
+                con.agreements[pos.ChainID].inputChan <- val
+            }
         case <-con.ctx.Done():
             return
         }
@@ -849,9 +874,3 @@ func (con *Consensus) stopAgreement() {
     close(con.receiveChan)
     close(con.pullChan)
 }
-
-func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
-    return prev.K != cur.K ||
-        prev.NumChains != cur.NumChains ||
-        prev.PhiRatio != cur.PhiRatio
-}
diff --git a/core/test/utils.go b/core/test/utils.go
index d85395b..a2819ae 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -18,7 +18,6 @@ package test
 
 import (
-    "context"
     "errors"
     "fmt"
     "math"
@@ -227,24 +226,6 @@ func VerifyDB(db db.Database) error {
     return nil
 }
 
-// LaunchDummyReceiver launches a go routine to receive and drop messages from
-// a network module. An optional context could be passed to stop the go routine.
-func LaunchDummyReceiver(
-    ctx context.Context, networkModule *Network) context.CancelFunc {
-    dummyCtx, dummyCancel := context.WithCancel(ctx)
-    go func() {
-    loop:
-        for {
-            select {
-            case <-dummyCtx.Done():
-                break loop
-            case <-networkModule.ReceiveChan():
-            }
-        }
-    }()
-    return dummyCancel
-}
-
 func getComplementSet(
     all, set map[types.NodeID]struct{}) map[types.NodeID]struct{} {
     complement := make(map[types.NodeID]struct{})
diff --git a/core/utils/utils.go b/core/utils/utils.go
index 8c9f77a..8486d28 100644
--- a/core/utils/utils.go
+++ b/core/utils/utils.go
@@ -18,6 +18,7 @@ package utils
 
 import (
+    "context"
     "fmt"
 
     "github.com/dexon-foundation/dexon-consensus/common"
@@ -92,3 +93,36 @@ func VerifyDKGComplaint(
     }
     return ok, nil
 }
+
+// LaunchDummyReceiver launches a goroutine to receive from the receive
+// channel of a network module. A context is required to stop the goroutine
+// automatically. An optional message handler could be provided.
+func LaunchDummyReceiver(
+    ctx context.Context, recv <-chan interface{}, handler func(interface{})) (
+    context.CancelFunc, <-chan struct{}) {
+    var (
+        dummyCtx, dummyCancel = context.WithCancel(ctx)
+        finishedChan          = make(chan struct{}, 1)
+    )
+    go func() {
+        defer func() {
+            finishedChan <- struct{}{}
+        }()
+    loop:
+        for {
+            select {
+            case <-dummyCtx.Done():
+                break loop
+            case v, ok := <-recv:
+                if !ok {
+                    panic(fmt.Errorf(
+                        "receive channel is closed before dummy receiver"))
+                }
+                if handler != nil {
+                    handler(v)
+                }
+            }
+        }
+    }()
+    return dummyCancel, finishedChan
+}
diff --git a/core/utils/utils_test.go b/core/utils/utils_test.go
index 34336b3..88576b8 100644
--- a/core/utils/utils_test.go
+++ b/core/utils/utils_test.go
@@ -18,7 +18,9 @@ package utils
 
 import (
+    "context"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/suite"
@@ -111,6 +113,64 @@ func (s *UtilsTestSuite) TestVerifyDKGComplaint() {
     s.False(ok)
 }
 
+func (s *UtilsTestSuite) TestDummyReceiver() {
+    var (
+        msgCount = 1000
+        fakeMsgs = make([]int, 0, msgCount)
+    )
+    for i := 0; i < msgCount; i++ {
+        fakeMsgs = append(fakeMsgs, i)
+    }
+    launchDummySender := func(msgs []int, inputChan chan<- interface{}) {
+        finished := make(chan struct{}, 1)
+        go func() {
+            defer func() {
+                finished <- struct{}{}
+            }()
+            for _, v := range msgs {
+                inputChan <- v
+            }
+        }()
+        select {
+        case <-finished:
+        case <-time.After(1 * time.Second):
+            s.Require().FailNow("unable to deliver all messages in time")
+        }
+    }
+    checkBuffer := func(sent []int, buff []interface{}) {
+        s.Require().Len(buff, len(sent))
+        for i := range sent {
+            s.Require().Equal(sent[i], buff[i].(int))
+        }
+    }
+    // Basic scenario: a dummy receiver with caching enabled.
+    recv := make(chan interface{})
+    buff := []interface{}{}
+    cancel, finished := LaunchDummyReceiver(
+        context.Background(), recv, func(msg interface{}) {
+            buff = append(buff, msg)
+        })
+    launchDummySender(fakeMsgs, recv)
+    cancel()
+    select {
+    case <-finished:
+    case <-time.After(1 * time.Second):
+        s.Require().FailNow("should finish after cancel is called")
+    }
+    checkBuffer(fakeMsgs, buff)
+    // Dummy receiver can be shut down along with the parent context, and
+    // caching is not enabled.
+    ctx, cancel := context.WithCancel(context.Background())
+    _, finished = LaunchDummyReceiver(ctx, recv, nil)
+    launchDummySender(fakeMsgs, recv)
+    cancel()
+    select {
+    case <-finished:
+    case <-time.After(1 * time.Second):
+        s.Require().FailNow("should finish after cancel is called")
+    }
+}
+
 func TestUtils(t *testing.T) {
     suite.Run(t, new(UtilsTestSuite))
 }
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 99de7c9..fc6bb47 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -367,8 +367,8 @@ func (s *ConsensusTestSuite) TestSync() {
     }
     // Clean syncNode's network receive channel, or it might exceed the limit
     // and block other go routines.
-    dummyReceiverCtxCancel := test.LaunchDummyReceiver(
-        context.Background(), syncNode.network)
+    dummyReceiverCtxCancel, dummyFinished := utils.LaunchDummyReceiver(
+        context.Background(), syncNode.network.ReceiveChan(), nil)
 ReachAlive:
     for {
         // Check if any error happened or sleep for a period of time.
@@ -390,6 +390,7 @@ ReachAlive:
             }
         }
         dummyReceiverCtxCancel()
+        <-dummyFinished
         break
     }
     // Initiate Syncer.
@@ -456,7 +457,8 @@ ReachAlive:
             stoppedNode.con.Stop()
             stoppedNode.con = nil
             fmt.Println("one node stopped", stoppedNode.ID)
-            test.LaunchDummyReceiver(runnerCtx, stoppedNode.network)
+            utils.LaunchDummyReceiver(
+                runnerCtx, stoppedNode.network.ReceiveChan(), nil)
             continue
         }
         if pos.Round < untilRound {
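
The recurring pattern in this change is a hand-off of network traffic: whenever no real message loop is running (in the syncer once SyncBlocks reports synced, or in the Consensus built by NewConsensusFromSyncer before Run starts), a dummy receiver keeps draining Network.ReceiveChan() into a buffer; once the real loop is up, the dummy receiver is cancelled, awaited, and the buffered messages are replayed. The following standalone Go sketch illustrates only that pattern; the names launchBufferingReceiver and the main driver are illustrative and not part of the repository, whose actual implementation is utils.LaunchDummyReceiver together with the replay loop added to Consensus.Run.

// Standalone sketch of the buffer-and-replay hand-off used in this commit.
// launchBufferingReceiver and main are illustrative names only.
package main

import (
    "context"
    "fmt"
)

// launchBufferingReceiver drains recv in a goroutine, passing every message
// to handler, until the returned cancel function is called or ctx is done.
// The second return value receives one signal once the goroutine has stopped.
func launchBufferingReceiver(
    ctx context.Context, recv <-chan interface{}, handler func(interface{})) (
    context.CancelFunc, <-chan struct{}) {
    dummyCtx, cancel := context.WithCancel(ctx)
    finished := make(chan struct{}, 1)
    go func() {
        defer func() { finished <- struct{}{} }()
        for {
            select {
            case <-dummyCtx.Done():
                return
            case v, ok := <-recv:
                if !ok {
                    return
                }
                if handler != nil {
                    handler(v)
                }
            }
        }
    }()
    return cancel, finished
}

func main() {
    recv := make(chan interface{})
    var buffered []interface{}
    // Phase 1: no real consumer yet, so cache everything arriving on recv.
    cancel, finished := launchBufferingReceiver(
        context.Background(), recv, func(msg interface{}) {
            buffered = append(buffered, msg)
        })
    for i := 0; i < 3; i++ {
        recv <- i
    }
    // Phase 2: stop the dummy receiver, wait until it has really returned,
    // then replay the cached messages into the real consumer (printed here).
    cancel()
    <-finished
    for _, msg := range buffered {
        fmt.Println("replayed:", msg)
    }
}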