From bebcd229959b250a4127bdd678b49048ee0208ea Mon Sep 17 00:00:00 2001 From: Sonic Date: Fri, 21 Dec 2018 14:53:37 +0800 Subject: core, dex, internal: block proposer syncing (first iteration) (#96) * dex, internal: block proposer syncing (first iteration) * core: find block from db if not in memory This fix handles stopping proposing and then restarting * core: no need to reorg when reset Dexon will not fork. This commit also fix when a block confirm but its parent is not in db yet, during restarting proposing. * dex: always accept NewBlockMsg, NewBlockHashesMsg We need to accept NewBlockMsg, NewBlockHashesMsg to sync current block with other peers in block proposer mode when syncing lattice data. It's a waste when the node is synced and start proposing. Todo: control msg processing on/off more granular, accept NewBlockMsg, NewBlockHashesMsg when syncing, but stop when synced. --- .../dexon-consensus/core/syncer/agreement.go | 181 +++++ .../dexon-consensus/core/syncer/consensus.go | 747 +++++++++++++++++++++ 2 files changed, 928 insertions(+) create mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go create mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go (limited to 'vendor/github.com') diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go new file mode 100644 index 000000000..fee462442 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go @@ -0,0 +1,181 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package syncer + +import ( + "sync" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Struct agreement implements struct of BA (Byzantine Agreement) protocol +// needed in syncer, which only receives agreement results. +type agreement struct { + wg *sync.WaitGroup + cache *utils.NodeSetCache + inputChan chan interface{} + outputChan chan<- *types.Block + pullChan chan<- common.Hash + blocks map[types.Position]map[common.Hash]*types.Block + agreementResults map[common.Hash]struct{} + latestCRSRound uint64 + pendings map[uint64]map[common.Hash]*types.AgreementResult + logger common.Logger + confirmedBlocks map[common.Hash]struct{} +} + +// newAgreement creates a new agreement instance. +func newAgreement( + ch chan<- *types.Block, + pullChan chan<- common.Hash, + cache *utils.NodeSetCache, + wg *sync.WaitGroup, + logger common.Logger) *agreement { + + return &agreement{ + cache: cache, + wg: wg, + inputChan: make(chan interface{}, 1000), + outputChan: ch, + pullChan: pullChan, + blocks: make(map[types.Position]map[common.Hash]*types.Block), + agreementResults: make(map[common.Hash]struct{}), + logger: logger, + pendings: make( + map[uint64]map[common.Hash]*types.AgreementResult), + confirmedBlocks: make(map[common.Hash]struct{}), + } +} + +// run starts the agreement, this does not start a new routine, go a new +// routine explicitly in the caller. +func (a *agreement) run() { + a.wg.Add(1) + defer a.wg.Done() + for { + select { + case val, ok := <-a.inputChan: + if !ok { + // InputChan is closed by network when network ends. + return + } + switch v := val.(type) { + case *types.Block: + a.processBlock(v) + case *types.AgreementResult: + a.processAgreementResult(v) + case uint64: + a.processNewCRS(v) + } + } + } +} + +func (a *agreement) processBlock(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; exist { + return + } + if _, exist := a.agreementResults[b.Hash]; exist { + a.confirm(b) + } else { + if _, exist := a.blocks[b.Position]; !exist { + a.blocks[b.Position] = make(map[common.Hash]*types.Block) + } + a.blocks[b.Position][b.Hash] = b + } +} + +func (a *agreement) processAgreementResult(r *types.AgreementResult) { + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[r.BlockHash]; exists { + a.logger.Info("agreement result already confirmed", "result", r) + return + } + if r.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendings[r.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.AgreementResult) + a.pendings[r.Position.Round] = pendingsForRound + } + pendingsForRound[r.BlockHash] = r + a.logger.Info("agreement result cached", "result", r) + return + } + if err := core.VerifyAgreementResult(r, a.cache); err != nil { + a.logger.Error("agreement result verification failed", + "result", r, + "error", err) + return + } + if r.IsEmptyBlock { + // Empty block is also confirmed. + b := &types.Block{ + Position: r.Position, + } + a.confirm(b) + } else { + needPull := true + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + a.confirm(b) + needPull = false + } + } + if needPull { + a.agreementResults[r.BlockHash] = struct{}{} + a.pullChan <- r.BlockHash + } + } +} + +func (a *agreement) processNewCRS(round uint64) { + if round <= a.latestCRSRound { + return + } + // Verify all pending results. + for r := a.latestCRSRound + 1; r <= round; r++ { + pendingsForRound := a.pendings[r] + if pendingsForRound == nil { + continue + } + delete(a.pendings, r) + for _, res := range pendingsForRound { + if err := core.VerifyAgreementResult(res, a.cache); err != nil { + a.logger.Error("invalid agreement result", "result", res) + continue + } + a.logger.Error("flush agreement result", "result", res) + a.processAgreementResult(res) + break + } + } + a.latestCRSRound = round +} + +// confirm notifies consensus the confirmation of a block in BA. +func (a *agreement) confirm(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; !exist { + delete(a.blocks, b.Position) + delete(a.agreementResults, b.Hash) + a.outputChan <- b + a.confirmedBlocks[b.Hash] = struct{}{} + } +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go new file mode 100644 index 000000000..da9d352f4 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -0,0 +1,747 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package syncer + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/db" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +var ( + // ErrAlreadySynced is reported when syncer is synced. + ErrAlreadySynced = fmt.Errorf("already synced") + // ErrNotSynced is reported when syncer is not synced yet. + ErrNotSynced = fmt.Errorf("not synced yet") + // ErrGenesisBlockReached is reported when genesis block reached. + ErrGenesisBlockReached = fmt.Errorf("genesis block reached") + // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. + ErrInvalidBlockOrder = fmt.Errorf("invalid block order") + // ErrMismatchBlockHashSequence means the delivering sequence is not + // correct, compared to finalized blocks. + ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence") + // ErrInvalidSyncingFinalizationHeight raised when the blocks to sync is + // not following the compaction chain tip in database. + ErrInvalidSyncingFinalizationHeight = fmt.Errorf( + "invalid syncing finalization height") +) + +// Consensus is for syncing consensus module. +type Consensus struct { + db db.Database + gov core.Governance + dMoment time.Time + logger common.Logger + app core.Application + prv crypto.PrivateKey + network core.Network + nodeSetCache *utils.NodeSetCache + + lattice *core.Lattice + validatedChains map[uint32]struct{} + finalizedBlockHashes common.Hashes + latticeLastRound uint64 + randomnessResults []*types.BlockRandomnessResult + blocks []types.ByPosition + agreements []*agreement + configs []*types.Config + roundBeginTimes []time.Time + agreementRoundCut uint64 + + // lock for accessing all fields. + lock sync.RWMutex + moduleWaitGroup sync.WaitGroup + agreementWaitGroup sync.WaitGroup + pullChan chan common.Hash + receiveChan chan *types.Block + ctx context.Context + ctxCancel context.CancelFunc + syncedLastBlock *types.Block + syncedConsensus *core.Consensus +} + +// NewConsensus creates an instance for Consensus (syncer consensus). +func NewConsensus( + dMoment time.Time, + app core.Application, + gov core.Governance, + db db.Database, + network core.Network, + prv crypto.PrivateKey, + logger common.Logger) *Consensus { + + con := &Consensus{ + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + prv: prv, + logger: logger, + validatedChains: make(map[uint32]struct{}), + configs: []*types.Config{ + utils.GetConfigWithPanic(gov, 0, logger), + }, + roundBeginTimes: []time.Time{dMoment}, + receiveChan: make(chan *types.Block, 1000), + pullChan: make(chan common.Hash, 1000), + } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + return con +} + +func (con *Consensus) initConsensusObj(initBlock *types.Block) { + var cfg *types.Config + func() { + con.lock.Lock() + defer con.lock.Unlock() + con.latticeLastRound = initBlock.Position.Round + cfg = con.configs[con.latticeLastRound] + debugApp, _ := con.app.(core.Debug) + con.lattice = core.NewLattice( + con.roundBeginTimes[con.latticeLastRound], + con.latticeLastRound, + cfg, + core.NewAuthenticator(con.prv), + con.app, + debugApp, + con.db, + con.logger, + ) + }() + con.startAgreement(cfg.NumChains) + con.startNetwork() + con.startCRSMonitor() +} + +func (con *Consensus) checkIfValidated() bool { + con.lock.RLock() + defer con.lock.RUnlock() + var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + var validatedChainCount uint32 + // Make sure we validate some block in all chains. + for chainID := range con.validatedChains { + if chainID < numChains { + validatedChainCount++ + } + } + if validatedChainCount == numChains { + return true + } + con.logger.Info("not validated yet", "validated-chain", validatedChainCount) + return false +} + +func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { + con.lock.RLock() + defer con.lock.RUnlock() + var ( + numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + compactionTips = make([]*types.Block, numChains) + overlapCount = uint32(0) + ) + // Find tips (newset blocks) of each chain in compaction chain. + b := blocks[len(blocks)-1] + for tipCount := uint32(0); tipCount < numChains; { + if compactionTips[b.Position.ChainID] == nil { + // Check chainID for config change. + if b.Position.ChainID < numChains { + compactionTips[b.Position.ChainID] = b + tipCount++ + } + } + if (b.Finalization.ParentHash == common.Hash{}) { + return false + } + b1, err := con.db.GetBlock(b.Finalization.ParentHash) + if err != nil { + panic(err) + } + b = &b1 + } + // Check if chain tips of compaction chain and current cached confirmed + // blocks are overlapped on each chain, numChains is decided by the round + // of last block we seen on compaction chain. + for chainID, b := range compactionTips { + if len(con.blocks[chainID]) > 0 { + if !b.Position.Older(&con.blocks[chainID][0].Position) { + overlapCount++ + } + } + } + if overlapCount == numChains { + return true + } + con.logger.Info("not synced yet", + "overlap-count", overlapCount, + "num-chain", numChains, + "last-block", blocks[len(blocks)-1]) + return false +} + +// ensureAgreementOverlapRound ensures the oldest blocks in each chain in +// con.blocks are all in the same round, for avoiding config change while +// syncing. +func (con *Consensus) ensureAgreementOverlapRound() bool { + con.lock.Lock() + defer con.lock.Unlock() + if con.agreementRoundCut > 0 { + return true + } + // Clean empty blocks on tips of chains. + for idx, bs := range con.blocks { + for len(bs) > 0 && con.isEmptyBlock(bs[0]) { + bs = bs[1:] + } + con.blocks[idx] = bs + } + // Build empty blocks. + for _, bs := range con.blocks { + for i := range bs { + if con.isEmptyBlock(bs[i]) { + if bs[i-1].Position.Height == bs[i].Position.Height-1 { + con.buildEmptyBlock(bs[i], bs[i-1]) + } + } + } + } + var tipRoundMap map[uint64]uint32 + for { + tipRoundMap = make(map[uint64]uint32) + for _, bs := range con.blocks { + if len(bs) > 0 { + tipRoundMap[bs[0].Position.Round]++ + } + } + if len(tipRoundMap) <= 1 { + break + } + // Make all tips in same round. + var maxRound uint64 + for r := range tipRoundMap { + if r > maxRound { + maxRound = r + } + } + for idx, bs := range con.blocks { + for len(bs) > 0 && bs[0].Position.Round < maxRound { + bs = bs[1:] + } + con.blocks[idx] = bs + } + } + if len(tipRoundMap) == 1 { + var r uint64 + for r = range tipRoundMap { + break + } + if tipRoundMap[r] == con.configs[r].NumChains { + con.agreementRoundCut = r + con.logger.Info("agreement round cut found, round", r) + return true + } + } + return false +} + +func (con *Consensus) findLatticeSyncBlock( + blocks []*types.Block) (*types.Block, error) { + lastBlock := blocks[len(blocks)-1] + round := lastBlock.Position.Round + for { + // Find round r which r-1, r, r+1 are all in same total ordering config. + for { + sameAsPrevRound := round == 0 || !con.isConfigChanged( + con.configs[round-1], con.configs[round]) + sameAsNextRound := !con.isConfigChanged( + con.configs[round], con.configs[round+1]) + if sameAsPrevRound && sameAsNextRound { + break + } + if round == 0 { + // Unable to find a safe round, wait for new rounds. + return nil, nil + } + round-- + } + // Find the newset block which round is "round". + for lastBlock.Position.Round != round { + if (lastBlock.Finalization.ParentHash == common.Hash{}) { + return nil, ErrGenesisBlockReached + } + b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + lastBlock = &b + } + // Find the deliver set by hash for two times. Blocks in a deliver set + // returned by total ordering is sorted by hash. If a block's parent hash + // is greater than its hash means there is a cut between deliver sets. + var curBlock, prevBlock *types.Block + var deliverSetFirstBlock, deliverSetLastBlock *types.Block + curBlock = lastBlock + for { + if (curBlock.Finalization.ParentHash == common.Hash{}) { + return nil, ErrGenesisBlockReached + } + b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + prevBlock = &b + if !prevBlock.Hash.Less(curBlock.Hash) { + break + } + curBlock = prevBlock + } + deliverSetLastBlock = prevBlock + curBlock = prevBlock + for { + if (curBlock.Finalization.ParentHash == common.Hash{}) { + break + } + b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + prevBlock = &b + if !prevBlock.Hash.Less(curBlock.Hash) { + break + } + curBlock = prevBlock + } + deliverSetFirstBlock = curBlock + // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock + // are in the same round. + ok := true + curBlock = deliverSetLastBlock + for { + if curBlock.Position.Round != round { + ok = false + break + } + b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + curBlock = &b + if curBlock.Hash == deliverSetFirstBlock.Hash { + break + } + } + if ok { + return deliverSetFirstBlock, nil + } + if round == 0 { + return nil, nil + } + round-- + } +} + +func (con *Consensus) processFinalizedBlock(block *types.Block) error { + if con.lattice == nil { + return nil + } + con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) + delivered, err := con.lattice.ProcessFinalizedBlock(block) + if err != nil { + return err + } + for idx, b := range delivered { + if con.finalizedBlockHashes[idx] != b.Hash { + return ErrMismatchBlockHashSequence + } + con.validatedChains[b.Position.ChainID] = struct{}{} + } + con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):] + return nil +} + +// SyncBlocks syncs blocks from compaction chain, latest is true if the caller +// regards the blocks are the latest ones. Notice that latest can be true for +// many times. +// NOTICE: parameter "blocks" should be consecutive in compaction height. +func (con *Consensus) SyncBlocks( + blocks []*types.Block, latest bool) (bool, error) { + if con.syncedLastBlock != nil { + return true, ErrAlreadySynced + } + if len(blocks) == 0 { + return false, nil + } + // Check if blocks are consecutive. + for i := 1; i < len(blocks); i++ { + if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 { + return false, ErrInvalidBlockOrder + } + } + // Make sure the first block is the next block of current compaction chain + // tip in DB. + _, tipHeight := con.db.GetCompactionChainTipInfo() + if blocks[0].Finalization.Height != tipHeight+1 { + con.logger.Error("mismatched finalization height", + "now", blocks[0].Finalization.Height, + "expected", tipHeight+1) + return false, ErrInvalidSyncingFinalizationHeight + } + con.logger.Info("syncBlocks", + "position", &blocks[0].Position, + "final height", blocks[0].Finalization.Height, + "len", len(blocks), + "latest", latest, + ) + con.setupConfigs(blocks) + for _, b := range blocks { + // TODO(haoping) remove this if lattice puts blocks into db. + if err := con.db.PutBlock(*b); err != nil { + // A block might be put into db when confirmed by BA, but not + // finalized yet. + if err == db.ErrBlockExists { + err = con.db.UpdateBlock(*b) + } + if err != nil { + return false, err + } + } + if err := con.db.PutCompactionChainTipInfo( + b.Hash, b.Finalization.Height); err != nil { + return false, err + } + if err := con.processFinalizedBlock(b); err != nil { + return false, err + } + } + if latest && con.lattice == nil { + // New Lattice and find the deliver set of total ordering when "latest" is + // true for first time. Deliver set is found by block hashes. + syncBlock, err := con.findLatticeSyncBlock(blocks) + if err != nil { + return false, err + } + if syncBlock != nil { + con.logger.Info("deliver set found", syncBlock) + // New lattice with the round of syncBlock. + con.initConsensusObj(syncBlock) + // Process blocks from syncBlock to blocks' last block. + b := blocks[len(blocks)-1] + blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1 + blocksToProcess := make([]*types.Block, blocksCount) + for { + blocksToProcess[blocksCount-1] = b + blocksCount-- + if b.Hash == syncBlock.Hash { + break + } + b1, err := con.db.GetBlock(b.Finalization.ParentHash) + if err != nil { + return false, err + } + b = &b1 + } + for _, b := range blocksToProcess { + if err := con.processFinalizedBlock(b); err != nil { + return false, err + } + } + } + } + if latest && con.ensureAgreementOverlapRound() { + // Check if compaction and agreements' blocks are overlapped. The + // overlapping of compaction chain and BA's oldest blocks means the + // syncing is done. + if con.checkIfValidated() && con.checkIfSynced(blocks) { + if err := con.Stop(); err != nil { + return false, err + } + con.syncedLastBlock = blocks[len(blocks)-1] + con.logger.Info("syncer.Consensus synced", + "last-block", con.syncedLastBlock) + } + } + return con.syncedLastBlock != nil, nil +} + +// GetSyncedConsensus returns the core.Consensus instance after synced. +func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { + if con.syncedConsensus != nil { + return con.syncedConsensus, nil + } + if con.syncedLastBlock == nil { + return nil, ErrNotSynced + } + // flush all blocks in con.blocks into core.Consensus, and build + // core.Consensus from syncer. + confirmedBlocks := []*types.Block{} + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, bs := range con.blocks { + confirmedBlocks = append(confirmedBlocks, bs...) + } + }() + var err error + con.syncedConsensus, err = core.NewConsensusFromSyncer( + con.syncedLastBlock, + con.roundBeginTimes[con.syncedLastBlock.Position.Round], + con.app, + con.gov, + con.db, + con.network, + con.prv, + con.lattice, + confirmedBlocks, + con.randomnessResults, + con.logger) + return con.syncedConsensus, err +} + +// Stop the syncer. +// +// This method is mainly for caller to stop the syncer before synced, the syncer +// would call this method automatically after synced. +func (con *Consensus) Stop() error { + // Stop network and CRS routines, wait until they are all stoped. + con.ctxCancel() + con.moduleWaitGroup.Wait() + // Stop agreements. + con.stopAgreement() + return nil +} + +// isEmptyBlock checks if a block is an empty block by both its hash and parent +// hash are empty. +func (con *Consensus) isEmptyBlock(b *types.Block) bool { + return b.Hash == common.Hash{} && b.ParentHash == common.Hash{} +} + +// buildEmptyBlock builds an empty block in agreement. +func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { + cfg := con.configs[b.Position.Round] + b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval) + b.Witness.Height = parent.Witness.Height + b.Witness.Data = make([]byte, len(parent.Witness.Data)) + copy(b.Witness.Data, parent.Witness.Data) + b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash}) +} + +// setupConfigs is called by SyncBlocks with blocks from compaction chain. In +// the first time, setupConfigs setups from round 0. +func (con *Consensus) setupConfigs(blocks []*types.Block) { + // Find max round in blocks. + var maxRound uint64 + for _, b := range blocks { + if b.Position.Round > maxRound { + maxRound = b.Position.Round + } + } + // Get configs from governance. + // + // In fullnode, the notification of new round is yet another TX, which + // needs to be executed after corresponding block delivered. Thus, the + // configuration for 'maxRound + core.ConfigRoundShift' won't be ready when + // seeing this block. + untilRound := maxRound + core.ConfigRoundShift - 1 + curMaxNumChains := uint32(0) + func() { + con.lock.Lock() + defer con.lock.Unlock() + for r := uint64(len(con.configs)); r <= untilRound; r++ { + cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) + con.configs = append(con.configs, cfg) + con.roundBeginTimes = append( + con.roundBeginTimes, + con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) + if cfg.NumChains >= curMaxNumChains { + curMaxNumChains = cfg.NumChains + } + } + }() + con.resizeByNumChains(curMaxNumChains) + // Notify core.Lattice for new configs. + if con.lattice != nil { + for con.latticeLastRound+1 <= maxRound { + con.latticeLastRound++ + if err := con.lattice.AppendConfig( + con.latticeLastRound, + con.configs[con.latticeLastRound]); err != nil { + panic(err) + } + } + } +} + +// resizeByNumChains resizes fake lattice and agreement if numChains increases. +// Notice the decreasing case is neglected. +func (con *Consensus) resizeByNumChains(numChains uint32) { + con.lock.Lock() + defer con.lock.Unlock() + if numChains > uint32(len(con.blocks)) { + for i := uint32(len(con.blocks)); i < numChains; i++ { + // Resize the pool of blocks. + con.blocks = append(con.blocks, types.ByPosition{}) + // Resize agreement modules. + a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, + &con.agreementWaitGroup, con.logger) + con.agreements = append(con.agreements, a) + go a.run() + } + } +} + +// startAgreement starts agreements for receiving votes and agreements. +func (con *Consensus) startAgreement(numChains uint32) { + // Start a routine for listening receive channel and pull block channel. + go func() { + for { + select { + case b, ok := <-con.receiveChan: + if !ok { + return + } + chainID := b.Position.ChainID + func() { + con.lock.Lock() + defer con.lock.Unlock() + // If round is cut in agreements, do not add blocks with round less + // then cut round. + if b.Position.Round < con.agreementRoundCut { + return + } + con.blocks[chainID] = append(con.blocks[chainID], b) + sort.Sort(con.blocks[chainID]) + }() + case h, ok := <-con.pullChan: + if !ok { + return + } + con.network.PullBlocks(common.Hashes{h}) + } + } + }() +} + +// startNetwork starts network for receiving blocks and agreement results. +func (con *Consensus) startNetwork() { + go func() { + con.moduleWaitGroup.Add(1) + defer con.moduleWaitGroup.Done() + Loop: + for { + select { + case val := <-con.network.ReceiveChan(): + var pos types.Position + switch v := val.(type) { + case *types.Block: + pos = v.Position + case *types.AgreementResult: + pos = v.Position + case *types.BlockRandomnessResult: + func() { + con.lock.Lock() + defer con.lock.Unlock() + if v.Position.Round >= con.agreementRoundCut { + con.randomnessResults = append(con.randomnessResults, v) + } + }() + continue Loop + default: + continue Loop + } + func() { + con.lock.RLock() + defer con.lock.RUnlock() + if pos.ChainID >= uint32(len(con.agreements)) { + con.logger.Error("Unknown chainID message received (syncer)", + "position", &pos) + } + }() + con.agreements[pos.ChainID].inputChan <- val + case <-con.ctx.Done(): + return + } + } + }() +} + +// startCRSMonitor is the dummiest way to verify if the CRS for one round +// is ready or not. +func (con *Consensus) startCRSMonitor() { + var lastNotifiedRound uint64 + // Notify all agreements for new CRS. + notifyNewCRS := func(round uint64) { + if round == lastNotifiedRound { + return + } + con.logger.Info("CRS is ready", "round", round) + con.lock.RLock() + defer con.lock.RUnlock() + lastNotifiedRound = round + for _, a := range con.agreements { + a.inputChan <- round + } + } + go func() { + con.moduleWaitGroup.Add(1) + defer con.moduleWaitGroup.Done() + for { + select { + case <-con.ctx.Done(): + return + case <-time.After(1 * time.Second): + // Notify agreement modules for the latest round that CRS is + // available if the round is not notified yet. + var crsRound = lastNotifiedRound + for (con.gov.CRS(crsRound+1) != common.Hash{}) { + crsRound++ + } + notifyNewCRS(crsRound) + } + } + }() +} + +func (con *Consensus) stopAgreement() { + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, a := range con.agreements { + close(a.inputChan) + } + }() + con.agreementWaitGroup.Wait() + close(con.receiveChan) + close(con.pullChan) +} + +func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool { + return prev.K != cur.K || + prev.NumChains != cur.NumChains || + prev.PhiRatio != cur.PhiRatio +} -- cgit v1.2.3