diff options
author | haoping-ku <haoping.ku@dexon.org> | 2018-11-29 14:30:02 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-29 14:30:02 +0800 |
commit | daf3bab93c323b173345811adc9a334dad4a7094 (patch) | |
tree | 8a3957ec1f77262ce92c56f0384b0dedc307628c | |
parent | 8470ac070f097b261fddc42991a4d2e9ec998db6 (diff) | |
download | dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.gz dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.bz2 dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.lz dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.xz dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.zst dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.zip |
core: syncer: add syncer (#346)
-rw-r--r-- | core/consensus.go | 39 | ||||
-rw-r--r-- | core/lattice.go | 30 | ||||
-rw-r--r-- | core/syncer/agreement.go | 177 | ||||
-rw-r--r-- | core/syncer/consensus.go | 622 | ||||
-rw-r--r-- | core/test/app.go | 6 | ||||
-rw-r--r-- | core/test/fake-transport.go | 17 | ||||
-rw-r--r-- | core/test/interface.go | 17 | ||||
-rw-r--r-- | core/test/network.go | 5 | ||||
-rw-r--r-- | core/test/revealer.go | 83 | ||||
-rw-r--r-- | core/test/revealer_test.go | 60 | ||||
-rw-r--r-- | core/test/stopper.go | 4 | ||||
-rw-r--r-- | core/total-ordering-syncer_test.go | 2 | ||||
-rw-r--r-- | core/total-ordering.go | 17 | ||||
-rw-r--r-- | core/total-ordering_test.go | 28 | ||||
-rw-r--r-- | core/utils.go | 45 | ||||
-rw-r--r-- | integration_test/consensus_test.go | 213 | ||||
-rw-r--r-- | integration_test/stats.go | 19 | ||||
-rw-r--r-- | integration_test/stats_test.go | 17 | ||||
-rw-r--r-- | integration_test/utils.go | 17 |
19 files changed, 1291 insertions, 127 deletions
diff --git a/core/consensus.go b/core/consensus.go index af40417..0d1955d 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -639,9 +639,9 @@ func (con *Consensus) initialRound( // Stop the Consensus core. func (con *Consensus) Stop() { + con.ctxCancel() con.baMgr.stop() con.event.Reset() - con.ctxCancel() } func (con *Consensus) processMsg(msgChan <-chan interface{}) { @@ -761,44 +761,9 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { func (con *Consensus) ProcessAgreementResult( rand *types.AgreementResult) error { // Sanity Check. - notarySet, err := con.nodeSetCache.GetNotarySet( - rand.Position.Round, rand.Position.ChainID) - if err != nil { + if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { return err } - if len(rand.Votes) < len(notarySet)/3*2+1 { - return ErrNotEnoughVotes - } - if len(rand.Votes) > len(notarySet) { - return ErrIncorrectVoteProposer - } - for _, vote := range rand.Votes { - if rand.IsEmptyBlock { - if (vote.BlockHash != common.Hash{}) { - return ErrIncorrectVoteBlockHash - } - } else { - if vote.BlockHash != rand.BlockHash { - return ErrIncorrectVoteBlockHash - } - } - if vote.Type != types.VoteCom { - return ErrIncorrectVoteType - } - if vote.Position != rand.Position { - return ErrIncorrectVotePosition - } - if _, exist := notarySet[vote.ProposerID]; !exist { - return ErrIncorrectVoteProposer - } - ok, err := verifyVoteSignature(&vote) - if err != nil { - return err - } - if !ok { - return ErrIncorrectVoteSignature - } - } // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { return err diff --git a/core/lattice.go b/core/lattice.go index f94ca5a..e578e3f 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -39,7 +39,6 @@ type Lattice struct { app Application debug Debug pool blockPool - retryAdd bool data *latticeData toModule *totalOrdering ctModule *consensusTimestamp @@ -64,7 +63,7 @@ func NewLattice( debug: debug, pool: newBlockPool(cfg.NumChains), data: newLatticeData(db, dMoment, round, cfg), - toModule: newTotalOrdering(dMoment, cfg), + toModule: newTotalOrdering(dMoment, round, cfg), ctModule: newConsensusTimestamp(dMoment, round, cfg.NumChains), logger: logger, } @@ -306,4 +305,31 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { // ProcessFinalizedBlock is used for syncing lattice data. func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { + l.lock.Lock() + defer l.lock.Unlock() + // Syncing state for core.latticeData module. + if err := l.data.addFinalizedBlock(b); err != nil { + panic(err) + } + l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) + // Syncing state for core.totalOrdering module. + toDelivered, deliveredMode, err := l.toModule.processBlock(b) + if err != nil { + panic(err) + } + if len(toDelivered) == 0 { + return + } + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } + if l.debug != nil { + l.debug.TotalOrderingDelivered(hashes, deliveredMode) + } + // Sync core.consensusTimestamp module. + if err = l.ctModule.processBlocks(toDelivered); err != nil { + panic(err) + } + return } diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go new file mode 100644 index 0000000..89b8c8d --- /dev/null +++ b/core/syncer/agreement.go @@ -0,0 +1,177 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "sync" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Struct agreement implements struct of BA (Byzantine Agreement) protocol +// needed in syncer, which only receives agreement results. +type agreement struct { + wg *sync.WaitGroup + cache *utils.NodeSetCache + inputChan chan interface{} + outputChan chan<- *types.Block + pullChan chan<- common.Hash + blocks map[types.Position]map[common.Hash]*types.Block + agreementResults map[common.Hash]struct{} + latestCRSRound uint64 + pendings map[uint64]map[common.Hash]*types.AgreementResult + logger common.Logger + confirmedBlocks map[common.Hash]struct{} +} + +// newAgreement creates a new agreement instance. +func newAgreement( + ch chan<- *types.Block, + pullChan chan<- common.Hash, + cache *utils.NodeSetCache, + wg *sync.WaitGroup, + logger common.Logger) *agreement { + + return &agreement{ + cache: cache, + wg: wg, + inputChan: make(chan interface{}, 1000), + outputChan: ch, + pullChan: pullChan, + blocks: make(map[types.Position]map[common.Hash]*types.Block), + agreementResults: make(map[common.Hash]struct{}), + logger: logger, + pendings: make( + map[uint64]map[common.Hash]*types.AgreementResult), + confirmedBlocks: make(map[common.Hash]struct{}), + } +} + +// run starts the agreement, this does not start a new routine, go a new +// routine explicitly in the caller. +func (a *agreement) run() { + a.wg.Add(1) + defer a.wg.Done() + for { + select { + case val, ok := <-a.inputChan: + if !ok { + // InputChan is closed by network when network ends. + return + } + switch v := val.(type) { + case *types.Block: + a.processBlock(v) + case *types.AgreementResult: + a.processAgreementResult(v) + case uint64: + a.processNewCRS(v) + } + } + } +} + +func (a *agreement) processBlock(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; exist { + return + } + if _, exist := a.agreementResults[b.Hash]; exist { + a.confirm(b) + } else { + if _, exist := a.blocks[b.Position]; !exist { + a.blocks[b.Position] = make(map[common.Hash]*types.Block) + } + a.blocks[b.Position][b.Hash] = b + } +} + +func (a *agreement) processAgreementResult(r *types.AgreementResult) { + // Cache those results that CRS is not ready yet. + if r.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendings[r.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.AgreementResult) + a.pendings[r.Position.Round] = pendingsForRound + } + pendingsForRound[r.BlockHash] = r + a.logger.Info("agreement result cached", "result", r) + return + } + if err := core.VerifyAgreementResult(r, a.cache); err != nil { + a.logger.Error("agreement result verification failed", + "result", r, + "error", err) + return + } + if r.IsEmptyBlock { + // Empty block is also confirmed. + b := &types.Block{ + Position: r.Position, + } + a.confirm(b) + } else { + needPull := true + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + a.confirm(b) + needPull = false + } + } + if needPull { + a.agreementResults[r.BlockHash] = struct{}{} + a.pullChan <- r.BlockHash + } + } +} + +func (a *agreement) processNewCRS(round uint64) { + if round <= a.latestCRSRound { + return + } + // Verify all pending results. + for r := a.latestCRSRound + 1; r <= round; r++ { + pendingsForRound := a.pendings[r] + if pendingsForRound == nil { + continue + } + delete(a.pendings, r) + for _, res := range pendingsForRound { + if err := core.VerifyAgreementResult(res, a.cache); err != nil { + a.logger.Error("invalid agreement result", "result", res) + continue + } + a.logger.Error("flush agreement result", "result", res) + a.processAgreementResult(res) + break + } + } + a.latestCRSRound = round +} + +// confirm notifies consensus the confirmation of a block in BA. +func (a *agreement) confirm(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; !exist { + delete(a.blocks, b.Position) + delete(a.agreementResults, b.Hash) + a.outputChan <- b + a.confirmedBlocks[b.Hash] = struct{}{} + } +} diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go new file mode 100644 index 0000000..d84f168 --- /dev/null +++ b/core/syncer/consensus.go @@ -0,0 +1,622 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/blockdb" + "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +var ( + // ErrAlreadySynced is reported when syncer is synced. + ErrAlreadySynced = fmt.Errorf("already synced") + // ErrGenesisBlockReached is reported when genesis block reached. + ErrGenesisBlockReached = fmt.Errorf("genesis block reached") + // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. + ErrInvalidBlockOrder = fmt.Errorf("invalid block order") +) + +// Consensus is for syncing consensus module. +type Consensus struct { + db blockdb.BlockDatabase + gov core.Governance + dMoment time.Time + logger common.Logger + app core.Application + prv crypto.PrivateKey + network core.Network + nodeSetCache *utils.NodeSetCache + + lattice *core.Lattice + latticeLastRound uint64 + blocks []types.ByPosition + agreements []*agreement + configs []*types.Config + roundBeginTimes []time.Time + agreementRoundCut uint64 + + // lock for accessing all fields. + lock sync.RWMutex + moduleWaitGroup sync.WaitGroup + agreementWaitGroup sync.WaitGroup + pullChan chan common.Hash + receiveChan chan *types.Block + ctx context.Context + ctxCancel context.CancelFunc + isSynced bool +} + +// NewConsensus creates an instance for Consensus (syncer consensus). +func NewConsensus( + dMoment time.Time, + app core.Application, + gov core.Governance, + db blockdb.BlockDatabase, + network core.Network, + prv crypto.PrivateKey, + logger common.Logger) *Consensus { + + con := &Consensus{ + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + prv: prv, + logger: logger, + isSynced: false, + configs: []*types.Config{gov.Configuration(0)}, + roundBeginTimes: []time.Time{dMoment}, + receiveChan: make(chan *types.Block, 1000), + pullChan: make(chan common.Hash, 1000), + } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + return con +} + +func (con *Consensus) initConsensusObj(initBlock *types.Block) { + var cfg *types.Config + func() { + con.lock.Lock() + defer con.lock.Unlock() + con.latticeLastRound = initBlock.Position.Round + cfg = con.configs[con.latticeLastRound] + debugApp, _ := con.app.(core.Debug) + con.lattice = core.NewLattice( + con.roundBeginTimes[con.latticeLastRound], + con.latticeLastRound, + cfg, + core.NewAuthenticator(con.prv), + con.app, + debugApp, + con.db, + con.logger, + ) + }() + con.startAgreement(cfg.NumChains) + con.startNetwork() + con.startCRSMonitor() +} + +func (con *Consensus) checkIfSynced(blocks []*types.Block) { + var ( + numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + compactionTips = make([]*types.Block, numChains) + overlapCount = uint32(0) + ) + // Find tips (newset blocks) of each chain in compaction chain. + b := blocks[len(blocks)-1] + for tipCount := uint32(0); tipCount < numChains; { + if compactionTips[b.Position.ChainID] == nil { + // Check chainID for config change. + if b.Position.ChainID < numChains { + compactionTips[b.Position.ChainID] = b + tipCount++ + } + } + if (b.Finalization.ParentHash == common.Hash{}) { + return + } + b1, err := con.db.Get(b.Finalization.ParentHash) + if err != nil { + panic(err) + } + b = &b1 + } + // Check if chain tips of compaction chain and current cached confirmed + // blocks are overlapped on each chain, numChains is decided by the round + // of last block we seen on compaction chain. + con.lock.RLock() + defer con.lock.RUnlock() + for chainID, b := range compactionTips { + if len(con.blocks[chainID]) > 0 { + if !b.Position.Older(&con.blocks[chainID][0].Position) { + overlapCount++ + } + } + } + if overlapCount == numChains { + con.isSynced = true + } else { + con.logger.Info("not overlap yet", + "overlap-count", overlapCount, + "num-chain", numChains, + "last-block", blocks[len(blocks)-1]) + } +} + +// ensureAgreementOverlapRound ensures the oldest blocks in each chain in +// con.blocks are all in the same round, for avoiding config change while +// syncing. +func (con *Consensus) ensureAgreementOverlapRound() bool { + if con.agreementRoundCut > 0 { + return true + } + con.lock.Lock() + defer con.lock.Unlock() + // Clean empty blocks on tips of chains. + for idx, bs := range con.blocks { + for len(bs) > 0 && con.isEmptyBlock(bs[0]) { + bs = bs[1:] + } + con.blocks[idx] = bs + } + // Build empty blocks. + for _, bs := range con.blocks { + for i := range bs { + if con.isEmptyBlock(bs[i]) { + if bs[i-1].Position.Height == bs[i].Position.Height-1 { + con.buildEmptyBlock(bs[i], bs[i-1]) + } + } + } + } + var tipRoundMap map[uint64]uint32 + for { + tipRoundMap = make(map[uint64]uint32) + for _, bs := range con.blocks { + if len(bs) > 0 { + tipRoundMap[bs[0].Position.Round]++ + } + } + if len(tipRoundMap) <= 1 { + break + } + // Make all tips in same round. + var maxRound uint64 + for r := range tipRoundMap { + if r > maxRound { + maxRound = r + } + } + for idx, bs := range con.blocks { + for len(bs) > 0 && bs[0].Position.Round < maxRound { + bs = bs[1:] + } + con.blocks[idx] = bs + } + } + if len(tipRoundMap) == 1 { + var r uint64 + for r = range tipRoundMap { + break + } + if tipRoundMap[r] == con.configs[r].NumChains { + con.agreementRoundCut = r + con.logger.Info("agreement round cut found, round", r) + return true + } + } + return false +} + +func (con *Consensus) findLatticeSyncBlock( + blocks []*types.Block) (*types.Block, error) { + var lastBlock = blocks[len(blocks)-1] + for { + // Find round r which r-1, r, r+1 are all in same total ordering config. + round := lastBlock.Position.Round + for { + sameAsPrevRound := round == 0 || !con.isConfigChanged( + con.configs[round-1], con.configs[round]) + sameAsNextRound := !con.isConfigChanged( + con.configs[round], con.configs[round+1]) + if sameAsPrevRound && sameAsNextRound { + break + } + if round == 0 { + // Unable to find a safe round, wait for new rounds. + return nil, nil + } + round-- + } + // Find the oldest block which round is "round". + for lastBlock.Position.Round != round { + if (lastBlock.Finalization.ParentHash == common.Hash{}) { + return nil, ErrGenesisBlockReached + } + b, err := con.db.Get(lastBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + lastBlock = &b + } + // Find the deliver set by hash for two times. Blocks in a deliver set + // returned by total ordering is sorted by hash. If a block's parent hash + // is greater than its hash means there is a cut between deliver sets. + var curBlock, prevBlock *types.Block + var deliverSetFirstBlock, deliverSetLastBlock *types.Block + curBlock = lastBlock + for { + if (curBlock.Finalization.ParentHash == common.Hash{}) { + return nil, ErrGenesisBlockReached + } + b, err := con.db.Get(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + prevBlock = &b + if !prevBlock.Hash.Less(curBlock.Hash) { + break + } + curBlock = prevBlock + } + deliverSetLastBlock = prevBlock + curBlock = prevBlock + for { + if (curBlock.Finalization.ParentHash == common.Hash{}) { + break + } + b, err := con.db.Get(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + prevBlock = &b + if !prevBlock.Hash.Less(curBlock.Hash) { + break + } + curBlock = prevBlock + } + deliverSetFirstBlock = curBlock + // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock + // are in the same round. + ok := true + curBlock = deliverSetLastBlock + for { + if curBlock.Position.Round != round { + ok = false + break + } + b, err := con.db.Get(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + curBlock = &b + if curBlock.Hash == deliverSetFirstBlock.Hash { + break + } + } + if ok { + return deliverSetFirstBlock, nil + } + if round == 0 { + return nil, nil + } + round-- + } +} + +// SyncBlocks syncs blocks from compaction chain, latest is true if the caller +// regards the blocks are the latest ones. Notice that latest can be true for +// many times. +// NOTICE: parameter "blocks" should be consecutive in compaction height. +func (con *Consensus) SyncBlocks( + blocks []*types.Block, latest bool) (*core.Consensus, error) { + if con.isSynced { + return nil, ErrAlreadySynced + } + if len(blocks) == 0 { + return nil, nil + } + // Check if blocks are consecutive. + for i := 1; i < len(blocks); i++ { + if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 { + return nil, ErrInvalidBlockOrder + } + } + con.logger.Info("syncBlocks", + "position", &blocks[0].Position, + "final height", blocks[0].Finalization.Height, + "len", len(blocks), + "latest", latest, + ) + con.setupConfigs(blocks) + for _, b := range blocks { + // TODO(haoping) remove this if lattice puts blocks into db. + if err := con.db.Put(*b); err != nil { + if err != blockdb.ErrBlockExists { + return nil, err + } + } + if con.lattice != nil { + con.lattice.ProcessFinalizedBlock(b) + } + } + if latest && con.lattice == nil { + // New Lattice and find the deliver set of total ordering when "latest" is + // true for first time. Deliver set is found by block hashes. + syncBlock, err := con.findLatticeSyncBlock(blocks) + if err != nil { + return nil, err + } + if syncBlock != nil { + con.logger.Info("deliver set found", syncBlock) + // New lattice with the round of syncBlock. + con.initConsensusObj(syncBlock) + // Process blocks from syncBlock to blocks' last block. + b := blocks[len(blocks)-1] + blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1 + blocksToProcess := make([]*types.Block, blocksCount) + for { + blocksToProcess[blocksCount-1] = b + blocksCount-- + if b.Hash == syncBlock.Hash { + break + } + b1, err := con.db.Get(b.Finalization.ParentHash) + if err != nil { + return nil, err + } + b = &b1 + } + for _, b := range blocksToProcess { + con.lattice.ProcessFinalizedBlock(b) + } + } + } + if latest && con.ensureAgreementOverlapRound() { + // Check if compaction and agreements' blocks are overlapped. The + // overlapping of compaction chain and BA's oldest blocks means the + // syncing is done. + con.checkIfSynced(blocks) + } + if con.isSynced { + // Stop network and CRS routines, wait until they are all stoped. + con.ctxCancel() + con.moduleWaitGroup.Wait() + // Stop agreements. + con.stopAgreement() + // TODO: flush all blocks in con.blocks into core.Consensus, and build + // core.Consensus from syncer. + con.logger.Info("syncer.Consensus synced") + return &core.Consensus{}, nil + } + return nil, nil +} + +// isEmptyBlock checks if a block is an empty block by both its hash and parent +// hash are empty. +func (con *Consensus) isEmptyBlock(b *types.Block) bool { + return b.Hash == common.Hash{} && b.ParentHash == common.Hash{} +} + +// buildEmptyBlock builds an empty block in agreement. +func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { + cfg := con.configs[b.Position.Round] + b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval) + b.Witness.Height = parent.Witness.Height + b.Witness.Data = make([]byte, len(parent.Witness.Data)) + copy(b.Witness.Data, parent.Witness.Data) + b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash}) +} + +// setupConfigs is called by SyncBlocks with blocks from compaction chain. In +// the first time, setupConfigs setups from round 0. +func (con *Consensus) setupConfigs(blocks []*types.Block) { + // Find max round in blocks. + var maxRound uint64 + for _, b := range blocks { + if b.Position.Round > maxRound { + maxRound = b.Position.Round + } + } + // Get configs from governance. + untilRound := maxRound + core.ConfigRoundShift + curMaxNumChains := uint32(0) + func() { + con.lock.Lock() + defer con.lock.Unlock() + for r := uint64(len(con.configs)); r <= untilRound; r++ { + cfg := con.gov.Configuration(r) + if cfg == nil { + panic(fmt.Errorf( + "unable to get config for round: %v (syncer)", r)) + } + con.configs = append(con.configs, cfg) + con.roundBeginTimes = append( + con.roundBeginTimes, + con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) + if cfg.NumChains >= curMaxNumChains { + curMaxNumChains = cfg.NumChains + } + } + }() + con.resizeByNumChains(curMaxNumChains) + // Notify core.Lattice for new configs. + if con.lattice != nil { + for con.latticeLastRound+1 < uint64(len(con.configs)) { + con.latticeLastRound++ + if err := con.lattice.AppendConfig( + con.latticeLastRound, + con.configs[con.latticeLastRound]); err != nil { + panic(err) + } + } + } +} + +// resizeByNumChains resizes fake lattice and agreement if numChains increases. +// Notice the decreasing case is neglected. +func (con *Consensus) resizeByNumChains(numChains uint32) { + con.lock.Lock() + defer con.lock.Unlock() + if numChains > uint32(len(con.blocks)) { + for i := uint32(len(con.blocks)); i < numChains; i++ { + // Resize the pool of blocks. + con.blocks = append(con.blocks, types.ByPosition{}) + // Resize agreement modules. + a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, + &con.agreementWaitGroup, con.logger) + con.agreements = append(con.agreements, a) + go a.run() + } + } +} + +// startAgreement starts agreements for receiving votes and agreements. +func (con *Consensus) startAgreement(numChains uint32) { + // Start a routine for listening receive channel and pull block channel. + go func() { + for { + select { + case b, ok := <-con.receiveChan: + if !ok { + return + } + chainID := b.Position.ChainID + func() { + con.lock.Lock() + defer con.lock.Unlock() + // If round is cut in agreements, do not add blocks with round less + // then cut round. + if b.Position.Round < con.agreementRoundCut { + return + } + con.blocks[chainID] = append(con.blocks[chainID], b) + sort.Sort(con.blocks[chainID]) + }() + case h, ok := <-con.pullChan: + if !ok { + return + } + con.network.PullBlocks(common.Hashes{h}) + } + } + }() +} + +// startNetwork starts network for receiving blocks and agreement results. +func (con *Consensus) startNetwork() { + go func() { + con.moduleWaitGroup.Add(1) + defer con.moduleWaitGroup.Done() + Loop: + for { + select { + case val := <-con.network.ReceiveChan(): + var pos types.Position + switch v := val.(type) { + case *types.Block: + pos = v.Position + case *types.AgreementResult: + pos = v.Position + default: + continue Loop + } + func() { + con.lock.RLock() + defer con.lock.RUnlock() + if pos.ChainID >= uint32(len(con.agreements)) { + con.logger.Error("Unknown chainID message received (syncer)", + "position", &pos) + } + }() + con.agreements[pos.ChainID].inputChan <- val + case <-con.ctx.Done(): + return + } + } + }() +} + +// startCRSMonitor is the dummiest way to verify if the CRS for one round +// is ready or not. +func (con *Consensus) startCRSMonitor() { + var lastNotifiedRound uint64 + // Notify all agreements for new CRS. + notifyNewCRS := func(round uint64) { + if round == lastNotifiedRound { + return + } + con.logger.Info("CRS is ready", "round", round) + con.lock.RLock() + defer con.lock.RUnlock() + lastNotifiedRound = round + for _, a := range con.agreements { + a.inputChan <- round + } + } + go func() { + con.moduleWaitGroup.Add(1) + defer con.moduleWaitGroup.Done() + for { + select { + case <-con.ctx.Done(): + return + case <-time.After(1 * time.Second): + // Notify agreement modules for the latest round that CRS is + // available if the round is not notified yet. + var crsRound = lastNotifiedRound + for (con.gov.CRS(crsRound+1) != common.Hash{}) { + crsRound++ + } + notifyNewCRS(crsRound) + } + } + }() +} + +func (con *Consensus) stopAgreement() { + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, a := range con.agreements { + close(a.inputChan) + } + }() + con.agreementWaitGroup.Wait() + close(con.receiveChan) + close(con.pullChan) +} + +func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool { + return prev.K != cur.K || + prev.NumChains != cur.NumChains || + prev.PhiRatio != cur.PhiRatio +} diff --git a/core/test/app.go b/core/test/app.go index 327555b..9f030e9 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -280,8 +280,8 @@ Loop: return nil } -// Check provides a backdoor to check status of App with reader lock. -func (app *App) Check(checker func(*App)) { +// WithLock provides a backdoor to check status of App with reader lock. +func (app *App) WithLock(function func(*App)) { app.confirmedLock.RLock() defer app.confirmedLock.RUnlock() app.totalOrderedLock.RLock() @@ -289,5 +289,5 @@ func (app *App) Check(checker func(*App)) { app.deliveredLock.RLock() defer app.deliveredLock.RUnlock() - checker(app) + function(app) } diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index a783ac9..388d0aa 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -15,23 +15,6 @@ // along with the dexon-consensus library. If not, see // <http://www.gnu.org/licenses/>. -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it and/or -// modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// <http://www.gnu.org/licenses/>. - package test import ( diff --git a/core/test/interface.go b/core/test/interface.go index e7a8f70..dcecee8 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -15,23 +15,6 @@ // along with the dexon-consensus library. If not, see // <http://www.gnu.org/licenses/>. -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// <http://www.gnu.org/licenses/>. - package test import ( diff --git a/core/test/network.go b/core/test/network.go index 8bcb050..207d8a4 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -437,6 +437,11 @@ func (n *Network) Broadcast(msg interface{}) { } } +// Send exports 'Send' method of Transport. +func (n *Network) Send(nodeID types.NodeID, msg interface{}) error { + return n.trans.Send(nodeID, msg) +} + // ReceiveChanForNode returns a channel for messages not handled by // core.Consensus. func (n *Network) ReceiveChanForNode() <-chan interface{} { diff --git a/core/test/revealer.go b/core/test/revealer.go index b39a898..dcd75a5 100644 --- a/core/test/revealer.go +++ b/core/test/revealer.go @@ -15,26 +15,10 @@ // along with the dexon-consensus library. If not, see // <http://www.gnu.org/licenses/>. -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it and/or -// modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// <http://www.gnu.org/licenses/>. - package test import ( + "errors" "math/rand" "sort" "time" @@ -44,6 +28,11 @@ import ( "github.com/dexon-foundation/dexon-consensus/core/types" ) +// Errors returns from revealer. +var ( + ErrNotValidCompactionChain = errors.New("not valid compaction chain") +) + // isAllAckingBlockRevealed is a helper to check if all acking blocks of // one block are revealed. func isAllAckingBlockRevealed( @@ -280,3 +269,63 @@ func (r *RandomTipRevealer) Reset() { r.chainTip[i] = 0 } } + +// CompactionChainRevealer implements Revealer interface, which would load +// all blocks from blockdb, reveal them in the order of compaction chain, from +// the genesis block to the latest one. +type CompactionChainRevealer struct { + blocks types.ByFinalizationHeight + nextRevealIndex int +} + +// NewCompactionChainRevealer constructs a revealer in the order of compaction +// chain. +func NewCompactionChainRevealer(iter blockdb.BlockIterator, + startHeight uint64) (r *CompactionChainRevealer, err error) { + blocksByHash, err := loadAllBlocks(iter) + if err != nil { + return + } + if startHeight == 0 { + startHeight = 1 + } + blocks := types.ByFinalizationHeight{} + for _, b := range blocksByHash { + if b.Finalization.Height < startHeight { + continue + } + blocks = append(blocks, b) + } + sort.Sort(types.ByFinalizationHeight(blocks)) + // Make sure the finalization height of blocks are incremental with step 1. + for idx, b := range blocks { + if idx == 0 { + continue + } + if b.Finalization.Height != blocks[idx-1].Finalization.Height+1 { + err = ErrNotValidCompactionChain + return + } + } + r = &CompactionChainRevealer{ + blocks: blocks, + } + r.Reset() + return +} + +// Next implements Revealer.Next method, which would reveal blocks in the order +// of compaction chain. +func (r *CompactionChainRevealer) Next() (types.Block, error) { + if r.nextRevealIndex == len(r.blocks) { + return types.Block{}, blockdb.ErrIterationFinished + } + b := r.blocks[r.nextRevealIndex] + r.nextRevealIndex++ + return *b, nil +} + +// Reset implement Revealer.Reset method, which would reset revealing. +func (r *CompactionChainRevealer) Reset() { + r.nextRevealIndex = 0 +} diff --git a/core/test/revealer_test.go b/core/test/revealer_test.go index d9920c7..5a1bc07 100644 --- a/core/test/revealer_test.go +++ b/core/test/revealer_test.go @@ -157,6 +157,66 @@ func (s *RevealerTestSuite) TestRandomTipReveal() { s.baseTest(revealer, 10, checkFunc) } +func (s *RevealerTestSuite) TestCompactionChainReveal() { + db, err := blockdb.NewMemBackedBlockDB() + s.Require().NoError(err) + // Put several blocks with finalization field ready. + b1 := &types.Block{ + Hash: common.NewRandomHash(), + Finalization: types.FinalizationResult{ + Height: 1, + }} + b2 := &types.Block{ + Hash: common.NewRandomHash(), + Finalization: types.FinalizationResult{ + ParentHash: b1.Hash, + Height: 2, + }} + b3 := &types.Block{ + Hash: common.NewRandomHash(), + Finalization: types.FinalizationResult{ + ParentHash: b2.Hash, + Height: 3, + }} + s.Require().NoError(db.Put(*b1)) + s.Require().NoError(db.Put(*b3)) + iter, err := db.GetAll() + s.Require().NoError(err) + // The compaction chain is not complete, we can't construct a revealer + // instance successfully. + r, err := NewCompactionChainRevealer(iter, 0) + s.Require().Nil(r) + s.Require().IsType(ErrNotValidCompactionChain, err) + // Put a block to make the compaction chain complete. + s.Require().NoError(db.Put(*b2)) + // We can construct that revealer now. + iter, err = db.GetAll() + s.Require().NoError(err) + r, err = NewCompactionChainRevealer(iter, 0) + s.Require().NotNil(r) + s.Require().NoError(err) + // The revealing order should be ok. + chk := func(h uint64) { + b, err := r.Next() + s.Require().NoError(err) + s.Require().Equal(b.Finalization.Height, h) + } + chk(1) + chk(2) + chk(3) + // Iteration should be finished + _, err = r.Next() + s.Require().IsType(blockdb.ErrIterationFinished, err) + // Test 'startHeight' parameter. + iter, err = db.GetAll() + s.Require().NoError(err) + r, err = NewCompactionChainRevealer(iter, 2) + s.Require().NotNil(r) + s.Require().NoError(err) + chk(2) + chk(3) +} + func TestRevealer(t *testing.T) { suite.Run(t, new(RevealerTestSuite)) } diff --git a/core/test/stopper.go b/core/test/stopper.go index abc93c3..40868d8 100644 --- a/core/test/stopper.go +++ b/core/test/stopper.go @@ -61,7 +61,7 @@ func (s *StopByConfirmedBlocks) ShouldStop(nID types.NodeID) bool { lastChecked := s.lastCheckDelivered[nID] currentConfirmedBlocks := s.confirmedBlocks[nID] db := s.dbs[nID] - s.apps[nID].Check(func(app *App) { + s.apps[nID].WithLock(func(app *App) { for _, h := range app.DeliverSequence[lastChecked:] { b, err := db.Get(h) if err != nil { @@ -116,7 +116,7 @@ func (s *StopByRound) ShouldStop(nID types.NodeID) bool { if curRound := s.currentRounds[nID]; curRound < s.untilRound { lastChecked := s.lastCheckDelivered[nID] db := s.dbs[nID] - s.apps[nID].Check(func(app *App) { + s.apps[nID].WithLock(func(app *App) { for _, h := range app.DeliverSequence[lastChecked:] { b, err := db.Get(h) if err != nil { diff --git a/core/total-ordering-syncer_test.go b/core/total-ordering-syncer_test.go index 5f9fd0f..92b5dac 100644 --- a/core/total-ordering-syncer_test.go +++ b/core/total-ordering-syncer_test.go @@ -44,7 +44,7 @@ func (s *TotalOrderingSyncerTestSuite) genDeliverySet(numChains uint32) ( RoundInterval: 1000 * time.Second, } - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) gen := test.NewBlocksGenerator(&test.BlocksGeneratorConfig{ NumChains: numChains, diff --git a/core/total-ordering.go b/core/total-ordering.go index 90848ce..52f9270 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -44,9 +44,9 @@ const ( ) var ( - // ErrNotValidDAG would be reported when block subbmitted to totalOrdering + // ErrInvalidDAG is reported when block subbmitted to totalOrdering // didn't form a DAG. - ErrNotValidDAG = errors.New("not a valid dag") + ErrInvalidDAG = errors.New("invalid dag") // ErrFutureRoundDelivered means some blocks from later rounds are // delivered, this means program error. ErrFutureRoundDelivered = errors.New("future round delivered") @@ -347,7 +347,7 @@ func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) error { rec.count = 1 } else { if b.Position.Height <= rec.minHeight { - return ErrNotValidDAG + return ErrInvalidDAG } rec.count++ } @@ -640,12 +640,12 @@ func (global *totalOrderingGlobalVector) addBlock( if tip != nil { // Perform light weight sanity check based on tip. if tip.Position.Round > b.Position.Round { - err = ErrNotValidDAG + err = ErrInvalidDAG return } if DiffUint64(tip.Position.Round, b.Position.Round) > 1 { if b.Position.Height != 0 { - err = ErrNotValidDAG + err = ErrInvalidDAG return } // Add breakpoint. @@ -657,7 +657,7 @@ func (global *totalOrderingGlobalVector) addBlock( }) } else { if b.Position.Height != tip.Position.Height+1 { - err = ErrNotValidDAG + err = ErrInvalidDAG return } } @@ -792,9 +792,9 @@ type totalOrdering struct { } // newTotalOrdering constructs an totalOrdering instance. -func newTotalOrdering(dMoment time.Time, cfg *types.Config) *totalOrdering { +func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering { config := &totalOrderingConfig{} - config.fromConfig(0, cfg) + config.fromConfig(round, cfg) config.setRoundBeginTime(dMoment) candidates := make([]*totalOrderingCandidateInfo, config.numChains) to := &totalOrdering{ @@ -816,6 +816,7 @@ func newTotalOrdering(dMoment time.Time, cfg *types.Config) *totalOrdering { // round R, next time you can only add the config for round R+1. func (to *totalOrdering) appendConfig( round uint64, config *types.Config) error { + if round != uint64(len(to.configs))+to.configs[0].roundID { return ErrRoundNotIncreasing } diff --git a/core/total-ordering_test.go b/core/total-ordering_test.go index ac8956a..77bf21a 100644 --- a/core/total-ordering_test.go +++ b/core/total-ordering_test.go @@ -157,7 +157,7 @@ func (s *TotalOrderingTestSuite) TestBlockRelation() { NumChains: uint32(len(nodes)), } genesisTime := time.Now().UTC() - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) s.checkNotDeliver(to, blockA) s.checkNotDeliver(to, blockB) s.checkNotDeliver(to, blockC) @@ -345,7 +345,7 @@ func (s *TotalOrderingTestSuite) TestCycleDetection() { NumChains: uint32(len(nodes)), } genesisTime := time.Now().UTC() - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) s.checkNotDeliver(to, b00) s.checkNotDeliver(to, b01) s.checkNotDeliver(to, b02) @@ -375,7 +375,7 @@ func (s *TotalOrderingTestSuite) TestEarlyDeliver() { NumChains: uint32(len(nodes)), } genesisTime := time.Now().UTC() - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) genNextBlock := func(b *types.Block) *types.Block { return &types.Block{ ProposerID: b.ProposerID, @@ -487,7 +487,7 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() { NumChains: uint32(len(nodes)), } genesisTime := time.Now().UTC() - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) // Setup blocks. b00 := s.genGenesisBlock(nodes, 0, common.Hashes{}) b10 := s.genGenesisBlock(nodes, 1, common.Hashes{}) @@ -830,7 +830,7 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK0() { } req = s.Require() genesisTime = time.Now().UTC() - to = newTotalOrdering(genesisTime, genesisConfig) + to = newTotalOrdering(genesisTime, 0, genesisConfig) ) // Setup blocks. b00 := s.genGenesisBlock(nodes, 0, common.Hashes{}) @@ -992,7 +992,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() { PhiRatio: phi, NumChains: numChains, } - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) // Add config for next round. s.Require().NoError(to.appendConfig(1, &types.Config{ K: 0, @@ -1010,7 +1010,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() { PhiRatio: phi, NumChains: numChains, } - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) // Add config for next round. s.Require().NoError(to.appendConfig(1, &types.Config{ K: 1, @@ -1028,7 +1028,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() { PhiRatio: phi, NumChains: numChains, } - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) s.Require().NoError(to.appendConfig(1, &types.Config{ K: 2, PhiRatio: 0.5, @@ -1045,7 +1045,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() { PhiRatio: phi, NumChains: numChains, } - to := newTotalOrdering(genesisTime, genesisConfig) + to := newTotalOrdering(genesisTime, 0, genesisConfig) s.Require().NoError(to.appendConfig(1, &types.Config{ K: 3, PhiRatio: 0.5, @@ -1085,7 +1085,7 @@ func (s *TotalOrderingTestSuite) baseTestForRoundChange( revealingSequence := make(map[string]struct{}) orderingSequence := make(map[string]struct{}) for i := 0; i < repeat; i++ { - to := newTotalOrdering(genesisTime, configs[0]) + to := newTotalOrdering(genesisTime, 0, configs[0]) for roundID, config := range configs[1:] { req.NoError(to.appendConfig(uint64(roundID+1), config)) } @@ -1212,7 +1212,7 @@ func (s *TotalOrderingTestSuite) TestSync() { PhiRatio: 0.67, NumChains: numChains, } - to1 := newTotalOrdering(genesisTime, genesisConfig) + to1 := newTotalOrdering(genesisTime, 0, genesisConfig) s.Require().NoError(to1.appendConfig(1, &types.Config{ K: 0, PhiRatio: 0.5, @@ -1236,7 +1236,7 @@ func (s *TotalOrderingTestSuite) TestSync() { } // Run new total ordering again. offset := len(deliveredBlockSets1) / 2 - to2 := newTotalOrdering(genesisTime, genesisConfig) + to2 := newTotalOrdering(genesisTime, 0, genesisConfig) s.Require().NoError(to2.appendConfig(1, &types.Config{ K: 0, PhiRatio: 0.5, @@ -1338,7 +1338,7 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() { blocks = append(blocks, &b) } - to1 := newTotalOrdering(genesisTime, configs[0]) + to1 := newTotalOrdering(genesisTime, 0, configs[0]) for i, cfg := range configs[1:] { req.NoError(to1.appendConfig(uint64(i+1), cfg)) } @@ -1366,7 +1366,7 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() { // or nothing is tested. req.True(uint64(0) < offsetRound && offsetRound < uint64(len(configs)-1)) - to2 := newTotalOrdering(genesisTime, configs[0]) + to2 := newTotalOrdering(genesisTime, 0, configs[0]) for i, cfg := range configs[1:] { req.NoError(to2.appendConfig(uint64(i+1), cfg)) } diff --git a/core/utils.go b/core/utils.go index 441aac1..bc5e336 100644 --- a/core/utils.go +++ b/core/utils.go @@ -167,6 +167,51 @@ func VerifyBlock(b *types.Block) (err error) { return } +// VerifyAgreementResult perform sanity check against a types.AgreementResult +// instance. +func VerifyAgreementResult( + res *types.AgreementResult, cache *utils.NodeSetCache) error { + notarySet, err := cache.GetNotarySet( + res.Position.Round, res.Position.ChainID) + if err != nil { + return err + } + if len(res.Votes) < len(notarySet)/3*2+1 { + return ErrNotEnoughVotes + } + if len(res.Votes) > len(notarySet) { + return ErrIncorrectVoteProposer + } + for _, vote := range res.Votes { + if res.IsEmptyBlock { + if (vote.BlockHash != common.Hash{}) { + return ErrIncorrectVoteBlockHash + } + } else { + if vote.BlockHash != res.BlockHash { + return ErrIncorrectVoteBlockHash + } + } + if vote.Type != types.VoteCom { + return ErrIncorrectVoteType + } + if vote.Position != res.Position { + return ErrIncorrectVotePosition + } + if _, exist := notarySet[vote.ProposerID]; !exist { + return ErrIncorrectVoteProposer + } + ok, err := verifyVoteSignature(&vote) + if err != nil { + return err + } + if !ok { + return ErrIncorrectVoteSignature + } + } + return nil +} + // DiffUint64 calculates difference between two uint64. func DiffUint64(a, b uint64) uint64 { if a > b { diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index 8fd3fa4..6432e9f 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -18,6 +18,7 @@ package integration import ( + "context" "sync" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/dexon-foundation/dexon-consensus/core" "github.com/dexon-foundation/dexon-consensus/core/blockdb" "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/syncer" "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/utils" @@ -39,10 +41,12 @@ type ConsensusTestSuite struct { } type node struct { - con *core.Consensus - app *test.App - gov *test.Governance - db blockdb.BlockDatabase + ID types.NodeID + con *core.Consensus + app *test.App + gov *test.Governance + db blockdb.BlockDatabase + network *test.Network } func (s *ConsensusTestSuite) setupNodes( @@ -81,8 +85,9 @@ func (s *ConsensusTestSuite) setupNodes( db, networkModule, k, - &common.NullLogger{}) - nodes[con.ID] = &node{con, app, gov, db} + &common.NullLogger{}, + ) + nodes[con.ID] = &node{con.ID, con, app, gov, db, networkModule} go func() { defer wg.Done() s.Require().NoError(networkModule.Setup(serverChannel)) @@ -107,6 +112,53 @@ func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) { } } +func (s *ConsensusTestSuite) syncBlocksWithSomeNode( + sourceNode *node, syncerObj *syncer.Consensus, nextSyncHeight uint64) ( + syncedCon *core.Consensus, syncerHeight uint64, err error) { + + syncerHeight = nextSyncHeight + // Setup revealer. + DBAll, err := sourceNode.db.GetAll() + if err != nil { + return + } + r, err := test.NewCompactionChainRevealer(DBAll, nextSyncHeight) + if err != nil { + return + } + // Load all blocks from revealer and dump them into syncer. + var compactionChainBlocks []*types.Block + syncBlocks := func() (done bool) { + syncedCon, err = syncerObj.SyncBlocks(compactionChainBlocks, true) + if syncedCon != nil || err != nil { + done = true + } + compactionChainBlocks = nil + return + } + for { + var b types.Block + b, err = r.Next() + if err != nil { + if err == blockdb.ErrIterationFinished { + err = nil + if syncBlocks() { + break + } + } + break + } + syncerHeight = b.Finalization.Height + 1 + compactionChainBlocks = append(compactionChainBlocks, &b) + if len(compactionChainBlocks) >= 100 { + if syncBlocks() { + break + } + } + } + return +} + func (s *ConsensusTestSuite) TestSimple() { // The simplest test case: // - Node set is equals to DKG set and notary set for each chain in each @@ -143,7 +195,7 @@ Loop: s.T().Log("check latest position delivered by each node") for _, n := range nodes { latestPos := n.app.GetLatestDeliveredPosition() - s.T().Log("latestPos", n.con.ID, &latestPos) + s.T().Log("latestPos", n.ID, &latestPos) if latestPos.Round < untilRound { continue Loop } @@ -152,6 +204,7 @@ Loop: break } s.verifyNodes(nodes) + // TODO(haoping) stop consensus. } func (s *ConsensusTestSuite) TestNumChainsChange() { @@ -215,7 +268,7 @@ Loop: s.T().Log("check latest position delivered by each node") for _, n := range nodes { latestPos := n.app.GetLatestDeliveredPosition() - s.T().Log("latestPos", n.con.ID, &latestPos) + s.T().Log("latestPos", n.ID, &latestPos) if latestPos.Round < untilRound { continue Loop } @@ -226,6 +279,150 @@ Loop: s.verifyNodes(nodes) } +func (s *ConsensusTestSuite) TestSync() { + // The sync test case: + // - No configuration change. + // - One node does not run when all others starts until aliveRound exceeded. + var ( + req = s.Require() + peerCount = 4 + dMoment = time.Now().UTC() + untilRound = uint64(5) + aliveRound = uint64(1) + errChan = make(chan error, 100) + ) + prvKeys, pubKeys, err := test.NewKeys(peerCount) + req.NoError(err) + // Setup seed governance instance. Give a short latency to make this test + // run faster. + seedGov, err := test.NewGovernance( + pubKeys, 100*time.Millisecond, core.ConfigRoundShift) + req.NoError(err) + req.NoError(seedGov.State().RequestChange( + test.StateChangeRoundInterval, 30*time.Second)) + // A short round interval. + nodes := s.setupNodes(dMoment, prvKeys, seedGov) + // Choose the first node as "syncNode" that its consensus' Run() is called + // later. + syncNode := nodes[types.NewNodeID(pubKeys[0])] + syncNode.con = nil + // Use other node's governance instance. Normally, fullnode would make + // governance when syncing. In our test, it's the simplest way to achieve + // that. + syncNode.gov = nodes[types.NewNodeID(pubKeys[1])].gov + for _, n := range nodes { + if n.ID != syncNode.ID { + go n.con.Run(&types.Block{}) + } + } + // Clean syncNode's network receive channel, or it might exceed the limit + // and block other go routines. + dummyReceiverCtx, dummyReceiverCtxCancel := context.WithCancel( + context.Background()) + go func() { + Loop: + for { + select { + case <-syncNode.network.ReceiveChan(): + case <-dummyReceiverCtx.Done(): + break Loop + } + } + }() +ReachAlive: + for { + // If all nodes excepts syncNode have reached aliveRound, call syncNode's + // Run() and send it all blocks in one of normal node's compaction chain. + for id, n := range nodes { + if id == syncNode.ID { + continue + } + if n.app.GetLatestDeliveredPosition().Round < aliveRound { + continue ReachAlive + } + // Check if any error happened or sleep for a period of time. + select { + case err := <-errChan: + req.NoError(err) + case <-time.After(5 * time.Second): + } + } + dummyReceiverCtxCancel() + break + } + // Initiate Syncer. + runnerCtx, runnerCtxCancel := context.WithCancel(context.Background()) + defer runnerCtxCancel() + syncerObj := syncer.NewConsensus( + dMoment, + syncNode.app, + syncNode.gov, + syncNode.db, + syncNode.network, + prvKeys[0], + &common.NullLogger{}, + ) + // Initialize communication channel, it's not recommended to assertion in + // another go routine. + go func() { + var ( + node *node + syncedHeight uint64 + err error + syncedCon *core.Consensus + ) + SyncLoop: + for { + for _, n := range nodes { + if n.ID == syncNode.ID { + continue + } + node = n + break + } + syncedCon, syncedHeight, err = s.syncBlocksWithSomeNode( + node, syncerObj, syncedHeight) + if syncedCon != nil { + // TODO(mission): run it and make sure it can follow up with + // other nodes. + runnerCtxCancel() + break SyncLoop + } + if err != nil { + errChan <- err + break SyncLoop + } + select { + case <-runnerCtx.Done(): + break SyncLoop + case <-time.After(2 * time.Second): + } + } + }() + // Wait until all nodes reach 'untilRound'. + go func() { + ReachFinished: + for { + time.Sleep(5 * time.Second) + for _, n := range nodes { + if n.app.GetLatestDeliveredPosition().Round < untilRound { + continue ReachFinished + } + } + break + } + runnerCtxCancel() + }() + // Block until any reasonable testing milestone reached. + select { + case err := <-errChan: + req.NoError(err) + case <-runnerCtx.Done(): + // This test passed. + // TODO(haoping) stop consensus. + } +} + func TestConsensus(t *testing.T) { suite.Run(t, new(ConsensusTestSuite)) } diff --git a/integration_test/stats.go b/integration_test/stats.go index 79f1c20..5d66e29 100644 --- a/integration_test/stats.go +++ b/integration_test/stats.go @@ -1,3 +1,20 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + package integration import ( @@ -58,7 +75,7 @@ func (s *StatsSet) newBlockReceiveEvent( // Find statistics from test.App block := payload.PiggyBack.(*types.Block) - app.Check(func(app *test.App) { + app.WithLock(func(app *test.App) { // Is this block confirmed? if _, exists := app.Confirmed[block.Hash]; !exists { return diff --git a/integration_test/stats_test.go b/integration_test/stats_test.go index d61799b..5c1f412 100644 --- a/integration_test/stats_test.go +++ b/integration_test/stats_test.go @@ -1,3 +1,20 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + package integration import ( diff --git a/integration_test/utils.go b/integration_test/utils.go index 1530507..a6cc139 100644 --- a/integration_test/utils.go +++ b/integration_test/utils.go @@ -1,3 +1,20 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + package integration import ( |