From 2354d4aa747616a8fd4fb9482ada8042fd362139 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 22 Feb 2019 13:14:55 +0800 Subject: core: Remove K, Phi and NumChains from Governance (#198) * change default sync_core.sh * vendor: sync to latest core * core: Remove K, Phi and NumChain --- .../dexon-consensus/common/utils.go | 10 + .../dexon-consensus/core/agreement-mgr.go | 270 ++-- .../dexon-consensus/core/agreement.go | 47 +- .../dexon-consensus/core/blockchain.go | 621 +++++++++ .../dexon-consensus/core/blockpool.go | 79 -- .../dexon-consensus/core/compaction-chain.go | 291 ----- .../dexon-consensus/core/configuration-chain.go | 5 - .../dexon-consensus/core/consensus-timestamp.go | 162 --- .../dexon-consensus/core/consensus.go | 332 ++--- .../dexon-consensus/core/dkg-tsig-protocol.go | 12 + .../dexon-consensus/core/interfaces.go | 3 - .../dexon-consensus/core/lattice-data.go | 683 ---------- .../dexon-consensus/core/lattice.go | 363 ------ .../dexon-consensus/core/negative-ack.go | 211 ---- .../dexon-consensus/core/nonblocking.go | 16 - .../dexon-consensus/core/syncer/consensus.go | 572 ++------- .../dexon-consensus/core/total-ordering-syncer.go | 177 --- .../dexon-consensus/core/total-ordering.go | 1321 -------------------- .../dexon-consensus/core/types/block-randomness.go | 4 +- .../dexon-consensus/core/types/block.go | 32 +- .../dexon-consensus/core/types/dkg/dkg.go | 58 + .../dexon-consensus/core/types/nodeset.go | 18 +- .../dexon-consensus/core/types/position.go | 17 +- .../dexon-consensus/core/types/vote.go | 2 +- .../dexon-consensus/core/utils/nodeset-cache.go | 55 +- 25 files changed, 1137 insertions(+), 4224 deletions(-) create mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/blockpool.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/consensus-timestamp.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/negative-ack.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering-syncer.go delete mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go (limited to 'vendor/github.com') diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/common/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/common/utils.go index 63d25a3fc..e46b3e9c9 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/common/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/common/utils.go @@ -19,3 +19,13 @@ func NewRandomHash() Hash { } return x } + +// GenerateRandomBytes generates bytes randomly. +func GenerateRandomBytes() []byte { + randomness := make([]byte, 32) + _, err := rand.Read(randomness) + if err != nil { + panic(err) + } + return randomness +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index bcf10139c..5f5b9ae5f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -33,6 +33,8 @@ import ( // Errors returned from BA modules var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") + ErrRoundOutOfRange = errors.New("round out of range") + ErrInvalidBlock = errors.New("invalid block") ) const maxResultCache = 100 @@ -44,7 +46,7 @@ func genValidLeader( if block.Timestamp.After(time.Now()) { return false, nil } - if err := mgr.lattice.SanityCheck(block, true); err != nil { + if err := mgr.bcModule.sanityCheck(block); err != nil { if err == ErrRetrySanityCheckLater { return false, nil } @@ -64,7 +66,6 @@ func genValidLeader( type agreementMgrConfig struct { beginTime time.Time - numChains uint32 roundInterval time.Duration notarySetSize uint32 lambdaBA time.Duration @@ -72,7 +73,6 @@ type agreementMgrConfig struct { } type baRoundSetting struct { - chainID uint32 notarySet map[types.NodeID]struct{} agr *agreement recv *consensusBAReceiver @@ -90,28 +90,17 @@ type agreementMgr struct { logger common.Logger cache *utils.NodeSetCache signer *utils.Signer - lattice *Lattice + bcModule *blockChain ctx context.Context lastEndTime time.Time initRound uint64 configs []*agreementMgrConfig - baModules []*agreement + baModule *agreement processedBAResult map[types.Position]struct{} - voteFilters []*utils.VoteFilter + voteFilter *utils.VoteFilter waitGroup sync.WaitGroup - pendingVotes map[uint64][]*types.Vote - pendingBlocks map[uint64][]*types.Block isRunning bool - - // This lock should be used when attempting to: - // - add a new baModule. - // - remove all baModules when stopping. In this case, the cleaner need - // to wait for all routines runnning baModules finished. - // - access a method of baModule. - // - append a config from new round. - // The routine running corresponding baModule, however, doesn't have to - // acquire this lock. - lock sync.RWMutex + lock sync.RWMutex } func newAgreementMgr(con *Consensus, initRound uint64, @@ -125,7 +114,7 @@ func newAgreementMgr(con *Consensus, initRound uint64, logger: con.logger, cache: con.nodeSetCache, signer: con.signer, - lattice: con.lattice, + bcModule: con.bcModule, ctx: con.ctx, initRound: initRound, lastEndTime: initRoundBeginTime, @@ -133,6 +122,20 @@ func newAgreementMgr(con *Consensus, initRound uint64, } } +func (mgr *agreementMgr) run() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + if mgr.isRunning { + return + } + mgr.isRunning = true + mgr.waitGroup.Add(1) + go func() { + defer mgr.waitGroup.Done() + mgr.runBA(mgr.initRound) + }() +} + func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { mgr.lock.RLock() defer mgr.lock.RUnlock() @@ -146,22 +149,6 @@ func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { return mgr.configs[roundIndex] } -func (mgr *agreementMgr) run() { - mgr.lock.Lock() - defer mgr.lock.Unlock() - if mgr.isRunning { - return - } - mgr.isRunning = true - for i := uint32(0); i < uint32(len(mgr.baModules)); i++ { - mgr.waitGroup.Add(1) - go func(idx uint32) { - defer mgr.waitGroup.Done() - mgr.runBA(mgr.initRound, idx) - }(i) - } -} - func (mgr *agreementMgr) appendConfig( round uint64, config *types.Config, crs common.Hash) (err error) { mgr.lock.Lock() @@ -171,7 +158,6 @@ func (mgr *agreementMgr) appendConfig( } newConfig := &agreementMgrConfig{ beginTime: mgr.lastEndTime, - numChains: config.NumChains, roundInterval: config.RoundInterval, notarySetSize: config.NotarySetSize, lambdaBA: config.LambdaBA, @@ -179,80 +165,61 @@ func (mgr *agreementMgr) appendConfig( } mgr.configs = append(mgr.configs, newConfig) mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval) - // Create baModule for newly added chain. - for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ { - // Prepare modules. - recv := &consensusBAReceiver{ - consensus: mgr.con, - chainID: i, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, - } - recv.roundValue.Store(uint64(0)) - agrModule := newAgreement( - mgr.con.ID, - recv, - newLeaderSelector(genValidLeader(mgr), mgr.logger), - mgr.signer, - mgr.logger) - // Hacky way to initialize first notarySet. - nodes, err := mgr.cache.GetNodeSet(round) - if err != nil { - return err - } - agrModule.notarySet = nodes.GetSubSet( - int(config.NotarySetSize), - types.NewNotarySetTarget(crs, i)) - // Hacky way to make agreement module self contained. - recv.agreementModule = agrModule - mgr.baModules = append(mgr.baModules, agrModule) - mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter()) - if mgr.isRunning { - mgr.waitGroup.Add(1) - go func(idx uint32) { - defer mgr.waitGroup.Done() - mgr.runBA(round, idx) - }(i) - } + // Prepare modules. + if mgr.baModule != nil { + return nil } + recv := &consensusBAReceiver{ + consensus: mgr.con, + restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, + } + recv.roundValue.Store(uint64(0)) + agrModule := newAgreement( + mgr.con.ID, + recv, + newLeaderSelector(genValidLeader(mgr), mgr.logger), + mgr.signer, + mgr.logger) + // Hacky way to initialize first notarySet. + nodes, err := mgr.cache.GetNodeSet(round) + if err != nil { + return err + } + agrModule.notarySet = nodes.GetSubSet( + int(config.NotarySetSize), types.NewNotarySetTarget(crs)) + // Hacky way to make agreement module self contained. + recv.agreementModule = agrModule + mgr.baModule = agrModule + mgr.voteFilter = utils.NewVoteFilter() return nil } -func (mgr *agreementMgr) processVote(v *types.Vote) error { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if v.Position.ChainID >= uint32(len(mgr.baModules)) { +func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { + if v.Position.ChainID > 0 { mgr.logger.Error("Process vote for unknown chain to BA", - "position", &v.Position, - "baChain", len(mgr.baModules), - "baRound", len(mgr.configs), + "position", v.Position, "initRound", mgr.initRound) return utils.ErrInvalidChainID } - filter := mgr.voteFilters[v.Position.ChainID] - if filter.Filter(v) { + if mgr.voteFilter.Filter(v) { return nil } - v = v.Clone() - err := mgr.baModules[v.Position.ChainID].processVote(v) - if err == nil { - mgr.baModules[v.Position.ChainID].updateFilter(filter) + if err = mgr.baModule.processVote(v); err == nil { + mgr.baModule.updateFilter(mgr.voteFilter) } - return err + return } func (mgr *agreementMgr) processBlock(b *types.Block) error { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if b.Position.ChainID >= uint32(len(mgr.baModules)) { + if b.Position.ChainID > 0 { mgr.logger.Error("Process block for unknown chain to BA", - "position", &b.Position, - "baChain", len(mgr.baModules), + "position", b.Position, "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } - return mgr.baModules[b.Position.ChainID].processBlock(b) + return mgr.baModule.processBlock(b) } func (mgr *agreementMgr) touchAgreementResult( @@ -280,30 +247,26 @@ func (mgr *agreementMgr) untouchAgreementResult( func (mgr *agreementMgr) processAgreementResult( result *types.AgreementResult) error { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if result.Position.ChainID >= uint32(len(mgr.baModules)) { + if result.Position.ChainID > 0 { mgr.logger.Error("Process unknown result for unknown chain to BA", - "position", &result.Position, - "baChain", len(mgr.baModules), + "position", result.Position, "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } - agreement := mgr.baModules[result.Position.ChainID] - aID := agreement.agreementID() + aID := mgr.baModule.agreementID() if isStop(aID) { return nil } - if result.Position == aID && !agreement.confirmed() { - mgr.logger.Info("Syncing BA", "position", &result.Position) + if result.Position == aID && !mgr.baModule.confirmed() { + mgr.logger.Info("Syncing BA", "position", result.Position) for key := range result.Votes { - if err := agreement.processVote(&result.Votes[key]); err != nil { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err } } - } else if result.Position.Newer(&aID) { - mgr.logger.Info("Fast syncing BA", "position", &result.Position) + } else if result.Position.Newer(aID) { + mgr.logger.Info("Fast syncing BA", "position", result.Position) nodes, err := mgr.cache.GetNodeSet(result.Position.Round) if err != nil { return err @@ -316,9 +279,9 @@ func (mgr *agreementMgr) processAgreementResult( nIDs := nodes.GetSubSet( int(utils.GetConfigWithPanic( mgr.gov, result.Position.Round, mgr.logger).NotarySetSize), - types.NewNotarySetTarget(crs, result.Position.ChainID)) + types.NewNotarySetTarget(crs)) for key := range result.Votes { - if err := agreement.processVote(&result.Votes[key]); err != nil { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err } } @@ -326,7 +289,7 @@ func (mgr *agreementMgr) processAgreementResult( if err != nil { return err } - agreement.restart(nIDs, result.Position, leader, crs) + mgr.baModule.restart(nIDs, result.Position, leader, crs) } return nil } @@ -336,30 +299,20 @@ func (mgr *agreementMgr) stop() { func() { mgr.lock.Lock() defer mgr.lock.Unlock() - for _, agr := range mgr.baModules { - agr.stop() - } + mgr.baModule.stop() }() // Block until all routines are done. mgr.waitGroup.Wait() } -func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { - // Acquire agreement module. - agr, recv := func() (*agreement, *consensusBAReceiver) { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - agr := mgr.baModules[chainID] - return agr, agr.data.recv.(*consensusBAReceiver) - }() +func (mgr *agreementMgr) runBA(initRound uint64) { // These are round based variables. var ( currentRound uint64 nextRound = initRound setting = baRoundSetting{ - chainID: chainID, - agr: agr, - recv: recv, + agr: mgr.baModule, + recv: mgr.baModule.data.recv.(*consensusBAReceiver), } roundBeginTime time.Time roundEndTime time.Time @@ -368,7 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Check if this routine needs to awake in this round and prepare essential // variables when yes. - checkRound := func() (isNotary, isDisabled bool) { + checkRound := func() (isNotary bool) { defer func() { currentRound = nextRound nextRound++ @@ -386,13 +339,8 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Set next checkpoint. roundBeginTime = config.beginTime roundEndTime = config.beginTime.Add(config.roundInterval) - // Check if this chain handled by this routine included in this round. - if chainID >= config.numChains { - isDisabled = true - return - } // Check if this node in notary set of this chain in this round. - notarySet, err := mgr.cache.GetNotarySet(nextRound, chainID) + notarySet, err := mgr.cache.GetNotarySet(nextRound, 0) if err != nil { panic(err) } @@ -402,13 +350,11 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { if isNotary { mgr.logger.Info("selected as notary set", "ID", mgr.ID, - "round", nextRound, - "chainID", chainID) + "round", nextRound) } else { mgr.logger.Info("not selected as notary set", "ID", mgr.ID, - "round", nextRound, - "chainID", chainID) + "round", nextRound) } // Setup ticker if tickDuration != config.lambdaBA { @@ -428,16 +374,7 @@ Loop: default: } now := time.Now().UTC() - var isDisabled bool - setting.recv.isNotary, isDisabled = checkRound() - if isDisabled { - select { - case <-mgr.ctx.Done(): - break Loop - case <-time.After(roundEndTime.Sub(now)): - continue Loop - } - } + setting.recv.isNotary = checkRound() // Sleep until round begin. Here a biased round begin time would be // used instead of the one in config. The reason it to disperse the load // of fullnodes to verify confirmed blocks from each chain. @@ -454,18 +391,17 @@ Loop: <-setting.ticker.Tick() } // Run BA for this round. - recv.roundValue.Store(currentRound) - recv.changeNotaryTime = roundEndTime - recv.restartNotary <- types.Position{ + setting.recv.roundValue.Store(currentRound) + setting.recv.changeNotaryTime = roundEndTime + setting.recv.restartNotary <- types.Position{ Round: setting.recv.round(), ChainID: math.MaxUint32, } - mgr.voteFilters[chainID] = utils.NewVoteFilter() + mgr.voteFilter = utils.NewVoteFilter() if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, - "nodeID", mgr.ID, - "chain", chainID) + "nodeID", mgr.ID) break Loop } } @@ -485,13 +421,13 @@ func (mgr *agreementMgr) baRoutineForOneRound( break default: } - tipRound := mgr.lattice.TipRound(setting.chainID) + tipRound := mgr.bcModule.tipRound() if tipRound > restartPos.Round { // It's a vary rare that this go routine sleeps for entire round. break } else if tipRound != restartPos.Round { - mgr.logger.Debug("Waiting lattice to change round...", - "pos", &restartPos) + mgr.logger.Debug("Waiting blockChain to change round...", + "pos", restartPos) } else { break } @@ -501,7 +437,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( breakLoop = true return } - if restartPos.Older(&oldPos) { + if restartPos.Older(oldPos) { // The restartNotary event is triggered by 'BlockConfirmed' // of some older block. return @@ -510,16 +446,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( var nextHeight uint64 var nextTime time.Time for { - nextHeight, nextTime, err = - mgr.lattice.NextBlock(recv.round(), setting.chainID) - if err != nil { - mgr.logger.Debug("Error getting next height", - "error", err, - "round", recv.round(), - "chainID", setting.chainID) - err = nil - nextHeight = restartPos.Height - } + nextHeight, nextTime = mgr.bcModule.nextBlock() if isStop(oldPos) && nextHeight == 0 { break } @@ -529,14 +456,13 @@ func (mgr *agreementMgr) baRoutineForOneRound( if nextHeight > restartPos.Height { break } - mgr.logger.Debug("Lattice not ready!!!", - "old", &oldPos, "restart", &restartPos, "next", nextHeight) + mgr.logger.Debug("BlockChain not ready!!!", + "old", oldPos, "restart", restartPos, "next", nextHeight) time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round(), - ChainID: setting.chainID, - Height: nextHeight, + Round: recv.round(), + Height: nextHeight, } oldPos = nextPos var leader types.NodeID @@ -582,18 +508,18 @@ Loop: } default: } - if agr.pullVotes() { - pos := agr.agreementID() - mgr.logger.Debug("Calling Network.PullVotes for syncing votes", - "position", &pos) - mgr.network.PullVotes(pos) - } if err = agr.nextState(); err != nil { mgr.logger.Error("Failed to proceed to next state", "nodeID", mgr.ID.String(), "error", err) break Loop } + if agr.pullVotes() { + pos := agr.agreementID() + mgr.logger.Debug("Calling Network.PullVotes for syncing votes", + "position", pos) + mgr.network.PullVotes(pos) + } for i := 0; i < agr.clocks(); i++ { // Priority select for agreement.done(). select { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go index c2ac711eb..579cea8c3 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go @@ -162,7 +162,7 @@ func (a *agreement) restart( defer a.lock.Unlock() if !isStop(aID) { oldAID := a.agreementID() - if !isStop(oldAID) && !aID.Newer(&oldAID) { + if !isStop(oldAID) && !aID.Newer(oldAID) { return false } } @@ -209,7 +209,7 @@ func (a *agreement) restart( defer a.lock.Unlock() newPendingBlock := make([]pendingBlock, 0) for _, pending := range a.pendingBlock { - if aID.Newer(&pending.block.Position) { + if aID.Newer(pending.block.Position) { continue } else if pending.block.Position == aID { replayBlock = append(replayBlock, pending.block) @@ -226,7 +226,7 @@ func (a *agreement) restart( defer a.lock.Unlock() newPendingVote := make([]pendingVote, 0) for _, pending := range a.pendingVote { - if aID.Newer(&pending.vote.Position) { + if aID.Newer(pending.vote.Position) { continue } else if pending.vote.Position == aID { replayVote = append(replayVote, pending.vote) @@ -267,6 +267,9 @@ func (a *agreement) clocks() int { a.data.lock.RLock() defer a.data.lock.RUnlock() scale := int(a.data.period) - 1 + if a.state.state() == stateForward { + scale = 1 + } if scale < 1 { // just in case. scale = 1 @@ -387,7 +390,7 @@ func (a *agreement) processVote(vote *types.Vote) error { return nil } if vote.Position != aID { - if aID.Newer(&vote.Position) { + if aID.Newer(vote.Position) { return nil } a.pendingVote = append(a.pendingVote, pendingVote{ @@ -431,8 +434,10 @@ func (a *agreement) processVote(vote *types.Vote) error { a.hasOutput = true a.data.recv.ConfirmBlock(hash, a.data.votes[vote.Period][vote.Type]) - close(a.doneChan) - a.doneChan = nil + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } } return nil } @@ -465,6 +470,10 @@ func (a *agreement) processVote(vote *types.Vote) error { a.data.lockIter = vote.Period } a.fastForward <- vote.Period + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } return nil } } @@ -490,6 +499,10 @@ func (a *agreement) processVote(vote *types.Vote) error { a.data.recv.PullBlocks(hashes) } a.fastForward <- vote.Period + 1 + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } return nil } return nil @@ -498,23 +511,22 @@ func (a *agreement) processVote(vote *types.Vote) error { func (a *agreement) done() <-chan struct{} { a.lock.Lock() defer a.lock.Unlock() - if a.doneChan == nil { - return closedchan - } - a.data.lock.Lock() - defer a.data.lock.Unlock() select { case period := <-a.fastForward: + a.data.lock.Lock() + defer a.data.lock.Unlock() if period <= a.data.period { break } a.data.setPeriod(period) a.state = newPreCommitState(a.data) - close(a.doneChan) a.doneChan = make(chan struct{}) return closedchan default: } + if a.doneChan == nil { + return closedchan + } return a.doneChan } @@ -535,7 +547,7 @@ func (a *agreement) processBlock(block *types.Block) error { if block.Position != aID { // Agreement module has stopped. if !isStop(aID) { - if aID.Newer(&block.Position) { + if aID.Newer(block.Position) { return nil } } @@ -562,17 +574,22 @@ func (a *agreement) processBlock(block *types.Block) error { block.ProposerID == a.leader() { go func() { for func() bool { + if aID != a.agreementID() { + return false + } a.lock.RLock() defer a.lock.RUnlock() if a.state.state() != stateFast && a.state.state() != stateFastVote { return false } + a.data.lock.RLock() + defer a.data.lock.RUnlock() + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() block, exist := a.data.blocks[a.leader()] if !exist { return true } - a.data.lock.RLock() - defer a.data.lock.RUnlock() ok, err := a.data.leader.validLeader(block) if err != nil { fmt.Println("Error checking validLeader for Fast BA", diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go new file mode 100644 index 000000000..d1aa644a5 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go @@ -0,0 +1,621 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package core + +import ( + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Errors for sanity check error. +var ( + ErrBlockFromOlderPosition = errors.New("block from older position") + ErrNotGenesisBlock = errors.New("not a genesis block") + ErrIsGenesisBlock = errors.New("is a genesis block") + ErrIncorrectParentHash = errors.New("incorrect parent hash") + ErrInvalidBlockHeight = errors.New("invalid block height") + ErrInvalidRoundID = errors.New("invalid round id") + ErrNotFollowTipPosition = errors.New("not follow tip position") + ErrDuplicatedPendingBlock = errors.New("duplicated pending block") + ErrRetrySanityCheckLater = errors.New("retry sanity check later") + ErrRoundNotIncreasing = errors.New("round not increasing") + ErrRoundNotSwitch = errors.New("round not switch") + ErrIncorrectBlockRandomnessResult = errors.New( + "incorrect block randomness result") +) + +type pendingBlockRecord struct { + position types.Position + block *types.Block +} + +type pendingBlockRecords []pendingBlockRecord + +func (pb *pendingBlockRecords) insert(p pendingBlockRecord) error { + idx := sort.Search(len(*pb), func(i int) bool { + return !(*pb)[i].position.Older(p.position) + }) + switch idx { + case len(*pb): + *pb = append(*pb, p) + default: + if (*pb)[idx].position.Equal(p.position) { + return ErrDuplicatedPendingBlock + } + // Insert the value to that index. + *pb = append((*pb), pendingBlockRecord{}) + copy((*pb)[idx+1:], (*pb)[idx:]) + (*pb)[idx] = p + } + return nil +} + +func (pb pendingBlockRecords) searchByHeight(h uint64) ( + pendingBlockRecord, bool) { + idx := sort.Search(len(pb), func(i int) bool { + return pb[i].position.Height >= h + }) + if idx == len(pb) || pb[idx].position.Height != h { + return pendingBlockRecord{}, false + } + return pb[idx], true +} + +func (pb pendingBlockRecords) searchByPosition(p types.Position) ( + pendingBlockRecord, bool) { + idx := sort.Search(len(pb), func(i int) bool { + return !pb[i].block.Position.Older(p) + }) + if idx == len(pb) || !pb[idx].position.Equal(p) { + return pendingBlockRecord{}, false + } + return pb[idx], true +} + +type blockChainConfig struct { + roundBasedConfig + + minBlockInterval time.Duration +} + +func (c *blockChainConfig) fromConfig(round uint64, config *types.Config) { + c.minBlockInterval = config.MinBlockInterval + c.setupRoundBasedFields(round, config) +} + +func newBlockChainConfig(prev blockChainConfig, config *types.Config) ( + c blockChainConfig) { + c = blockChainConfig{} + c.fromConfig(prev.roundID+1, config) + c.setRoundBeginTime(prev.roundEndTime) + return +} + +type tsigVerifierGetter interface { + UpdateAndGet(uint64) (TSigVerifier, bool, error) +} + +type blockChain struct { + lock sync.RWMutex + ID types.NodeID + lastConfirmed *types.Block + lastDelivered *types.Block + signer *utils.Signer + vGetter tsigVerifierGetter + app Application + logger common.Logger + pendingRandomnesses map[types.Position]*types.BlockRandomnessResult + configs []blockChainConfig + pendingBlocks pendingBlockRecords + confirmedBlocks types.BlocksByPosition +} + +func newBlockChain(nID types.NodeID, initBlock *types.Block, + initConfig blockChainConfig, app Application, vGetter tsigVerifierGetter, + signer *utils.Signer, logger common.Logger) *blockChain { + if initBlock != nil { + if initConfig.roundID != initBlock.Position.Round { + panic(fmt.Errorf("incompatible config/block %s %d", + initBlock, initConfig.roundID)) + } + } else { + if initConfig.roundID != 0 { + panic(fmt.Errorf("genesis config should from round 0 %d", + initConfig.roundID)) + } + } + return &blockChain{ + ID: nID, + lastConfirmed: initBlock, + lastDelivered: initBlock, + signer: signer, + vGetter: vGetter, + app: app, + logger: logger, + configs: []blockChainConfig{initConfig}, + pendingRandomnesses: make( + map[types.Position]*types.BlockRandomnessResult), + } +} + +func (bc *blockChain) appendConfig(round uint64, config *types.Config) error { + expectedRound := uint64(len(bc.configs)) + if bc.lastConfirmed != nil { + expectedRound += bc.lastConfirmed.Position.Round + } + if round != expectedRound { + return ErrRoundNotIncreasing + } + bc.configs = append(bc.configs, newBlockChainConfig( + bc.configs[len(bc.configs)-1], config)) + return nil +} + +func (bc *blockChain) proposeBlock(position types.Position, + proposeTime time.Time) (b *types.Block, err error) { + bc.lock.RLock() + defer bc.lock.RUnlock() + return bc.prepareBlock(position, proposeTime, false) +} + +func (bc *blockChain) extractBlocks() (ret []*types.Block) { + bc.lock.Lock() + defer bc.lock.Unlock() + for len(bc.confirmedBlocks) > 0 { + c := bc.confirmedBlocks[0] + if c.Position.Round > 0 && len(c.Finalization.Randomness) == 0 { + break + } + c, bc.confirmedBlocks = bc.confirmedBlocks[0], bc.confirmedBlocks[1:] + // TODO(mission): remove these duplicated field if we fully converted + // to single chain. + c.Finalization.ParentHash = c.ParentHash + c.Finalization.Timestamp = c.Timestamp + // It's a workaround, the height for application is one-based. + c.Finalization.Height = c.Position.Height + 1 + ret = append(ret, c) + bc.lastDelivered = c + } + return +} + +func (bc *blockChain) sanityCheck(b *types.Block) error { + if b.Position.ChainID != 0 { + panic(fmt.Errorf("attempt to process block from non-zero chainID")) + } + if b.IsEmpty() { + panic(fmt.Errorf("pass empty block to sanity check: %s", b)) + } + bc.lock.RLock() + defer bc.lock.RUnlock() + if bc.lastConfirmed == nil { + // It should be a genesis block. + if !b.IsGenesis() { + return ErrNotGenesisBlock + } + // TODO(mission): Do we have to check timestamp of genesis block? + return nil + } + if b.IsGenesis() { + return ErrIsGenesisBlock + } + if b.Position.Height != bc.lastConfirmed.Position.Height+1 { + if b.Position.Height > bc.lastConfirmed.Position.Height { + return ErrRetrySanityCheckLater + } + return ErrInvalidBlockHeight + } + tipConfig := bc.getTipConfig() + if tipConfig.isLastBlock(bc.lastConfirmed) { + if b.Position.Round != bc.lastConfirmed.Position.Round+1 { + return ErrRoundNotSwitch + } + } else { + if b.Position.Round != bc.lastConfirmed.Position.Round { + return ErrInvalidRoundID + } + } + if !b.ParentHash.Equal(bc.lastConfirmed.Hash) { + return ErrIncorrectParentHash + } + if err := utils.VerifyBlockSignature(b); err != nil { + return err + } + return nil +} + +// addEmptyBlock is called when an empty block is confirmed by BA. +func (bc *blockChain) addEmptyBlock(position types.Position) ( + *types.Block, error) { + if position.ChainID != 0 { + panic(fmt.Errorf("attempt to process block from non-zero chainID")) + } + bc.lock.Lock() + defer bc.lock.Unlock() + add := func() *types.Block { + emptyB, err := bc.prepareBlock(position, time.Time{}, true) + if err != nil || emptyB == nil { + // This helper is expected to be called when an empty block is ready + // to be confirmed. + panic(err) + } + bc.confirmBlock(emptyB) + bc.checkIfBlocksConfirmed() + return emptyB + } + if bc.lastConfirmed != nil { + if !position.Newer(bc.lastConfirmed.Position) { + bc.logger.Warn("Dropping empty block: older than tip", + "position", &position, + "last-confirmed", bc.lastConfirmed) + return nil, ErrBlockFromOlderPosition + } + if bc.lastConfirmed.Position.Height+1 == position.Height { + return add(), nil + } + } else if position.Height == 0 && position.Round == 0 { + return add(), nil + } + bc.addPendingBlockRecord(pendingBlockRecord{position, nil}) + return nil, nil +} + +// addBlock should be called when the block is confirmed by BA, we won't perform +// sanity check against this block, it's ok to add block with skipping height. +func (bc *blockChain) addBlock(b *types.Block) error { + if b.Position.ChainID != 0 { + panic(fmt.Errorf("attempt to process block from non-zero chainID")) + } + bc.lock.Lock() + defer bc.lock.Unlock() + confirmed := false + if bc.lastConfirmed != nil { + if !b.Position.Newer(bc.lastConfirmed.Position) { + bc.logger.Warn("Dropping block: older than tip", + "block", b, "last-confirmed", bc.lastConfirmed) + return nil + } + if bc.lastConfirmed.Position.Height+1 == b.Position.Height { + confirmed = true + } + } else if b.IsGenesis() { + confirmed = true + } + if !confirmed { + bc.addPendingBlockRecord(pendingBlockRecord{b.Position, b}) + } else { + bc.confirmBlock(b) + bc.checkIfBlocksConfirmed() + } + return nil +} + +func (bc *blockChain) addRandomness(r *types.BlockRandomnessResult) error { + if r.Position.ChainID != 0 { + panic(fmt.Errorf("attempt to process block from non-zero chainID")) + } + if func() bool { + bc.lock.RLock() + defer bc.lock.RUnlock() + if bc.lastDelivered != nil && + bc.lastDelivered.Position.Newer(r.Position) { + return true + } + _, exists := bc.pendingRandomnesses[r.Position] + if exists { + return true + } + b := bc.findPendingBlock(r.Position) + return b != nil && len(b.Finalization.Randomness) > 0 + }() { + return nil + } + ok, err := bc.verifyRandomness(r.BlockHash, r.Position.Round, r.Randomness) + if err != nil { + return err + } + if !ok { + return ErrIncorrectBlockRandomnessResult + } + bc.lock.Lock() + defer bc.lock.Unlock() + if b := bc.findPendingBlock(r.Position); b != nil { + if !r.BlockHash.Equal(b.Hash) { + panic(fmt.Errorf("mismathed randomness: %s %s", b, r)) + } + b.Finalization.Randomness = r.Randomness + } else { + bc.pendingRandomnesses[r.Position] = r + } + return nil +} + +// TODO(mission): remove this method after removing the strong binding between +// BA and blockchain. +func (bc *blockChain) tipRound() uint64 { + bc.lock.RLock() + defer bc.lock.RUnlock() + if bc.lastConfirmed == nil { + return 0 + } + offset := uint64(0) + if bc.lastConfirmed.Timestamp.After(bc.getTipConfig().roundEndTime) { + offset++ + } + return bc.lastConfirmed.Position.Round + offset +} + +// TODO(mission): the pulling should be done inside of blockchain, then we don't +// have to expose this method. +func (bc *blockChain) confirmed(h uint64) bool { + bc.lock.RLock() + defer bc.lock.RUnlock() + if bc.lastConfirmed != nil && bc.lastConfirmed.Position.Height >= h { + return true + } + r, found := bc.pendingBlocks.searchByHeight(h) + if !found { + return false + } + return r.block != nil +} + +// TODO(mission): this method can be removed after refining the relation between +// BA and block storage. +func (bc *blockChain) nextBlock() (uint64, time.Time) { + bc.lock.RLock() + defer bc.lock.RUnlock() + // It's ok to access tip config directly without checking the existence of + // lastConfirmed block in the scenario of "nextBlock" method. + tip, config := bc.lastConfirmed, bc.configs[0] + if tip == nil { + return 0, config.roundBeginTime + } + return tip.Position.Height + 1, tip.Timestamp.Add(config.minBlockInterval) +} + +func (bc *blockChain) pendingBlocksWithoutRandomness() (hashes common.Hashes) { + bc.lock.RLock() + defer bc.lock.RUnlock() + for _, b := range bc.confirmedBlocks { + if b.Position.Round == 0 || len(b.Finalization.Randomness) > 0 { + continue + } + hashes = append(hashes, b.Hash) + } + for _, r := range bc.pendingBlocks { + if r.position.Round == 0 { + continue + } + if r.block != nil && len(r.block.Finalization.Randomness) == 0 { + hashes = append(hashes, r.block.Hash) + } + } + return +} + +func (bc *blockChain) lastDeliveredBlock() *types.Block { + bc.lock.RLock() + defer bc.lock.RUnlock() + return bc.lastDelivered +} + +func (bc *blockChain) lastPendingBlock() *types.Block { + bc.lock.RLock() + defer bc.lock.RUnlock() + if len(bc.confirmedBlocks) == 0 { + return nil + } + return bc.confirmedBlocks[0] +} + +func (bc *blockChain) processFinalizedBlock(b *types.Block) error { + return bc.addRandomness(&types.BlockRandomnessResult{ + BlockHash: b.Hash, + Position: b.Position, + Randomness: b.Finalization.Randomness, + }) +} + +///////////////////////////////////////////// +// +// internal helpers +// +///////////////////////////////////////////// + +// findPendingBlock is a helper to find a block in either pending or confirmed +// state by position. +func (bc *blockChain) findPendingBlock(p types.Position) *types.Block { + if idx := sort.Search(len(bc.confirmedBlocks), func(i int) bool { + return !bc.confirmedBlocks[i].Position.Older(p) + }); idx != len(bc.confirmedBlocks) && + bc.confirmedBlocks[idx].Position.Equal(p) { + return bc.confirmedBlocks[idx] + } + pendingRec, _ := bc.pendingBlocks.searchByPosition(p) + return pendingRec.block +} + +func (bc *blockChain) addPendingBlockRecord(p pendingBlockRecord) { + if err := bc.pendingBlocks.insert(p); err != nil { + panic(err) + } + if p.block != nil { + bc.setRandomnessFromPending(p.block) + } +} + +func (bc *blockChain) checkIfBlocksConfirmed() { + var err error + for len(bc.pendingBlocks) > 0 { + if bc.pendingBlocks[0].position.Height < + bc.lastConfirmed.Position.Height+1 { + panic(fmt.Errorf("unexpected case %s %s", bc.lastConfirmed, + bc.pendingBlocks[0].position)) + } + if bc.pendingBlocks[0].position.Height > + bc.lastConfirmed.Position.Height+1 { + break + } + var pending pendingBlockRecord + pending, bc.pendingBlocks = bc.pendingBlocks[0], bc.pendingBlocks[1:] + nextTip := pending.block + if nextTip == nil { + if nextTip, err = bc.prepareBlock( + pending.position, time.Time{}, true); err != nil { + // It should not be error when prepare empty block for correct + // position. + panic(err) + } + } + bc.confirmBlock(nextTip) + } +} + +func (bc *blockChain) purgeConfig() { + for bc.configs[0].roundID < bc.lastConfirmed.Position.Round { + bc.configs = bc.configs[1:] + } + if bc.configs[0].roundID != bc.lastConfirmed.Position.Round { + panic(fmt.Errorf("mismatched tip config: %d %d", + bc.configs[0].roundID, bc.lastConfirmed.Position.Round)) + } +} + +func (bc *blockChain) verifyRandomness( + blockHash common.Hash, round uint64, randomness []byte) (bool, error) { + if round == 0 { + return len(randomness) == 0, nil + } + v, ok, err := bc.vGetter.UpdateAndGet(round) + if err != nil { + return false, err + } + if !ok { + return false, ErrTSigNotReady + } + return v.VerifySignature(blockHash, crypto.Signature{ + Type: "bls", + Signature: randomness}), nil +} + +func (bc *blockChain) prepareBlock(position types.Position, + proposeTime time.Time, empty bool) (b *types.Block, err error) { + // TODO(mission): refine timestamp. + b = &types.Block{Position: position, Timestamp: proposeTime} + tip := bc.lastConfirmed + // Make sure we can propose a block at expected position for callers. + expectedPosition := types.Position{} + if tip == nil { + // The case for genesis block. + if !position.Equal(expectedPosition) { + b, err = nil, ErrNotGenesisBlock + } else if empty { + b.Timestamp = bc.configs[0].roundBeginTime + } + } else { + expectedPosition.Height = tip.Position.Height + 1 + tipConfig := bc.getTipConfig() + if tipConfig.isLastBlock(tip) { + expectedPosition.Round = tip.Position.Round + 1 + } else { + expectedPosition.Round = tip.Position.Round + } + if !expectedPosition.Equal(position) { + b, err = nil, ErrNotFollowTipPosition + return + } + b.ParentHash = tip.Hash + if !empty { + bc.logger.Debug("Calling Application.PreparePayload", + "position", b.Position) + if b.Payload, err = bc.app.PreparePayload(b.Position); err != nil { + return + } + bc.logger.Debug("Calling Application.PrepareWitness", + "height", tip.Witness.Height) + if b.Witness, err = bc.app.PrepareWitness( + tip.Witness.Height); err != nil { + return + } + if !b.Timestamp.After(tip.Timestamp) { + b.Timestamp = tip.Timestamp.Add(tipConfig.minBlockInterval) + } + + } else { + b.Witness.Height = tip.Witness.Height + b.Witness.Data = make([]byte, len(tip.Witness.Data)) + copy(b.Witness.Data, tip.Witness.Data) + b.Timestamp = tip.Timestamp.Add(tipConfig.minBlockInterval) + } + } + if empty { + if b.Hash, err = utils.HashBlock(b); err != nil { + b = nil + return + } + } else { + if err = bc.signer.SignBlock(b); err != nil { + return + } + } + return +} + +func (bc *blockChain) getTipConfig() blockChainConfig { + if bc.lastConfirmed == nil { + panic(fmt.Errorf("attempting to access config without tip")) + } + if bc.lastConfirmed.Position.Round != bc.configs[0].roundID { + panic(fmt.Errorf("inconsist config and tip: %d %d", + bc.lastConfirmed.Position.Round, bc.configs[0].roundID)) + } + return bc.configs[0] +} + +func (bc *blockChain) confirmBlock(b *types.Block) { + if bc.lastConfirmed != nil && + bc.lastConfirmed.Position.Height+1 != b.Position.Height { + panic(fmt.Errorf("confirmed blocks not continuous in height: %s %s", + bc.lastConfirmed, b)) + } + bc.logger.Debug("Calling Application.BlockConfirmed", "block", b) + bc.app.BlockConfirmed(*b) + bc.lastConfirmed = b + bc.setRandomnessFromPending(b) + bc.confirmedBlocks = append(bc.confirmedBlocks, b) + bc.purgeConfig() +} + +func (bc *blockChain) setRandomnessFromPending(b *types.Block) { + if r, exist := bc.pendingRandomnesses[b.Position]; exist { + if !r.BlockHash.Equal(b.Hash) { + panic(fmt.Errorf("mismathed randomness: %s %s", b, r)) + } + b.Finalization.Randomness = r.Randomness + delete(bc.pendingRandomnesses, b.Position) + } +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockpool.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockpool.go deleted file mode 100644 index 4e41aa7c4..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockpool.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "container/heap" - - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -// blockPool is a heaped slice of blocks ([][]*types.Block), indexed by chainID, -// and blocks in each is sorted by block's height. -type blockPool []types.ByPosition - -func newBlockPool(chainNum uint32) (pool blockPool) { - pool = make(blockPool, chainNum) - for i := range pool { - heap.Init(&pool[i]) - } - return -} - -func (p *blockPool) resize(num uint32) { - if uint32(len(*p)) >= num { - // Do nothing If the origin size is larger. - return - } - newPool := make(blockPool, num) - copy(newPool, *p) - for i := uint32(len(*p)); i < num; i++ { - newChain := types.ByPosition{} - heap.Init(&newChain) - newPool[i] = newChain - } - *p = newPool -} - -// addBlock adds a block into pool and sorts them by height. -func (p blockPool) addBlock(b *types.Block) { - heap.Push(&p[b.Position.ChainID], b) -} - -// purgeBlocks purges blocks of a specified chain with less-or-equal heights. -// NOTE: "chainID" is not checked here, this should be ensured by the caller. -func (p blockPool) purgeBlocks(chainID uint32, height uint64) { - for len(p[chainID]) > 0 && p[chainID][0].Position.Height <= height { - heap.Pop(&p[chainID]) - } -} - -// tip returns block with the smallest height, nil if no existing block. -func (p blockPool) tip(chainID uint32) *types.Block { - if len(p[chainID]) == 0 { - return nil - } - return p[chainID][0] -} - -// removeTip removes block with lowest height of a specified chain. -func (p blockPool) removeTip(chainID uint32) { - if len(p[chainID]) > 0 { - heap.Pop(&p[chainID]) - } -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go deleted file mode 100644 index d7c2f8556..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go +++ /dev/null @@ -1,291 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "fmt" - "sync" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/crypto" - "github.com/dexon-foundation/dexon-consensus/core/types" - "github.com/dexon-foundation/dexon-consensus/core/utils" -) - -// Errors for compaction chain module. -var ( - ErrBlockNotRegistered = fmt.Errorf( - "block not registered") - ErrNotInitiazlied = fmt.Errorf( - "not initialized") - ErrTSigNotReady = fmt.Errorf( - "tsig not ready") - ErrIncorrectBlockRandomnessResult = fmt.Errorf( - "incorrect block randomness result") -) - -const maxPendingPeriod = 3 * time.Second -const maxRandomnessCache = 100 - -type pendingRandomnessResult struct { - receivedTime time.Time - randResult *types.BlockRandomnessResult -} - -type finalizedBlockHeap = types.ByFinalizationHeight - -type compactionChain struct { - gov Governance - chainUnsynced uint32 - tsigVerifier *TSigVerifierCache - blocks map[common.Hash]*types.Block - blockRandomness map[common.Hash][]byte - pendingRandomness map[common.Hash]pendingRandomnessResult - processedRandomnessResult map[types.Position]struct{} - pendingBlocks []*types.Block - lock sync.RWMutex - prevBlock *types.Block -} - -func newCompactionChain(gov Governance) *compactionChain { - return &compactionChain{ - gov: gov, - tsigVerifier: NewTSigVerifierCache(gov, 7), - blocks: make(map[common.Hash]*types.Block), - blockRandomness: make(map[common.Hash][]byte), - pendingRandomness: make(map[common.Hash]pendingRandomnessResult), - processedRandomnessResult: make(map[types.Position]struct{}, maxRandomnessCache), - } -} - -// init the compaction chain module with a finalized block, or just an empty -// block for bootstrap case. -func (cc *compactionChain) init(initBlock *types.Block) { - cc.lock.Lock() - defer cc.lock.Unlock() - cc.prevBlock = initBlock - cc.pendingBlocks = []*types.Block{} - // It's the bootstrap case, compactionChain would only deliver blocks until - // tips of all chains are received. - if initBlock.Finalization.Height == 0 { - cc.chainUnsynced = utils.GetConfigWithPanic(cc.gov, 0, nil).NumChains - } -} - -func (cc *compactionChain) registerBlock(block *types.Block) { - if cc.blockRegistered(block.Hash) { - return - } - cc.lock.Lock() - defer cc.lock.Unlock() - cc.blocks[block.Hash] = block - if rand, exist := cc.pendingRandomness[block.Hash]; exist { - cc.blockRandomness[rand.randResult.BlockHash] = rand.randResult.Randomness - delete(cc.pendingRandomness, block.Hash) - } -} - -func (cc *compactionChain) blockRegistered(hash common.Hash) bool { - cc.lock.RLock() - defer cc.lock.RUnlock() - return cc.blockRegisteredNoLock(hash) -} - -func (cc *compactionChain) blockRegisteredNoLock( - hash common.Hash) (exist bool) { - _, exist = cc.blocks[hash] - return -} - -func (cc *compactionChain) processBlock(block *types.Block) error { - prevBlock := cc.lastDeliveredBlock() - if prevBlock == nil { - return ErrNotInitiazlied - } - cc.lock.Lock() - defer cc.lock.Unlock() - if prevBlock.Finalization.Height == 0 && block.Position.Height == 0 { - cc.chainUnsynced-- - } - cc.pendingBlocks = append(cc.pendingBlocks, block) - return nil -} - -func (cc *compactionChain) extractBlocks() []*types.Block { - // Check if we're synced. - if !func() bool { - cc.lock.RLock() - defer cc.lock.RUnlock() - if len(cc.pendingBlocks) == 0 { - return false - } - // Finalization.Height == 0 is syncing from bootstrap. - if cc.prevBlock.Finalization.Height == 0 { - return cc.chainUnsynced == 0 - } - return true - }() { - return []*types.Block{} - } - deliveringBlocks := make([]*types.Block, 0) - cc.lock.Lock() - defer cc.lock.Unlock() - var ( - block *types.Block - prevBlock = cc.prevBlock - ) - for len(cc.pendingBlocks) > 0 && - (len(cc.blockRandomness[cc.pendingBlocks[0].Hash]) != 0 || - cc.pendingBlocks[0].Position.Round == 0) { - delete(cc.blocks, cc.pendingBlocks[0].Hash) - block, cc.pendingBlocks = cc.pendingBlocks[0], cc.pendingBlocks[1:] - block.Finalization.ParentHash = prevBlock.Hash - block.Finalization.Height = prevBlock.Finalization.Height + 1 - if block.Position.Round != 0 { - block.Finalization.Randomness = cc.blockRandomness[block.Hash] - delete(cc.blockRandomness, block.Hash) - } - deliveringBlocks = append(deliveringBlocks, block) - prevBlock = block - } - cc.prevBlock = prevBlock - return deliveringBlocks -} - -func (cc *compactionChain) verifyRandomness( - blockHash common.Hash, round uint64, randomness []byte) (bool, error) { - if round == 0 { - return len(randomness) == 0, nil - } - // Randomness is not available at round 0. - v, ok, err := cc.tsigVerifier.UpdateAndGet(round) - if err != nil { - return false, err - } - if !ok { - return false, ErrTSigNotReady - } - return v.VerifySignature(blockHash, crypto.Signature{ - Type: "bls", - Signature: randomness}), nil -} - -func (cc *compactionChain) processFinalizedBlock(block *types.Block) error { - if block.Finalization.Height <= cc.lastDeliveredBlock().Finalization.Height { - return nil - } - // Block of round 0 should not have randomness. - if block.Position.Round == 0 && len(block.Finalization.Randomness) != 0 { - return nil - } - cc.lock.Lock() - defer cc.lock.Unlock() - // The randomness result is missed previously. - if cc.blockRegisteredNoLock(block.Hash) { - ok, err := cc.verifyRandomness( - block.Hash, block.Position.Round, block.Finalization.Randomness) - if err != nil { - return err - } - if ok { - cc.blockRandomness[block.Hash] = block.Finalization.Randomness - } - } - return nil -} - -func (cc *compactionChain) touchBlockRandomnessResult( - rand *types.BlockRandomnessResult) (first bool) { - // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! - if _, exist := cc.processedRandomnessResult[rand.Position]; !exist { - first = true - if len(cc.processedRandomnessResult) > maxRandomnessCache { - for k := range cc.processedRandomnessResult { - // Randomly drop one element. - delete(cc.processedRandomnessResult, k) - break - } - } - cc.processedRandomnessResult[rand.Position] = struct{}{} - } - return -} - -func (cc *compactionChain) processBlockRandomnessResult( - rand *types.BlockRandomnessResult) error { - ok, err := cc.verifyRandomness( - rand.BlockHash, rand.Position.Round, rand.Randomness) - if err != nil { - return err - } - if !ok { - return ErrIncorrectBlockRandomnessResult - } - cc.lock.Lock() - defer cc.lock.Unlock() - if !cc.blockRegisteredNoLock(rand.BlockHash) { - cc.purgePending() - cc.pendingRandomness[rand.BlockHash] = pendingRandomnessResult{ - receivedTime: time.Now(), - randResult: rand, - } - return ErrBlockNotRegistered - } - cc.blockRandomness[rand.BlockHash] = rand.Randomness - return nil -} - -func (cc *compactionChain) purgePending() { - now := time.Now() - for key, rand := range cc.pendingRandomness { - if now.After(rand.receivedTime.Add(maxPendingPeriod)) { - delete(cc.pendingRandomness, key) - } - } -} - -// lastDeliveredBlock returns the last delivered block, or the one used to -// initialize this module. -func (cc *compactionChain) lastDeliveredBlock() *types.Block { - cc.lock.RLock() - defer cc.lock.RUnlock() - return cc.prevBlock -} - -// lastPendingBlock returns the last pending block. -func (cc *compactionChain) lastPendingBlock() *types.Block { - cc.lock.RLock() - defer cc.lock.RUnlock() - if len(cc.pendingBlocks) > 0 { - return cc.pendingBlocks[0] - } - return nil -} - -func (cc *compactionChain) pendingBlocksWithoutRandomness() ( - hashes common.Hashes) { - cc.lock.RLock() - defer cc.lock.RUnlock() - for _, block := range cc.pendingBlocks { - if _, exist := cc.blockRandomness[block.Hash]; !exist { - hashes = append(hashes, block.Hash) - } - } - return -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go index bec47f491..ad4d7e633 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go @@ -308,11 +308,6 @@ func (cc *configurationChain) recoverDKGInfo(round uint64) error { if err != nil { return err } - // Restore DKG share secret, this segment of code is copied from - // dkgProtocol.recoverShareSecret. - if len(gpk.qualifyIDs) < threshold { - return ErrNotReachThreshold - } // Check if we have private shares in DB. prvKey, err := cc.db.GetDKGPrivateKey(round) if err != nil { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus-timestamp.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus-timestamp.go deleted file mode 100644 index d7ce8e23e..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus-timestamp.go +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "errors" - "time" - - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -// consensusTimestamp calculates a block's finalization timestamp. Notice that -// the Finalization timestamps are increasing (but not strictly increasing). -// Instance functions: -// - processBlocks(blocks []*types.Block) error -// called with blocks output from total ordering -// - appendConfig(round uint64, config *types.Config) error -// called when a new config is known -// - synced() bool -// called in sync mode -type consensusTimestamp struct { - timestampsOfChains []time.Time - - // Stores number of chains for rounds. - numChainsOfRounds []uint32 - numChainsBase uint64 - - // dMoment represents the genesis time. - dMoment time.Time - - // lastTimestamp is the previous assigned consensus timestamp. - lastTimestamp time.Time -} - -var ( - // ErrTimestampNotIncrease for timestamp is not strictly increasing on one - // chain. - ErrTimestampNotIncrease = errors.New("timestamp is not increasing") - // ErrNoRoundConfig for no round config found. - ErrNoRoundConfig = errors.New("no round config found") - // ErrConsensusTimestampRewind for rewinding timestamp. - ErrConsensusTimestampRewind = errors.New("consensus timestamp rewind") -) - -// newConsensusTimestamp creates consensus timestamp instance. -func newConsensusTimestamp( - dMoment time.Time, round uint64, numChains uint32) *consensusTimestamp { - - ts := make([]time.Time, numChains) - for i := range ts { - ts[i] = dMoment - } - return &consensusTimestamp{ - numChainsOfRounds: []uint32{numChains}, - numChainsBase: round, - dMoment: dMoment, - timestampsOfChains: ts, - } -} - -// appendConfig appends a configuration for upcoming round. When you append -// a config for round R, next time you can only append the config for round R+1. -func (ct *consensusTimestamp) appendConfig( - round uint64, config *types.Config) error { - if round != uint64(len(ct.numChainsOfRounds))+ct.numChainsBase { - return ErrRoundNotIncreasing - } - // This segment is to handle the corner case for config checking logic in - // processBlock method. - if len(ct.numChainsOfRounds) == 1 { - if ct.numChainsOfRounds[0] > config.NumChains { - ct.resizeTimetamps(ct.numChainsOfRounds[0]) - } else { - ct.resizeTimetamps(config.NumChains) - } - } - ct.numChainsOfRounds = append(ct.numChainsOfRounds, config.NumChains) - return nil -} - -func (ct *consensusTimestamp) resizeTimetamps(numChain uint32) { - l := uint32(len(ct.timestampsOfChains)) - if numChain > l { - for i := l; i < numChain; i++ { - ct.timestampsOfChains = append(ct.timestampsOfChains, ct.dMoment) - } - } else if numChain < l { - ct.timestampsOfChains = ct.timestampsOfChains[:numChain] - } -} - -// ProcessBlocks is the entry function. -func (ct *consensusTimestamp) processBlocks(blocks []*types.Block) (err error) { - for _, block := range blocks { - // Rounds might interleave within rounds if no configuration change - // occurs. And it is limited to one round, that is, round r can only - // interleave with r-1 and r+1. - round := block.Position.Round - if ct.numChainsBase == round || ct.numChainsBase+1 == round { - // Normal case, no need to modify timestampsOfChains. - } else if ct.numChainsBase+2 == round { - // Resize timestampsOfChains if block from r+2 comes, because the interleave - // of rounds must be less than 1. Resize the size to - // max(numChainsOfRounds[r+1], numChainsOfRounds[r+2]). - if len(ct.numChainsOfRounds) < 2 { - return ErrNoRoundConfig - } - ct.numChainsBase++ - ct.numChainsOfRounds = ct.numChainsOfRounds[1:] - if ct.numChainsOfRounds[0] > ct.numChainsOfRounds[1] { - ct.resizeTimetamps(ct.numChainsOfRounds[0]) - } else { - ct.resizeTimetamps(ct.numChainsOfRounds[1]) - } - } else { - // Error if round < base or round > base + 2. - return ErrInvalidRoundID - } - ts := ct.timestampsOfChains[:ct.numChainsOfRounds[round-ct.numChainsBase]] - if block.Finalization.Timestamp, err = getMedianTime(ts); err != nil { - return - } - if block.Timestamp.Before(ct.timestampsOfChains[block.Position.ChainID]) { - return ErrTimestampNotIncrease - } - ct.timestampsOfChains[block.Position.ChainID] = block.Timestamp - // If the finalization timestamp is before the last timestamp, set it to - // the last one. Notice that the finalization timestamps are increasing but - // not strictly increasing. - if block.Finalization.Timestamp.Before(ct.lastTimestamp) { - block.Finalization.Timestamp = ct.lastTimestamp - } else { - ct.lastTimestamp = block.Finalization.Timestamp - } - } - return -} - -func (ct *consensusTimestamp) isSynced() bool { - numChain := ct.numChainsOfRounds[0] - for i := uint32(0); i < numChain; i++ { - if ct.timestampsOfChains[i].Equal(ct.dMoment) { - return false - } - } - return true -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index f46501038..f4c0a372d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -57,10 +57,9 @@ var ( // consensusBAReceiver implements agreementReceiver. type consensusBAReceiver struct { - // TODO(mission): consensus would be replaced by lattice and network. + // TODO(mission): consensus would be replaced by blockChain and network. consensus *Consensus agreementModule *agreement - chainID uint32 changeNotaryTime time.Time roundValue *atomic.Value isNotary bool @@ -96,9 +95,9 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { if !recv.isNotary { return common.Hash{} } - block := recv.consensus.proposeBlock(recv.chainID, recv.round()) - if block == nil { - recv.consensus.logger.Error("unable to propose block") + block, err := recv.consensus.proposeBlock(recv.agreementModule.agreementID()) + if err != nil || block == nil { + recv.consensus.logger.Error("unable to propose block", "error", err) return types.NullBlockHash } go func() { @@ -115,25 +114,29 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { func (recv *consensusBAReceiver) ConfirmBlock( hash common.Hash, votes map[types.NodeID]*types.Vote) { - var block *types.Block + var ( + block *types.Block + aID = recv.agreementModule.agreementID() + ) isEmptyBlockConfirmed := hash == common.Hash{} if isEmptyBlockConfirmed { - aID := recv.agreementModule.agreementID() - recv.consensus.logger.Info("Empty block is confirmed", - "position", &aID) + recv.consensus.logger.Info("Empty block is confirmed", "position", aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID) + block, err = recv.consensus.bcModule.addEmptyBlock(aID) if err != nil { - recv.consensus.logger.Error("Propose empty block failed", "error", err) + recv.consensus.logger.Error("Add position for empty failed", + "error", err) return } + if block == nil { + panic(fmt.Errorf("empty block should be proposed directly: %s", aID)) + } } else { var exist bool block, exist = recv.agreementModule.findBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", - "hash", hash.String()[:6], - "chainID", recv.chainID) + "hash", hash.String()[:6]) ch := make(chan *types.Block) func() { recv.consensus.lock.Lock() @@ -155,8 +158,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( } recv.consensus.logger.Info("Receive unknown block", "hash", hash.String()[:6], - "position", &block.Position, - "chainID", recv.chainID) + "position", block.Position) recv.agreementModule.addCandidateBlock(block) recv.agreementModule.lock.Lock() defer recv.agreementModule.lock.Unlock() @@ -165,15 +167,14 @@ func (recv *consensusBAReceiver) ConfirmBlock( return } } - recv.consensus.ccModule.registerBlock(block) if block.Position.Height != 0 && - !recv.consensus.lattice.Exist(block.ParentHash) { + !recv.consensus.bcModule.confirmed(block.Position.Height-1) { go func(hash common.Hash) { parentHash := hash for { recv.consensus.logger.Warn("Parent block not confirmed", "parent-hash", parentHash.String()[:6], - "cur-position", &block.Position) + "cur-position", block.Position) ch := make(chan *types.Block) if !func() bool { recv.consensus.lock.Lock() @@ -200,13 +201,12 @@ func (recv *consensusBAReceiver) ConfirmBlock( } recv.consensus.logger.Info("Receive parent block", "parent-hash", block.ParentHash.String()[:6], - "cur-position", &block.Position, - "chainID", recv.chainID) - recv.consensus.ccModule.registerBlock(block) + "cur-position", block.Position) recv.consensus.processBlockChan <- block parentHash = block.ParentHash if block.Position.Height == 0 || - recv.consensus.lattice.Exist(parentHash) { + recv.consensus.bcModule.confirmed( + block.Position.Height-1) { return } } @@ -372,11 +372,6 @@ type Consensus struct { dkgReady *sync.Cond cfgModule *configurationChain - // Dexon consensus v1's modules. - lattice *Lattice - ccModule *compactionChain - toSyncer *totalOrderingSyncer - // Interfaces. db db.Database app Application @@ -385,21 +380,21 @@ type Consensus struct { network Network // Misc. - dMoment time.Time - nodeSetCache *utils.NodeSetCache - round uint64 - roundForNewConfig uint64 - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc - event *common.Event - logger common.Logger - nonFinalizedBlockDelivered bool - resetRandomnessTicker chan struct{} - resetDeliveryGuardTicker chan struct{} - msgChan chan interface{} - waitGroup sync.WaitGroup - processBlockChan chan *types.Block + bcModule *blockChain + dMoment time.Time + nodeSetCache *utils.NodeSetCache + round uint64 + roundForNewConfig uint64 + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc + event *common.Event + logger common.Logger + resetRandomnessTicker chan struct{} + resetDeliveryGuardTicker chan struct{} + msgChan chan interface{} + waitGroup sync.WaitGroup + processBlockChan chan *types.Block // Context of Dummy receiver during switching from syncer. dummyCancel context.CancelFunc @@ -417,7 +412,7 @@ func NewConsensus( prv crypto.PrivateKey, logger common.Logger) *Consensus { return newConsensusForRound( - &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true) + nil, dMoment, app, gov, db, network, prv, logger, true) } // NewConsensusForSimulation creates an instance of Consensus for simulation, @@ -431,7 +426,7 @@ func NewConsensusForSimulation( prv crypto.PrivateKey, logger common.Logger) *Consensus { return newConsensusForRound( - &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false) + nil, dMoment, app, gov, db, network, prv, logger, false) } // NewConsensusFromSyncer constructs an Consensus instance from information @@ -451,14 +446,13 @@ func NewConsensusFromSyncer( db db.Database, networkModule Network, prv crypto.PrivateKey, - latticeModule *Lattice, - confirmedBlocks [][]*types.Block, + confirmedBlocks []*types.Block, randomnessResults []*types.BlockRandomnessResult, cachedMessages []interface{}, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db, - networkModule, prv, logger, latticeModule, true) + networkModule, prv, logger, true) // Launch a dummy receiver before we start receiving from network module. con.dummyMsgBuffer = cachedMessages con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( @@ -467,29 +461,18 @@ func NewConsensusFromSyncer( }) // Dump all BA-confirmed blocks to the consensus instance, make sure these // added blocks forming a DAG. - for { - updated := false - for idx, bs := range confirmedBlocks { - for bIdx, b := range bs { - // Only when its parent block is already added to lattice, we can - // then add this block. If not, our pulling mechanism would stop at - // the block we added, and lost its parent block forever. - if !latticeModule.Exist(b.ParentHash) { - logger.Debug("Skip discontinuous confirmed block", - "from", b, - "until", bs[len(bs)-1]) - confirmedBlocks[idx] = bs[bIdx:] - break - } - con.ccModule.registerBlock(b) - if err := con.processBlock(b); err != nil { - return nil, err - } - } - } - if !updated { + refBlock := initBlock + for _, b := range confirmedBlocks { + // Only when its parent block is already added to lattice, we can + // then add this block. If not, our pulling mechanism would stop at + // the block we added, and lost its parent block forever. + if b.Position.Height != refBlock.Position.Height+1 { break } + if err := con.processBlock(b); err != nil { + return nil, err + } + refBlock = b } // Dump all randomness result to the consensus instance. for _, r := range randomnessResults { @@ -502,7 +485,7 @@ func NewConsensusFromSyncer( return con, nil } -// newConsensus creates a Consensus instance. +// newConsensusForRound creates a Consensus instance. func newConsensusForRound( initBlock *types.Block, initRoundBeginTime time.Time, @@ -512,9 +495,7 @@ func newConsensusForRound( network Network, prv crypto.PrivateKey, logger common.Logger, - latticeModule *Lattice, usingNonBlocking bool) *Consensus { - // TODO(w): load latest blockHeight from DB, and use config at that height. nodeSetCache := utils.NewNodeSetCache(gov) // Setup signer module. @@ -525,13 +506,11 @@ func newConsensusForRound( debugApp = a } // Get configuration for bootstrap round. - initRound := initBlock.Position.Round - initConfig := utils.GetConfigWithPanic(gov, initRound, logger) - // Init lattice. - if latticeModule == nil { - latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig, - signer, app, debugApp, db, logger) + initRound := uint64(0) + if initBlock != nil { + initRound = initBlock.Position.Round } + initConfig := utils.GetConfigWithPanic(gov, initRound, logger) // Init configuration chain. ID := types.NewNodeID(prv.PublicKey()) recv := &consensusDKGReceiver{ @@ -548,11 +527,14 @@ func newConsensusForRound( if usingNonBlocking { appModule = newNonBlocking(app, debugApp) } + bcConfig := blockChainConfig{} + bcConfig.fromConfig(initRound, initConfig) + bcConfig.setRoundBeginTime(initRoundBeginTime) + bcModule := newBlockChain(ID, initBlock, bcConfig, appModule, + NewTSigVerifierCache(gov, 7), signer, logger) // Construct Consensus instance. con := &Consensus{ ID: ID, - ccModule: newCompactionChain(gov), - lattice: latticeModule, app: appModule, debugApp: debugApp, gov: gov, @@ -561,6 +543,7 @@ func newConsensusForRound( baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), dkgReady: sync.NewCond(&sync.Mutex{}), cfgModule: cfgModule, + bcModule: bcModule, dMoment: initRoundBeginTime, nodeSetCache: nodeSetCache, signer: signer, @@ -581,52 +564,56 @@ func newConsensusForRound( // prepare the Consensus instance to be ready for blocks after 'initBlock'. // 'initBlock' could be either: -// - an empty block +// - nil // - the last finalized block -func (con *Consensus) prepare(initBlock *types.Block) error { +func (con *Consensus) prepare(initBlock *types.Block) (err error) { // The block past from full node should be delivered already or known by // full node. We don't have to notify it. - con.roundForNewConfig = initBlock.Position.Round + 1 - initRound := initBlock.Position.Round + initRound := uint64(0) + if initBlock != nil { + initRound = initBlock.Position.Round + } + con.roundForNewConfig = initRound + 1 initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger) // Setup context. - con.ccModule.init(initBlock) con.logger.Debug("Calling Governance.CRS", "round", initRound) initCRS := con.gov.CRS(initRound) if (initCRS == common.Hash{}) { - return ErrCRSNotReady + err = ErrCRSNotReady + return } - if err := con.baMgr.appendConfig(initRound, initConfig, initCRS); err != nil { - return err + if err = con.baMgr.appendConfig(initRound, initConfig, initCRS); err != nil { + return } - // Setup lattice module. + // Setup blockChain module. initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger) - if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil { - if err == ErrRoundNotIncreasing { - err = nil - } else { + if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil { + return + } + if initRound == 0 { + dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) + if err != nil { return err } + if _, exist := dkgSet[con.ID]; exist { + con.logger.Info("Selected as DKG set", "round", initRound) + go func() { + // Sleep until dMoment come. + time.Sleep(con.dMoment.Sub(time.Now().UTC())) + // Network is not stable upon starting. Wait some time to ensure first + // DKG would success. Three is a magic number. + time.Sleep(initConfig.MinBlockInterval * 3) + con.cfgModule.registerDKG(initRound, getDKGThreshold(initConfig)) + con.event.RegisterTime(con.dMoment.Add(initConfig.RoundInterval/4), + func(time.Time) { + con.runDKG(initRound, initConfig) + }) + }() + } } // Register events. - dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) - if err != nil { - return err - } - if _, exist := dkgSet[con.ID]; exist { - con.logger.Info("Selected as DKG set", "round", initRound) - go func() { - // Sleep until dMoment come. - time.Sleep(con.dMoment.Sub(time.Now().UTC())) - con.cfgModule.registerDKG(initRound, getDKGThreshold(initConfig)) - con.event.RegisterTime(con.dMoment.Add(initConfig.RoundInterval/4), - func(time.Time) { - con.runDKG(initRound, initConfig) - }) - }() - } con.initialRound(con.dMoment, initRound, initConfig) - return nil + return } // Run starts running DEXON Consensus. @@ -847,7 +834,7 @@ func (con *Consensus) initialRound( }) }(round + 1) }) - // Prepare lattice module for next round and next "initialRound" routine. + // Prepare blockChain module for next round and next "initialRound" routine. con.event.RegisterTime(startTime.Add(config.RoundInterval), func(time.Time) { // Change round. @@ -917,7 +904,7 @@ MessageLoop: ch, e := con.baConfirmedBlock[val.Hash] return ch, e }(); exist { - if err := con.lattice.SanityCheck(val, false); err != nil { + if err := con.bcModule.sanityCheck(val); err != nil { if err == ErrRetrySanityCheckLater { err = nil } else { @@ -965,7 +952,7 @@ MessageLoop: if err := con.ProcessBlockRandomnessResult(val, true); err != nil { con.logger.Error("Failed to process block randomness result", "hash", val.BlockHash.String()[:6], - "position", &val.Position, + "position", val.Position, "error", err) } case *typesDKG.PrivateShare: @@ -983,34 +970,6 @@ MessageLoop: } } -func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block { - block := &types.Block{ - Position: types.Position{ - ChainID: chainID, - Round: round, - }, - } - if err := con.prepareBlock(block, time.Now().UTC()); err != nil { - con.logger.Error("Failed to prepare block", "error", err) - return nil - } - return block -} - -func (con *Consensus) proposeEmptyBlock( - round uint64, chainID uint32) (*types.Block, error) { - block := &types.Block{ - Position: types.Position{ - Round: round, - ChainID: chainID, - }, - } - if err := con.lattice.PrepareEmptyBlock(block); err != nil { - return nil, err - } - return block, nil -} - // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { v := vote.Clone() @@ -1024,14 +983,11 @@ func (con *Consensus) ProcessAgreementResult( if !con.baMgr.touchAgreementResult(rand) { return nil } - // Sanity Check. if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { con.baMgr.untouchAgreementResult(rand) return err } - con.lattice.AddShallowBlock(rand.BlockHash, rand.Position) - // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { return err @@ -1089,7 +1045,7 @@ func (con *Consensus) ProcessAgreementResult( if err != nil { if err != ErrTSigAlreadyRunning { con.logger.Error("Failed to run TSIG", - "position", &rand.Position, + "position", rand.Position, "hash", rand.BlockHash.String()[:6], "error", err) } @@ -1113,15 +1069,8 @@ func (con *Consensus) ProcessBlockRandomnessResult( if rand.Position.Round == 0 { return nil } - if !con.ccModule.touchBlockRandomnessResult(rand) { - return nil - } - if err := con.ccModule.processBlockRandomnessResult(rand); err != nil { - if err == ErrBlockNotRegistered { - err = nil - } else { - return err - } + if err := con.bcModule.addRandomness(rand); err != nil { + return err } if needBroadcast { con.logger.Debug("Calling Network.BroadcastRandomnessResult", @@ -1154,7 +1103,7 @@ func (con *Consensus) pullRandomness() { case <-con.resetRandomnessTicker: case <-time.After(1500 * time.Millisecond): // TODO(jimmy): pulling period should be related to lambdaBA. - hashes := con.ccModule.pendingBlocksWithoutRandomness() + hashes := con.bcModule.pendingBlocksWithoutRandomness() if len(hashes) > 0 { con.logger.Debug( "Calling Network.PullRandomness", "blocks", hashes) @@ -1196,7 +1145,8 @@ func (con *Consensus) deliverBlock(b *types.Block) { case con.resetDeliveryGuardTicker <- struct{}{}: default: } - if err := con.db.UpdateBlock(*b); err != nil { + // TODO(mission): do we need to put block when confirmed now? + if err := con.db.PutBlock(*b); err != nil { panic(err) } if err := con.db.PutCompactionChainTipInfo( @@ -1209,13 +1159,13 @@ func (con *Consensus) deliverBlock(b *types.Block) { if b.Position.Round == con.roundForNewConfig { // Get configuration for the round next to next round. Configuration // for that round should be ready at this moment and is required for - // lattice module. This logic is related to: + // blockChain module. This logic is related to: // - roundShift // - notifyGenesisRound futureRound := con.roundForNewConfig + 1 futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger) con.logger.Debug("Append Config", "round", futureRound) - if err := con.lattice.AppendConfig( + if err := con.bcModule.appendConfig( futureRound, futureConfig); err != nil { con.logger.Debug("Unable to append config", "round", futureRound, @@ -1238,14 +1188,14 @@ func (con *Consensus) deliverFinalizedBlocks() error { } func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) { - deliveredBlocks := con.ccModule.extractBlocks() + deliveredBlocks := con.bcModule.extractBlocks() con.logger.Debug("Last blocks in compaction chain", - "delivered", con.ccModule.lastDeliveredBlock(), - "pending", con.ccModule.lastPendingBlock()) + "delivered", con.bcModule.lastDeliveredBlock(), + "pending", con.bcModule.lastPendingBlock()) for _, b := range deliveredBlocks { con.deliverBlock(b) + go con.event.NotifyTime(b.Finalization.Timestamp) } - err = con.lattice.PurgeBlocks(deliveredBlocks) return } @@ -1271,34 +1221,14 @@ func (con *Consensus) processBlockLoop() { // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { + // Block processed by blockChain can be out-of-order. But the output from + // blockChain (deliveredBlocks) cannot, thus we need to protect the part + // below with writer lock. con.lock.Lock() defer con.lock.Unlock() - // Block processed by lattice can be out-of-order. But the output of lattice - // (deliveredBlocks) cannot. - deliveredBlocks, err := con.lattice.ProcessBlock(block) - if err != nil { + if err = con.bcModule.addBlock(block); err != nil { return } - // Pass delivered blocks to compaction chain. - for _, b := range deliveredBlocks { - if b.IsFinalized() { - if con.nonFinalizedBlockDelivered { - panic(fmt.Errorf("attempting to skip finalized block: %s", b)) - } - con.logger.Debug("skip delivery of finalized block", - "block", b, - "finalization-height", b.Finalization.Height) - continue - } else { - // Mark that some non-finalized block delivered. After this flag - // turned on, it's not allowed to deliver finalized blocks anymore. - con.nonFinalizedBlockDelivered = true - } - if err = con.ccModule.processBlock(b); err != nil { - return - } - go con.event.NotifyTime(b.Finalization.Timestamp) - } if err = con.deliverFinalizedBlocksWithoutLock(); err != nil { return } @@ -1307,36 +1237,28 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { // processFinalizedBlock is the entry point for handling finalized blocks. func (con *Consensus) processFinalizedBlock(block *types.Block) error { - return con.ccModule.processFinalizedBlock(block) + return con.bcModule.processFinalizedBlock(block) } // PrepareBlock would setup header fields of block based on its ProposerID. -func (con *Consensus) prepareBlock(b *types.Block, - proposeTime time.Time) (err error) { - if err = con.lattice.PrepareBlock(b, proposeTime); err != nil { - return +func (con *Consensus) proposeBlock(position types.Position) ( + *types.Block, error) { + b, err := con.bcModule.proposeBlock(position, time.Now().UTC()) + if err != nil { + return nil, err } con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round) crs := con.gov.CRS(b.Position.Round) if crs.Equal(common.Hash{}) { con.logger.Error("CRS for round is not ready, unable to prepare block", "position", &b.Position) - err = ErrCRSNotReady - return + return nil, ErrCRSNotReady } - err = con.signer.SignCRS(b, crs) - return -} - -// PrepareGenesisBlock would setup header fields for genesis block. -func (con *Consensus) PrepareGenesisBlock(b *types.Block, - proposeTime time.Time) (err error) { - if err = con.prepareBlock(b, proposeTime); err != nil { - return + if err = con.signer.SignCRS(b, crs); err != nil { + return nil, err } - if len(b.Payload) != 0 { - err = ErrGenesisBlockNotEmpty - return + if b.IsGenesis() && len(b.Payload) != 0 { + return nil, ErrGenesisBlockNotEmpty } - return + return b, nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go index 73b8abfd9..4f15a74ac 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go @@ -53,6 +53,8 @@ var ( "not enough of partial signatures") ErrRoundAlreadyPurged = fmt.Errorf( "cache of round already been purged") + ErrTSigNotReady = fmt.Errorf( + "tsig not ready") ) type dkgReceiver interface { @@ -400,6 +402,9 @@ func NewDKGGroupPublicKey( } } qualifyIDs := make(dkg.IDs, 0, len(mpks)-len(disqualifyIDs)) + if cap(qualifyIDs) < threshold { + return nil, ErrNotReachThreshold + } qualifyNodeIDs := make(map[types.NodeID]struct{}) mpkMap := make(map[dkg.ID]*typesDKG.MasterPublicKey, cap(qualifyIDs)) idMap := make(map[types.NodeID]dkg.ID) @@ -515,6 +520,13 @@ func (tc *TSigVerifierCache) Update(round uint64) (bool, error) { return true, nil } +// Delete the cache of given round. +func (tc *TSigVerifierCache) Delete(round uint64) { + tc.lock.Lock() + defer tc.lock.Unlock() + delete(tc.verifier, round) +} + // Get the TSigVerifier of round and returns if it exists. func (tc *TSigVerifierCache) Get(round uint64) (TSigVerifier, bool) { tc.lock.RLock() diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index 408343f3b..3879e36a5 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -51,9 +51,6 @@ type Application interface { type Debug interface { // BlockReceived is called when the block received in agreement. BlockReceived(common.Hash) - // TotalOrderingDelivered is called when the total ordering algorithm deliver - // a set of block. - TotalOrderingDelivered(common.Hashes, uint32) // BlockReady is called when the block's randomness is ready. BlockReady(common.Hash) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go deleted file mode 100644 index 0bbe8902a..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go +++ /dev/null @@ -1,683 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "errors" - "fmt" - "sort" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/db" - "github.com/dexon-foundation/dexon-consensus/core/types" - "github.com/dexon-foundation/dexon-consensus/core/utils" -) - -// Errors for sanity check error. -var ( - ErrDuplicatedAckOnOneChain = fmt.Errorf("duplicated ack on one chain") - ErrInvalidProposerID = fmt.Errorf("invalid proposer id") - ErrInvalidWitness = fmt.Errorf("invalid witness data") - ErrInvalidBlock = fmt.Errorf("invalid block") - ErrNotAckParent = fmt.Errorf("not ack parent") - ErrDoubleAck = fmt.Errorf("double ack") - ErrAcksNotSorted = fmt.Errorf("acks not sorted") - ErrInvalidBlockHeight = fmt.Errorf("invalid block height") - ErrAlreadyInLattice = fmt.Errorf("block already in lattice") - ErrIncorrectBlockTime = fmt.Errorf("block timestamp is incorrect") - ErrInvalidRoundID = fmt.Errorf("invalid round id") - ErrUnknownRoundID = fmt.Errorf("unknown round id") - ErrRoundOutOfRange = fmt.Errorf("round out of range") - ErrRoundNotSwitch = fmt.Errorf("round not switch") - ErrNotGenesisBlock = fmt.Errorf("not a genesis block") - ErrUnexpectedGenesisBlock = fmt.Errorf("unexpected genesis block") -) - -// ErrAckingBlockNotExists is for sanity check error. -type ErrAckingBlockNotExists struct { - hash common.Hash -} - -func (e ErrAckingBlockNotExists) Error() string { - return fmt.Sprintf("acking block %s not exists", e.hash.String()[:6]) -} - -// Errors for method usage -var ( - ErrRoundNotIncreasing = errors.New("round not increasing") - ErrPurgedBlockNotFound = errors.New("purged block not found") - ErrPurgeNotDeliveredBlock = errors.New("not purge from head") -) - -// latticeDataConfig is the configuration for latticeData for each round. -type latticeDataConfig struct { - roundBasedConfig - // Number of chains between runs - numChains uint32 - // Block interval specifies reasonable time difference between - // parent/child blocks. - minBlockTimeInterval time.Duration -} - -// Initiate latticeDataConfig from types.Config. -func (config *latticeDataConfig) fromConfig(roundID uint64, cfg *types.Config) { - config.numChains = cfg.NumChains - config.minBlockTimeInterval = cfg.MinBlockInterval - config.setupRoundBasedFields(roundID, cfg) -} - -// isValidBlockTime checks if timestamp of a block is valid according to a -// reference time. -func (config *latticeDataConfig) isValidBlockTime( - b *types.Block, ref time.Time) bool { - return !b.Timestamp.Before(ref.Add(config.minBlockTimeInterval)) -} - -// isValidGenesisBlockTime checks if a timestamp is valid for a genesis block. -func (config *latticeDataConfig) isValidGenesisBlockTime(b *types.Block) bool { - return !b.Timestamp.Before(config.roundBeginTime) -} - -// newLatticeDataConfig constructs a latticeDataConfig instance. -func newLatticeDataConfig( - prev *latticeDataConfig, cur *types.Config) *latticeDataConfig { - c := &latticeDataConfig{} - c.fromConfig(prev.roundID+1, cur) - c.setRoundBeginTime(prev.roundEndTime) - return c -} - -// latticeData is a module for storing lattice. -type latticeData struct { - // DB for getting blocks purged in memory. - db db.Database - // chains stores chains' blocks and other info. - chains []*chainStatus - // blockByHash stores blocks, indexed by block hash. - blockByHash map[common.Hash]*types.Block - // This stores configuration for each round. - configs []*latticeDataConfig - // shallowBlocks stores the hash of blocks that their body is not receive yet. - shallowBlocks map[common.Hash]types.Position -} - -// newLatticeData creates a new latticeData instance. -func newLatticeData( - db db.Database, - dMoment time.Time, - round uint64, - config *types.Config) (data *latticeData) { - - genesisConfig := &latticeDataConfig{} - genesisConfig.fromConfig(round, config) - genesisConfig.setRoundBeginTime(dMoment) - data = &latticeData{ - db: db, - chains: make([]*chainStatus, genesisConfig.numChains), - blockByHash: make(map[common.Hash]*types.Block), - configs: []*latticeDataConfig{genesisConfig}, - shallowBlocks: make(map[common.Hash]types.Position), - } - for i := range data.chains { - data.chains[i] = &chainStatus{ - ID: uint32(i), - blocks: []*types.Block{}, - lastAckPos: make([]*types.Position, genesisConfig.numChains), - } - } - return -} - -func (data *latticeData) addShallowBlock(hash common.Hash, pos types.Position) { - // We don't care other errors here. This `if` is to prevent being spammed by - // very old blocks. - if _, err := data.findBlock(hash); err != db.ErrBlockDoesNotExist { - return - } - data.shallowBlocks[hash] = pos -} - -func (data *latticeData) checkAckingRelations( - b *types.Block, allowShallow bool) error { - acksByChainID := make(map[uint32]struct{}, len(data.chains)) - for _, hash := range b.Acks { - bAck, err := data.findBlock(hash) - if err != nil { - if err == db.ErrBlockDoesNotExist { - err = &ErrAckingBlockNotExists{hash} - if allowShallow { - if pos, exist := data.shallowBlocks[hash]; exist { - bAck = &types.Block{ - Position: pos, - } - err = nil - } - } - } - if err != nil { - return err - } - } - // Check if it acks blocks from old rounds, the allowed round difference - // is 1. - if DiffUint64(bAck.Position.Round, b.Position.Round) > 1 { - return ErrRoundOutOfRange - } - // Check if it acks older blocks than blocks on the same chain. - lastAckPos := - data.chains[bAck.Position.ChainID].lastAckPos[b.Position.ChainID] - if lastAckPos != nil && !bAck.Position.Newer(lastAckPos) { - return ErrDoubleAck - } - // Check if it acks two blocks on the same chain. This would need - // to check after we replace map with slice for acks. - if _, acked := acksByChainID[bAck.Position.ChainID]; acked { - return ErrDuplicatedAckOnOneChain - } - acksByChainID[bAck.Position.ChainID] = struct{}{} - } - return nil -} - -func (data *latticeData) sanityCheck(b *types.Block, allowShallow bool) error { - // TODO(mission): Check if its proposer is in validator set, lattice has no - // knowledge about node set. - config := data.getConfig(b.Position.Round) - if config == nil { - return ErrInvalidRoundID - } - // Check if the chain id is valid. - if b.Position.ChainID >= config.numChains { - return utils.ErrInvalidChainID - } - // Make sure parent block is arrived. - chain := data.chains[b.Position.ChainID] - chainTip := chain.tip - if chainTip == nil { - if !b.ParentHash.Equal(common.Hash{}) { - return &ErrAckingBlockNotExists{b.ParentHash} - } - if !b.IsGenesis() { - return ErrNotGenesisBlock - } - if !config.isValidGenesisBlockTime(b) { - return ErrIncorrectBlockTime - } - return data.checkAckingRelations(b, allowShallow) - } - // Check parent block if parent hash is specified. - if !b.ParentHash.Equal(common.Hash{}) { - if !b.ParentHash.Equal(chainTip.Hash) { - return &ErrAckingBlockNotExists{b.ParentHash} - } - if !b.IsAcking(b.ParentHash) { - return ErrNotAckParent - } - } - chainTipConfig := data.getConfig(chainTip.Position.Round) - // Round can't be rewinded. - if chainTip.Position.Round > b.Position.Round { - return ErrInvalidRoundID - } - checkTip := false - if chainTip.Timestamp.After(chainTipConfig.roundEndTime) { - // Round switching should happen when chainTip already pass - // round end time of its round. - if chainTip.Position.Round == b.Position.Round { - return ErrRoundNotSwitch - } - // The round ID is continuous. - if b.Position.Round-chainTip.Position.Round == 1 { - checkTip = true - } else { - // This block should be genesis block of new round because round - // ID is not continuous. - if !b.IsGenesis() { - return ErrNotGenesisBlock - } - if !config.isValidGenesisBlockTime(b) { - return ErrIncorrectBlockTime - } - // TODO(mission): make sure rounds between chainTip and current block - // don't expect blocks from this chain. - } - } else { - if chainTip.Position.Round != b.Position.Round { - // Round should not switch. - return ErrInvalidRoundID - } - checkTip = true - } - // Validate the relation between chain tip when needed. - if checkTip { - if b.Position.Height != chainTip.Position.Height+1 { - return ErrInvalidBlockHeight - } - if b.Witness.Height < chainTip.Witness.Height { - return ErrInvalidWitness - } - if !config.isValidBlockTime(b, chainTip.Timestamp) { - return ErrIncorrectBlockTime - } - // Chain tip should be acked. - if !b.IsAcking(chainTip.Hash) { - return ErrNotAckParent - } - } - return data.checkAckingRelations(b, allowShallow) -} - -// addBlock processes blocks. It does sanity check, inserts block into lattice -// and deletes blocks which will not be used. -func (data *latticeData) addBlock( - block *types.Block) (deliverable []*types.Block, err error) { - var ( - bAck *types.Block - updated bool - ) - if err = data.db.PutBlock(*block); err != nil { - if err == db.ErrBlockExists { - // If a node is crashed and restarted, we might encounter some - // blocks that already confirmed but not delivered yet. Then - // syncer might still try to add that block in this way. - err = nil - } else { - return - } - } - data.chains[block.Position.ChainID].addBlock(block) - data.blockByHash[block.Hash] = block - // Update lastAckPos. - for _, ack := range block.Acks { - if bAck, err = data.findBlock(ack); err != nil { - return - } - data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] = - bAck.Position.Clone() - } - // Extract deliverable blocks to total ordering. A block is deliverable to - // total ordering iff all its ackings blocks were delivered to total ordering. - for { - updated = false - for _, status := range data.chains { - if status.nextOutputIndex >= len(status.blocks) { - continue - } - tip := status.blocks[status.nextOutputIndex] - allAckingBlockDelivered := true - for _, ack := range tip.Acks { - if bAck, err = data.findBlock(ack); err != nil { - if err == db.ErrBlockDoesNotExist { - err = nil - allAckingBlockDelivered = false - break - } - return - } - // Check if this block is outputed or not. - idx := data.chains[bAck.Position.ChainID].findBlock(&bAck.Position) - var ok bool - if idx == -1 { - // Either the block is delivered or not added to chain yet. - if out := - data.chains[bAck.Position.ChainID].lastOutputPosition; out != nil { - ok = !out.Older(&bAck.Position) - } else if ackTip := - data.chains[bAck.Position.ChainID].tip; ackTip != nil { - ok = !ackTip.Position.Older(&bAck.Position) - } - } else { - ok = idx < data.chains[bAck.Position.ChainID].nextOutputIndex - } - if ok { - continue - } - // This acked block exists and not delivered yet. - allAckingBlockDelivered = false - } - if allAckingBlockDelivered { - status.lastOutputPosition = &tip.Position - status.nextOutputIndex++ - deliverable = append(deliverable, tip) - updated = true - } - } - if !updated { - break - } - } - return -} - -// addFinalizedBlock processes block for syncing internal data. -func (data *latticeData) addFinalizedBlock(block *types.Block) (err error) { - var bAck *types.Block - chain := data.chains[block.Position.ChainID] - if chain.tip != nil && chain.tip.Position.Height >= block.Position.Height { - return - } - chain.nextOutputIndex = 0 - chain.blocks = []*types.Block{} - chain.tip = block - chain.lastOutputPosition = nil - // Update lastAckPost. - for _, ack := range block.Acks { - if bAck, err = data.findBlock(ack); err != nil { - return - } - data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] = - bAck.Position.Clone() - } - return -} - -func (data *latticeData) tipRound(chainID uint32) uint64 { - if tip := data.chains[chainID].tip; tip != nil { - tipConfig := data.getConfig(tip.Position.Round) - offset := uint64(0) - if tip.Timestamp.After(tipConfig.roundEndTime) { - offset++ - } - return tip.Position.Round + offset - } - return uint64(0) - -} - -// isBindTip checks if a block's fields should follow up its parent block. -func (data *latticeData) isBindTip( - pos types.Position, tip *types.Block) (bindTip bool, err error) { - if tip == nil { - return - } - if pos.Round < tip.Position.Round { - err = ErrInvalidRoundID - return - } - tipConfig := data.getConfig(tip.Position.Round) - if tip.Timestamp.After(tipConfig.roundEndTime) { - if pos.Round == tip.Position.Round { - err = ErrRoundNotSwitch - return - } - if pos.Round == tip.Position.Round+1 { - bindTip = true - } - } else { - if pos.Round != tip.Position.Round { - err = ErrInvalidRoundID - return - } - bindTip = true - } - return -} - -// prepareBlock setups fields of a block based on its ChainID and Round, -// including: -// - Acks -// - Timestamp -// - ParentHash and Height from parent block. If there is no valid parent block -// (e.g. Newly added chain or bootstrap), these fields should be setup as -// genesis block. -func (data *latticeData) prepareBlock(b *types.Block) error { - var ( - minTimestamp time.Time - config *latticeDataConfig - acks common.Hashes - bindTip bool - ) - if config = data.getConfig(b.Position.Round); config == nil { - return ErrUnknownRoundID - } - // If chainID is illegal in this round, reject it. - if b.Position.ChainID >= config.numChains { - return utils.ErrInvalidChainID - } - // Reset fields to make sure we got these information from parent block. - b.Position.Height = 0 - b.ParentHash = common.Hash{} - // Decide valid timestamp range. - chainTip := data.chains[b.Position.ChainID].tip - if chainTip != nil { - // TODO(mission): find a way to prevent us to assign a witness height - // from Jurassic period. - b.Witness.Height = chainTip.Witness.Height - } - bindTip, err := data.isBindTip(b.Position, chainTip) - if err != nil { - return err - } - // For blocks with continuous round ID, assign timestamp range based on - // parent block and bound config. - if bindTip { - minTimestamp = chainTip.Timestamp.Add(config.minBlockTimeInterval) - // When a chain is removed and added back, the reference block - // of previous round can't be used as parent block. - b.ParentHash = chainTip.Hash - b.Position.Height = chainTip.Position.Height + 1 - } else { - // Discontinuous round ID detected, another fresh start of - // new round. - minTimestamp = config.roundBeginTime - } - // Fix timestamp if the given one is invalid. - if b.Timestamp.Before(minTimestamp) { - b.Timestamp = minTimestamp - } - // Setup acks fields. - for _, status := range data.chains { - // Check if we can ack latest block on that chain. - if status.tip == nil { - continue - } - lastAckPos := status.lastAckPos[b.Position.ChainID] - if lastAckPos != nil && !status.tip.Position.Newer(lastAckPos) { - // The reference block is already acked. - continue - } - if status.tip.Position.Round > b.Position.Round { - // Avoid forward acking: acking some block from later rounds. - continue - } - if b.Position.Round > status.tip.Position.Round+1 { - // Can't ack block too old or too new to us. - continue - } - acks = append(acks, status.tip.Hash) - } - b.Acks = common.NewSortedHashes(acks) - return nil -} - -// prepareEmptyBlock setups fields of a block based on its ChainID. -// including: -// - Acks only acking its parent -// - Timestamp with parent.Timestamp + minBlockProposeInterval -// - ParentHash and Height from parent block. If there is no valid parent block -// (ex. Newly added chain or bootstrap), these fields would be setup as -// genesis block. -func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) { - // emptyBlock has no proposer. - b.ProposerID = types.NodeID{} - // Reset fields to make sure we got these information from parent block. - b.Position.Height = 0 - b.ParentHash = common.Hash{} - b.Timestamp = time.Time{} - // Decide valid timestamp range. - config := data.getConfig(b.Position.Round) - chainTip := data.chains[b.Position.ChainID].tip - bindTip, err := data.isBindTip(b.Position, chainTip) - if err != nil { - return - } - if bindTip { - b.ParentHash = chainTip.Hash - b.Position.Height = chainTip.Position.Height + 1 - b.Timestamp = chainTip.Timestamp.Add(config.minBlockTimeInterval) - b.Witness.Height = chainTip.Witness.Height - b.Witness.Data = make([]byte, len(chainTip.Witness.Data)) - copy(b.Witness.Data, chainTip.Witness.Data) - b.Acks = common.NewSortedHashes(common.Hashes{chainTip.Hash}) - } else { - b.Timestamp = config.roundBeginTime - } - return -} - -// TODO(mission): make more abstraction for this method. -// nextBlock returns the next height and timestamp of a chain. -func (data *latticeData) nextBlock( - round uint64, chainID uint32) (uint64, time.Time, error) { - chainTip := data.chains[chainID].tip - bindTip, err := data.isBindTip( - types.Position{Round: round, ChainID: chainID}, chainTip) - if err != nil { - return 0, time.Time{}, err - } - config := data.getConfig(round) - if bindTip { - return chainTip.Position.Height + 1, - chainTip.Timestamp.Add(config.minBlockTimeInterval), nil - } - return 0, config.roundBeginTime, nil -} - -// findBlock seeks blocks in memory or db. -func (data *latticeData) findBlock(h common.Hash) (b *types.Block, err error) { - if b = data.blockByHash[h]; b != nil { - return - } - var tmpB types.Block - if tmpB, err = data.db.GetBlock(h); err != nil { - return - } - b = &tmpB - return -} - -// purgeBlocks purges blocks from cache. -func (data *latticeData) purgeBlocks(blocks []*types.Block) error { - for _, b := range blocks { - if _, exists := data.blockByHash[b.Hash]; !exists { - return ErrPurgedBlockNotFound - } - delete(data.blockByHash, b.Hash) - // Blocks are purged in ascending order by position. - if err := data.chains[b.Position.ChainID].purgeBlock(b); err != nil { - return err - } - } - return nil -} - -// getConfig get configuration for lattice-data by round ID. -func (data *latticeData) getConfig(round uint64) (config *latticeDataConfig) { - r := data.configs[0].roundID - if round < r || round >= r+uint64(len(data.configs)) { - return - } - return data.configs[round-r] -} - -// appendConfig appends a configuration for upcoming round. Rounds appended -// should be consecutive. -func (data *latticeData) appendConfig( - round uint64, config *types.Config) (err error) { - // Check if the round of config is increasing by 1. - if round != uint64(len(data.configs))+data.configs[0].roundID { - return ErrRoundNotIncreasing - } - // Set round beginning time. - newConfig := newLatticeDataConfig(data.configs[len(data.configs)-1], config) - data.configs = append(data.configs, newConfig) - // Resize each slice if incoming config contains larger number of chains. - if uint32(len(data.chains)) < newConfig.numChains { - count := newConfig.numChains - uint32(len(data.chains)) - for _, status := range data.chains { - status.lastAckPos = append( - status.lastAckPos, make([]*types.Position, count)...) - } - for i := uint32(len(data.chains)); i < newConfig.numChains; i++ { - data.chains = append(data.chains, &chainStatus{ - ID: i, - blocks: []*types.Block{}, - lastAckPos: make([]*types.Position, newConfig.numChains), - }) - } - } - return nil -} - -type chainStatus struct { - // ID keeps the chainID of this chain status. - ID uint32 - // blocks stores blocks proposed for this chain, sorted by height. - blocks []*types.Block - // tip is the last block on this chain. - tip *types.Block - // lastAckPos caches last acking position from other chains. Nil means - // not acked yet. - lastAckPos []*types.Position - // the index to be output next time. - nextOutputIndex int - // the position of output last time. - lastOutputPosition *types.Position -} - -// findBlock finds index of block in current pending blocks on this chain. -// Return -1 if not found. -func (s *chainStatus) findBlock(pos *types.Position) (idx int) { - idx = sort.Search(len(s.blocks), func(i int) bool { - return s.blocks[i].Position.Newer(pos) || - s.blocks[i].Position.Equal(pos) - }) - if idx == len(s.blocks) { - idx = -1 - } else if !s.blocks[idx].Position.Equal(pos) { - idx = -1 - } - return idx -} - -// getBlock returns a pending block by giving its index from findBlock method. -func (s *chainStatus) getBlock(idx int) (b *types.Block) { - if idx < 0 || idx >= len(s.blocks) { - return - } - b = s.blocks[idx] - return -} - -// addBlock adds a block to pending blocks on this chain. -func (s *chainStatus) addBlock(b *types.Block) { - s.blocks = append(s.blocks, b) - s.tip = b -} - -// purgeBlock purges a block from cache, make sure this block is already saved -// in db. -func (s *chainStatus) purgeBlock(b *types.Block) error { - if b.Hash != s.blocks[0].Hash || s.nextOutputIndex <= 0 { - return ErrPurgeNotDeliveredBlock - } - s.blocks = s.blocks[1:] - s.nextOutputIndex-- - return nil -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go deleted file mode 100644 index de0e54910..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go +++ /dev/null @@ -1,363 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "fmt" - "sync" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/db" - "github.com/dexon-foundation/dexon-consensus/core/types" - "github.com/dexon-foundation/dexon-consensus/core/utils" -) - -// Errors for sanity check error. -var ( - ErrRetrySanityCheckLater = fmt.Errorf("retry sanity check later") -) - -// Lattice represents a unit to produce a global ordering from multiple chains. -type Lattice struct { - lock sync.RWMutex - signer *utils.Signer - app Application - debug Debug - pool blockPool - data *latticeData - toModule *totalOrdering - ctModule *consensusTimestamp - logger common.Logger -} - -// NewLattice constructs an Lattice instance. -func NewLattice( - dMoment time.Time, - round uint64, - cfg *types.Config, - signer *utils.Signer, - app Application, - debug Debug, - db db.Database, - logger common.Logger) *Lattice { - - // Create genesis latticeDataConfig. - return &Lattice{ - signer: signer, - app: app, - debug: debug, - pool: newBlockPool(cfg.NumChains), - data: newLatticeData(db, dMoment, round, cfg), - toModule: newTotalOrdering(dMoment, round, cfg), - ctModule: newConsensusTimestamp(dMoment, round, cfg.NumChains), - logger: logger, - } -} - -// PrepareBlock setups block's fields based on current status. -func (l *Lattice) PrepareBlock( - b *types.Block, proposeTime time.Time) (err error) { - - l.lock.RLock() - defer l.lock.RUnlock() - - b.Timestamp = proposeTime - if err = l.data.prepareBlock(b); err != nil { - return - } - l.logger.Debug("Calling Application.PreparePayload", "position", &b.Position) - if b.Payload, err = l.app.PreparePayload(b.Position); err != nil { - return - } - l.logger.Debug("Calling Application.PrepareWitness", - "height", b.Witness.Height) - if b.Witness, err = l.app.PrepareWitness(b.Witness.Height); err != nil { - return - } - err = l.signer.SignBlock(b) - return -} - -// PrepareEmptyBlock setups block's fields based on current lattice status. -func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) { - l.lock.RLock() - defer l.lock.RUnlock() - if err = l.data.prepareEmptyBlock(b); err != nil { - return - } - b.Hash, err = utils.HashBlock(b) - return -} - -// AddShallowBlock adds a hash of a block that is confirmed by other nodes but -// the content is not arrived yet. -func (l *Lattice) AddShallowBlock(hash common.Hash, pos types.Position) { - l.lock.Lock() - defer l.lock.Unlock() - l.data.addShallowBlock(hash, pos) -} - -// SanityCheck checks the validity of a block. -// -// If any acking block of this block does not exist, Lattice caches this block -// and retries when Lattice.ProcessBlock is called. -func (l *Lattice) SanityCheck(b *types.Block, allowShallow bool) (err error) { - if b.IsEmpty() { - // Only need to verify block's hash. - var hash common.Hash - if hash, err = utils.HashBlock(b); err != nil { - return - } - if b.Hash != hash { - return ErrInvalidBlock - } - } else { - // Verify block's signature. - if err = utils.VerifyBlockSignature(b); err != nil { - return - } - } - // Make sure acks are sorted. - for i := range b.Acks { - if i == 0 { - continue - } - if !b.Acks[i-1].Less(b.Acks[i]) { - err = ErrAcksNotSorted - return - } - } - l.lock.RLock() - defer l.lock.RUnlock() - if err = l.data.sanityCheck(b, allowShallow); err != nil { - if _, ok := err.(*ErrAckingBlockNotExists); ok { - err = ErrRetrySanityCheckLater - } - return - } - return -} - -// Exist checks if the block is known to lattice. -func (l *Lattice) Exist(hash common.Hash) bool { - l.lock.RLock() - defer l.lock.RUnlock() - _, err := l.data.findBlock(hash) - return err == nil -} - -// addBlockToLattice adds a block into lattice, and delivers blocks with the -// acks already delivered. -// -// NOTE: input block should pass sanity check. -func (l *Lattice) addBlockToLattice( - input *types.Block) (outputBlocks []*types.Block, err error) { - - if tip := l.data.chains[input.Position.ChainID].tip; tip != nil { - if !input.Position.Newer(&tip.Position) { - l.logger.Warn("Dropping block: older than tip", - "block", input, "tip", tip) - return - } - } - l.pool.addBlock(input) - // Check tips in pool to check their validity for moving blocks from pool - // to lattice. - for { - hasOutput := false - for i := uint32(0); i < uint32(len(l.pool)); i++ { - var tip *types.Block - if tip = l.pool.tip(i); tip == nil { - continue - } - err = l.data.sanityCheck(tip, false) - if err == nil { - var output []*types.Block - if output, err = l.data.addBlock(tip); err != nil { - // We should be able to add this block once sanity check - // passed. - l.logger.Error("Failed to add sanity-checked block", - "block", tip, "error", err) - panic(err) - } - delete(l.data.shallowBlocks, tip.Hash) - hasOutput = true - outputBlocks = append(outputBlocks, output...) - l.pool.removeTip(i) - continue - } - if _, ok := err.(*ErrAckingBlockNotExists); ok { - l.logger.Debug("Pending block for lattice", - "pending", tip, - "err", err, - "last", l.data.chains[tip.Position.ChainID].tip) - err = nil - continue - } else { - l.logger.Error("Unexpected sanity check error", - "block", tip, "error", err) - panic(err) - } - } - if !hasOutput { - break - } - } - - for _, b := range outputBlocks { - l.logger.Debug("Calling Application.BlockConfirmed", "block", b) - l.app.BlockConfirmed(*b.Clone()) - // Purge blocks in pool with the same chainID and lower height. - l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) - } - - return -} - -// ProcessBlock adds a block into lattice, and deliver ordered blocks. -// If any block pass sanity check after this block add into lattice, they -// would be returned, too. -// -// NOTE: assume the block passed sanity check. -func (l *Lattice) ProcessBlock( - input *types.Block) (delivered []*types.Block, err error) { - var ( - b *types.Block - inLattice []*types.Block - toDelivered []*types.Block - deliveredMode uint32 - ) - l.lock.Lock() - defer l.lock.Unlock() - if inLattice, err = l.addBlockToLattice(input); err != nil { - return - } - if len(inLattice) == 0 { - return - } - for _, b = range inLattice { - if err = l.toModule.addBlock(b); err != nil { - // All errors from total ordering is serious, should panic. - panic(err) - } - } - for { - toDelivered, deliveredMode, err = l.toModule.extractBlocks() - if err != nil { - panic(err) - } - if len(toDelivered) == 0 { - break - } - if l.debug != nil { - hashes := make(common.Hashes, len(toDelivered)) - for idx := range toDelivered { - hashes[idx] = toDelivered[idx].Hash - } - l.debug.TotalOrderingDelivered(hashes, deliveredMode) - } - // Perform consensus timestamp module. - if err = l.ctModule.processBlocks(toDelivered); err != nil { - break - } - delivered = append(delivered, toDelivered...) - } - return -} - -// NextBlock returns expected height and timestamp of incoming block for -// specified chain and given round. -func (l *Lattice) NextBlock(round uint64, chainID uint32) ( - uint64, time.Time, error) { - l.lock.RLock() - defer l.lock.RUnlock() - return l.data.nextBlock(round, chainID) -} - -// TipRound returns the round of the tip of given chain. -func (l *Lattice) TipRound(chainID uint32) uint64 { - l.lock.RLock() - defer l.lock.RUnlock() - return l.data.tipRound(chainID) -} - -// PurgeBlocks purges blocks' cache in memory, this is called when the caller -// makes sure those blocks are already saved in db. -func (l *Lattice) PurgeBlocks(blocks []*types.Block) error { - l.lock.Lock() - defer l.lock.Unlock() - return l.data.purgeBlocks(blocks) -} - -// AppendConfig adds a new config for upcoming rounds. If a config of round r is -// added, only config in round r + 1 is allowed next. -func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { - l.lock.Lock() - defer l.lock.Unlock() - l.pool.resize(config.NumChains) - if err = l.data.appendConfig(round, config); err != nil { - return - } - if err = l.toModule.appendConfig(round, config); err != nil { - return - } - if err = l.ctModule.appendConfig(round, config); err != nil { - return - } - return -} - -// ProcessFinalizedBlock is used for syncing lattice data. -func (l *Lattice) ProcessFinalizedBlock( - b *types.Block) (delivered []*types.Block, err error) { - var ( - toDelivered []*types.Block - deliveredMode uint32 - ) - l.lock.Lock() - defer l.lock.Unlock() - // Syncing state for core.latticeData module. - if err = l.data.addFinalizedBlock(b); err != nil { - return - } - l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) - // Syncing state for core.totalOrdering module. - if err = l.toModule.addBlock(b); err != nil { - return - } - for { - toDelivered, deliveredMode, err = l.toModule.extractBlocks() - if err != nil || len(toDelivered) == 0 { - break - } - hashes := make(common.Hashes, len(toDelivered)) - for idx := range toDelivered { - hashes[idx] = toDelivered[idx].Hash - } - if l.debug != nil { - l.debug.TotalOrderingDelivered(hashes, deliveredMode) - } - // Sync core.consensusTimestamp module. - if err = l.ctModule.processBlocks(toDelivered); err != nil { - break - } - delivered = append(delivered, toDelivered...) - } - return -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/negative-ack.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/negative-ack.go deleted file mode 100644 index 417862912..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/negative-ack.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it and/or -// modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "time" - - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -type negativeAck struct { - // owner is the ID of proposer itself, this is used when deciding - // a node to be restricted or not. - owner types.NodeID - - numOfNodes int - - // timeDelay and timeExpire are for nack timeout. - timeDelay time.Duration - timeExpire time.Duration - - // restricteds stores nodes which has been restricted and the time it's - // restricted. - restricteds map[types.NodeID]time.Time - - // lastVotes and lockedVotes store the votes for nack. lastVotes[nid1][nid2] - // and lockedVotes[nid1][nid2] both mean that nid2 votes nid1. The difference - // is lockedVotes works only when nid1 is restricted, so that the votes are - // needed to be locked. - lastVotes map[types.NodeID]map[types.NodeID]struct{} - lockedVotes map[types.NodeID]map[types.NodeID]struct{} - - // timeDiffs is the cache for last time stamps. timeDiffs[nid1][nid2] means - // the last updated timestamps nid1 sees nid2. - timeDiffs map[types.NodeID]map[types.NodeID]map[types.NodeID]time.Time -} - -// newNegativeAck creates a new negaticeAck instance. -func newNegativeAck(nid types.NodeID) *negativeAck { - n := &negativeAck{ - owner: nid, - numOfNodes: 0, - restricteds: make(map[types.NodeID]time.Time), - lastVotes: make(map[types.NodeID]map[types.NodeID]struct{}), - lockedVotes: make(map[types.NodeID]map[types.NodeID]struct{}), - timeDiffs: make(map[types.NodeID]map[types.NodeID]map[types.NodeID]time.Time), - } - n.addNode(nid) - return n -} - -// processNewVote is called when a new "vote" occurs, that is, a node -// sees that other 2f + 1 nodes think a node is slow. "nid" is the -// node which propesed the block which the timestamps votes and "h" is -// the node been voted to be nacked. -func (n *negativeAck) processNewVote( - nid types.NodeID, - h types.NodeID, -) []types.NodeID { - - nackeds := []types.NodeID{} - if _, exist := n.restricteds[h]; exist { - n.lockedVotes[h][nid] = struct{}{} - if len(n.lockedVotes[h]) > 2*(n.numOfNodes-1)/3 { - nackeds = append(nackeds, h) - delete(n.restricteds, h) - } - } else { - if n.owner == nid { - n.restrict(h) - } else { - n.lastVotes[h][nid] = struct{}{} - if len(n.lastVotes[h]) > (n.numOfNodes-1)/3 { - n.restrict(h) - } - } - } - return nackeds -} - -// processTimestamps process new timestamps of a block which is proposed by -// node nid, and returns the nodes being nacked. -func (n *negativeAck) processTimestamps( - nid types.NodeID, - ts map[types.NodeID]time.Time, -) []types.NodeID { - - n.checkRestrictExpire() - - nackeds := []types.NodeID{} - for h := range n.timeDiffs { - if n.timeDiffs[nid][h][h].Equal(ts[h]) { - votes := 0 - for hh := range n.timeDiffs { - if ts[hh].Sub(n.timeDiffs[nid][h][hh]) >= n.timeDelay { - votes++ - } - } - if votes > 2*((n.numOfNodes-1)/3) { - n.lastVotes[h][nid] = struct{}{} - nack := n.processNewVote(nid, h) - for _, i := range nack { - nackeds = append(nackeds, i) - } - } else { - delete(n.lastVotes[h], nid) - } - } else { - for hh := range n.timeDiffs { - n.timeDiffs[nid][h][hh] = ts[hh] - } - delete(n.lastVotes[h], nid) - } - } - return nackeds -} - -func (n *negativeAck) checkRestrictExpire() { - expired := []types.NodeID{} - now := time.Now() - for h, t := range n.restricteds { - if now.Sub(t) >= n.timeExpire { - expired = append(expired, h) - } - } - for _, h := range expired { - delete(n.restricteds, h) - } -} - -func (n *negativeAck) restrict(nid types.NodeID) { - if _, exist := n.restricteds[nid]; !exist { - n.restricteds[nid] = time.Now().UTC() - n.lockedVotes[nid] = map[types.NodeID]struct{}{} - for h := range n.lastVotes[nid] { - n.lockedVotes[nid][h] = struct{}{} - } - } -} - -func (n *negativeAck) getRestrictedNodes() map[types.NodeID]struct{} { - n.checkRestrictExpire() - ret := map[types.NodeID]struct{}{} - for h := range n.restricteds { - ret[h] = struct{}{} - } - return ret -} - -func (n *negativeAck) setTimeDelay(t time.Duration) { - n.timeDelay = t -} - -func (n *negativeAck) setTimeExpire(t time.Duration) { - n.timeExpire = t -} - -func (n *negativeAck) addNode(nid types.NodeID) { - n.numOfNodes++ - n.lastVotes[nid] = make(map[types.NodeID]struct{}) - n.lockedVotes[nid] = make(map[types.NodeID]struct{}) - - newTimeDiff := make(map[types.NodeID]map[types.NodeID]time.Time) - for h := range n.timeDiffs { - newTimeDiff2 := make(map[types.NodeID]time.Time) - for hh := range n.timeDiffs { - newTimeDiff2[hh] = time.Time{} - } - newTimeDiff[h] = newTimeDiff2 - } - n.timeDiffs[nid] = newTimeDiff - for h := range n.timeDiffs { - n.timeDiffs[h][nid] = make(map[types.NodeID]time.Time) - } -} - -func (n *negativeAck) deleteNode(nid types.NodeID) { - n.numOfNodes-- - - delete(n.timeDiffs, nid) - - for h := range n.lastVotes { - delete(n.lastVotes[h], nid) - } - delete(n.lastVotes, nid) - delete(n.lockedVotes, nid) - - for h := range n.timeDiffs { - delete(n.timeDiffs[h], nid) - for hh := range n.timeDiffs[h] { - delete(n.timeDiffs[h][hh], nid) - } - } - - delete(n.restricteds, nid) -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go index 56c42fec6..095170bf0 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go @@ -29,11 +29,6 @@ type blockConfirmedEvent struct { block *types.Block } -type totalOrderingDeliveredEvent struct { - blockHashes common.Hashes - mode uint32 -} - type blockDeliveredEvent struct { blockHash common.Hash blockPosition types.Position @@ -91,8 +86,6 @@ func (nb *nonBlocking) run() { switch e := event.(type) { case blockConfirmedEvent: nb.app.BlockConfirmed(*e.block) - case totalOrderingDeliveredEvent: - nb.debug.TotalOrderingDelivered(e.blockHashes, e.mode) case blockDeliveredEvent: nb.app.BlockDelivered(e.blockHash, e.blockPosition, *e.result) default: @@ -133,15 +126,6 @@ func (nb *nonBlocking) BlockConfirmed(block types.Block) { nb.addEvent(blockConfirmedEvent{&block}) } -// TotalOrderingDelivered is called when the total ordering algorithm deliver -// a set of block. -func (nb *nonBlocking) TotalOrderingDelivered( - blockHashes common.Hashes, mode uint32) { - if nb.debug != nil { - nb.addEvent(totalOrderingDeliveredEvent{blockHashes, mode}) - } -} - // BlockDelivered is called when a block is add to the compaction chain. func (nb *nonBlocking) BlockDelivered(blockHash common.Hash, blockPosition types.Position, result types.FinalizationResult) { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 75c106793..618d90e8c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -63,19 +63,16 @@ type Consensus struct { nodeSetCache *utils.NodeSetCache tsigVerifier *core.TSigVerifierCache - lattice *core.Lattice - validatedChains map[uint32]struct{} - finalizedBlockHashes common.Hashes - latticeLastRound uint64 - randomnessResults map[common.Hash]*types.BlockRandomnessResult - blocks []types.ByPosition - agreements []*agreement - configs []*types.Config - roundBeginTimes []time.Time - agreementRoundCut uint64 + randomnessResults map[common.Hash]*types.BlockRandomnessResult + blocks types.BlocksByPosition + agreementModule *agreement + configs []*types.Config + roundBeginTimes []time.Time + agreementRoundCut uint64 // lock for accessing all fields. lock sync.RWMutex + duringBuffering bool moduleWaitGroup sync.WaitGroup agreementWaitGroup sync.WaitGroup pullChan chan common.Hash @@ -100,16 +97,15 @@ func NewConsensus( logger common.Logger) *Consensus { con := &Consensus{ - dMoment: dMoment, - app: app, - gov: gov, - db: db, - network: network, - nodeSetCache: utils.NewNodeSetCache(gov), - tsigVerifier: core.NewTSigVerifierCache(gov, 7), - prv: prv, - logger: logger, - validatedChains: make(map[uint32]struct{}), + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + tsigVerifier: core.NewTSigVerifierCache(gov, 7), + prv: prv, + logger: logger, configs: []*types.Config{ utils.GetConfigWithPanic(gov, 0, logger), }, @@ -119,294 +115,66 @@ func NewConsensus( randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + con.agreementModule = newAgreement( + con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) + con.agreementWaitGroup.Add(1) + go func() { + defer con.agreementWaitGroup.Done() + con.agreementModule.run() + }() return con } -func (con *Consensus) initConsensusObj(initBlock *types.Block) { - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.latticeLastRound = initBlock.Position.Round - debugApp, _ := con.app.(core.Debug) - con.lattice = core.NewLattice( - con.roundBeginTimes[con.latticeLastRound], - con.latticeLastRound, - con.configs[con.latticeLastRound], - utils.NewSigner(con.prv), - con.app, - debugApp, - con.db, - con.logger, - ) - }() +func (con *Consensus) assureBuffering() { + if func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + return con.duringBuffering + }() { + return + } + con.lock.Lock() + defer con.lock.Unlock() + if con.duringBuffering { + return + } + con.duringBuffering = true con.startAgreement() con.startNetwork() con.startCRSMonitor() } -func (con *Consensus) checkIfValidated() (validated bool) { - con.lock.RLock() - defer con.lock.RUnlock() - var ( - round = con.blocks[0][0].Position.Round - numChains = con.configs[round].NumChains - validatedChainCount uint32 - ) - // Make sure we validate some block in all chains. - for chainID := range con.validatedChains { - if chainID < numChains { - validatedChainCount++ - } - } - validated = validatedChainCount == numChains - con.logger.Debug("syncer chain-validation status", - "validated-chain", validatedChainCount, - "round", round, - "valid", validated) - return -} - func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { con.lock.RLock() defer con.lock.RUnlock() - var ( - round = con.blocks[0][0].Position.Round - numChains = con.configs[round].NumChains - compactionTips = make([]*types.Block, numChains) - overlapCount = uint32(0) - ) defer func() { con.logger.Debug("syncer synced status", - "overlap-count", overlapCount, - "num-chain", numChains, "last-block", blocks[len(blocks)-1], - "synced", synced) + "synced", synced, + ) }() - // Find tips (newset blocks) of each chain in compaction chain. - b := blocks[len(blocks)-1] - for tipCount := uint32(0); tipCount < numChains; { - if compactionTips[b.Position.ChainID] == nil { - // Check chainID for config change. - if b.Position.ChainID < numChains { - compactionTips[b.Position.ChainID] = b - tipCount++ - } - } - if (b.Finalization.ParentHash == common.Hash{}) { - return - } - b1, err := con.db.GetBlock(b.Finalization.ParentHash) - if err != nil { - panic(err) - } - b = &b1 - } - // Check if chain tips of compaction chain and current cached confirmed - // blocks are overlapped on each chain, numChains is decided by the round - // of last block we seen on compaction chain. - for chainID, b := range compactionTips { - if len(con.blocks[chainID]) > 0 { - if !b.Position.Older(&con.blocks[chainID][0].Position) { - overlapCount++ - } - } + if len(con.blocks) == 0 || len(blocks) == 0 { + return } - synced = overlapCount == numChains + synced = !blocks[len(blocks)-1].Position.Older(con.blocks[0].Position) return } -// ensureAgreementOverlapRound ensures the oldest blocks in each chain in -// con.blocks are all in the same round, for avoiding config change while -// syncing. -func (con *Consensus) ensureAgreementOverlapRound() bool { +func (con *Consensus) buildAllEmptyBlocks() { con.lock.Lock() defer con.lock.Unlock() - defer func() { - con.logger.Debug("ensureAgreementOverlapRound returned", - "round", con.agreementRoundCut) - }() - if con.agreementRoundCut > 0 { - return true - } // Clean empty blocks on tips of chains. - for idx, bs := range con.blocks { - for len(bs) > 0 && con.isEmptyBlock(bs[0]) { - bs = bs[1:] - } - con.blocks[idx] = bs + for len(con.blocks) > 0 && con.isEmptyBlock(con.blocks[0]) { + con.blocks = con.blocks[1:] } // Build empty blocks. - for _, bs := range con.blocks { - for i := range bs { - if con.isEmptyBlock(bs[i]) { - if bs[i-1].Position.Height == bs[i].Position.Height-1 { - con.buildEmptyBlock(bs[i], bs[i-1]) - } + for i, b := range con.blocks { + if con.isEmptyBlock(b) { + if con.blocks[i-1].Position.Height+1 == b.Position.Height { + con.buildEmptyBlock(b, con.blocks[i-1]) } } } - var tipRoundMap map[uint64]uint32 - for { - tipRoundMap = make(map[uint64]uint32) - for _, bs := range con.blocks { - if len(bs) > 0 { - tipRoundMap[bs[0].Position.Round]++ - } - } - if len(tipRoundMap) <= 1 { - break - } - // Make all tips in same round. - var maxRound uint64 - for r := range tipRoundMap { - if r > maxRound { - maxRound = r - } - } - for idx, bs := range con.blocks { - for len(bs) > 0 && bs[0].Position.Round < maxRound { - bs = bs[1:] - } - con.blocks[idx] = bs - } - } - if len(tipRoundMap) == 1 { - var r uint64 - for r = range tipRoundMap { - break - } - con.logger.Debug("check agreement round cut", - "tip-round", r, - "configs", len(con.configs)) - if tipRoundMap[r] == con.configs[r].NumChains { - con.agreementRoundCut = r - return true - } - } - return false -} - -func (con *Consensus) findLatticeSyncBlock( - blocks []*types.Block) (*types.Block, error) { - lastBlock := blocks[len(blocks)-1] - round := lastBlock.Position.Round - isConfigChanged := func(prev, cur *types.Config) bool { - return prev.K != cur.K || - prev.NumChains != cur.NumChains || - prev.PhiRatio != cur.PhiRatio - } - for { - // Find round r which r-1, r, r+1 are all in same total ordering config. - for { - sameAsPrevRound := round == 0 || !isConfigChanged( - con.configs[round-1], con.configs[round]) - sameAsNextRound := !isConfigChanged( - con.configs[round], con.configs[round+1]) - if sameAsPrevRound && sameAsNextRound { - break - } - if round == 0 { - // Unable to find a safe round, wait for new rounds. - return nil, nil - } - round-- - } - // Find the newset block which round is "round". - for lastBlock.Position.Round != round { - if (lastBlock.Finalization.ParentHash == common.Hash{}) { - return nil, ErrGenesisBlockReached - } - b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - lastBlock = &b - } - // Find the deliver set by hash for two times. Blocks in a deliver set - // returned by total ordering is sorted by hash. If a block's parent - // hash is greater than its hash means there is a cut between deliver - // sets. - var curBlock, prevBlock *types.Block - var deliverSetFirstBlock, deliverSetLastBlock *types.Block - curBlock = lastBlock - for { - if (curBlock.Finalization.ParentHash == common.Hash{}) { - return nil, ErrGenesisBlockReached - } - b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - prevBlock = &b - if !prevBlock.Hash.Less(curBlock.Hash) { - break - } - curBlock = prevBlock - } - deliverSetLastBlock = prevBlock - curBlock = prevBlock - for { - if (curBlock.Finalization.ParentHash == common.Hash{}) { - break - } - b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - prevBlock = &b - if !prevBlock.Hash.Less(curBlock.Hash) { - break - } - curBlock = prevBlock - } - deliverSetFirstBlock = curBlock - // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock - // are in the same round. - ok := true - curBlock = deliverSetLastBlock - for { - if curBlock.Position.Round != round { - ok = false - break - } - b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - curBlock = &b - if curBlock.Hash == deliverSetFirstBlock.Hash { - break - } - } - if ok { - return deliverSetFirstBlock, nil - } - if round == 0 { - return nil, nil - } - round-- - } -} - -func (con *Consensus) processFinalizedBlock(block *types.Block) error { - if con.lattice == nil { - return nil - } - delivered, err := con.lattice.ProcessFinalizedBlock(block) - if err != nil { - return err - } - con.lock.Lock() - defer con.lock.Unlock() - con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) - for idx, b := range delivered { - if con.finalizedBlockHashes[idx] != b.Hash { - return ErrMismatchBlockHashSequence - } - con.validatedChains[b.Position.ChainID] = struct{}{} - } - con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):] - return nil } // SyncBlocks syncs blocks from compaction chain, latest is true if the caller @@ -420,7 +188,8 @@ func (con *Consensus) SyncBlocks( con.logger.Debug("SyncBlocks returned", "synced", synced, "error", err, - "last-block", con.syncedLastBlock) + "last-block", con.syncedLastBlock, + ) }() if con.syncedLastBlock != nil { synced, err = true, ErrAlreadySynced @@ -442,7 +211,8 @@ func (con *Consensus) SyncBlocks( if blocks[0].Finalization.Height != tipHeight+1 { con.logger.Error("mismatched finalization height", "now", blocks[0].Finalization.Height, - "expected", tipHeight+1) + "expected", tipHeight+1, + ) err = ErrInvalidSyncingFinalizationHeight return } @@ -454,7 +224,6 @@ func (con *Consensus) SyncBlocks( ) con.setupConfigs(blocks) for _, b := range blocks { - // TODO(haoping) remove this if lattice puts blocks into db. if err = con.db.PutBlock(*b); err != nil { // A block might be put into db when confirmed by BA, but not // finalized yet. @@ -469,60 +238,15 @@ func (con *Consensus) SyncBlocks( b.Hash, b.Finalization.Height); err != nil { return } - if err = con.processFinalizedBlock(b); err != nil { - return - } - } - if latest && con.lattice == nil { - // New Lattice and find the deliver set of total ordering when "latest" - // is true for first time. Deliver set is found by block hashes. - var syncBlock *types.Block - syncBlock, err = con.findLatticeSyncBlock(blocks) - if err != nil { - if err == ErrGenesisBlockReached { - con.logger.Debug("SyncBlocks skip error", "error", err) - err = nil - } - return - } - if syncBlock != nil { - con.logger.Debug("deliver set found", "block", syncBlock) - // New lattice with the round of syncBlock. - con.initConsensusObj(syncBlock) - con.setupConfigs(blocks) - // Process blocks from syncBlock to blocks' last block. - b := blocks[len(blocks)-1] - blocksCount := - b.Finalization.Height - syncBlock.Finalization.Height + 1 - blocksToProcess := make([]*types.Block, blocksCount) - for { - blocksToProcess[blocksCount-1] = b - blocksCount-- - if b.Hash == syncBlock.Hash { - break - } - var b1 types.Block - b1, err = con.db.GetBlock(b.Finalization.ParentHash) - if err != nil { - return - } - b = &b1 - } - for _, b := range blocksToProcess { - if err = con.processFinalizedBlock(b); err != nil { - return - } - } - } } - if latest && con.ensureAgreementOverlapRound() { + if latest { + con.assureBuffering() + con.buildAllEmptyBlocks() // Check if compaction and agreements' blocks are overlapped. The // overlapping of compaction chain and BA's oldest blocks means the // syncing is done. - if con.checkIfValidated() && con.checkIfSynced(blocks) { - if err = con.Stop(); err != nil { - return - } + if con.checkIfSynced(blocks) { + con.stopBuffering() con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( context.Background(), con.network.ReceiveChan(), func(msg interface{}) { @@ -547,10 +271,6 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { } // flush all blocks in con.blocks into core.Consensus, and build // core.Consensus from syncer. - confirmedBlocks := make([][]*types.Block, len(con.blocks)) - for i, bs := range con.blocks { - confirmedBlocks[i] = []*types.Block(bs) - } randomnessResults := []*types.BlockRandomnessResult{} for _, r := range con.randomnessResults { randomnessResults = append(randomnessResults, r) @@ -566,19 +286,31 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { con.db, con.network, con.prv, - con.lattice, - confirmedBlocks, + con.blocks, randomnessResults, con.dummyMsgBuffer, con.logger) return con.syncedConsensus, err } -// Stop the syncer. +// stopBuffering stops the syncer buffering routines. // // This method is mainly for caller to stop the syncer before synced, the syncer // would call this method automatically after being synced. -func (con *Consensus) Stop() error { +func (con *Consensus) stopBuffering() { + if func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + return !con.duringBuffering + }() { + return + } + con.lock.Lock() + defer con.lock.Unlock() + if !con.duringBuffering { + return + } + con.duringBuffering = false con.logger.Trace("syncer is about to stop") // Stop network and CRS routines, wait until they are all stoped. con.ctxCancel() @@ -588,7 +320,7 @@ func (con *Consensus) Stop() error { con.logger.Trace("stop syncer agreement modules") con.stopAgreement() con.logger.Trace("syncer stopped") - return nil + return } // isEmptyBlock checks if a block is an empty block by both its hash and parent @@ -607,41 +339,6 @@ func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash}) } -func (con *Consensus) setupConfigsUntilRound(round uint64) { - curMaxNumChains := uint32(0) - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.logger.Debug("syncer setupConfigs", - "until-round", round, - "length", len(con.configs), - "lattice", con.latticeLastRound) - for r := uint64(len(con.configs)); r <= round; r++ { - cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) - con.configs = append(con.configs, cfg) - con.roundBeginTimes = append( - con.roundBeginTimes, - con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) - if cfg.NumChains >= curMaxNumChains { - curMaxNumChains = cfg.NumChains - } - } - // Notify core.Lattice for new configs. - if con.lattice != nil { - for con.latticeLastRound+1 <= round { - con.latticeLastRound++ - if err := con.lattice.AppendConfig( - con.latticeLastRound, - con.configs[con.latticeLastRound]); err != nil { - panic(err) - } - } - } - }() - con.resizeByNumChains(curMaxNumChains) - con.logger.Trace("setupConfgis finished", "round", round) -} - // setupConfigs is called by SyncBlocks with blocks from compaction chain. In // the first time, setupConfigs setups from round 0. func (con *Consensus) setupConfigs(blocks []*types.Block) { @@ -661,25 +358,19 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) { con.setupConfigsUntilRound(maxRound + core.ConfigRoundShift - 1) } -// resizeByNumChains resizes fake lattice and agreement if numChains increases. -// Notice the decreasing case is neglected. -func (con *Consensus) resizeByNumChains(numChains uint32) { +func (con *Consensus) setupConfigsUntilRound(round uint64) { con.lock.Lock() defer con.lock.Unlock() - if numChains > uint32(len(con.blocks)) { - for i := uint32(len(con.blocks)); i < numChains; i++ { - // Resize the pool of blocks. - con.blocks = append(con.blocks, types.ByPosition{}) - // Resize agreement modules. - a := newAgreement( - con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) - con.agreements = append(con.agreements, a) - con.agreementWaitGroup.Add(1) - go func() { - defer con.agreementWaitGroup.Done() - a.run() - }() - } + con.logger.Debug("syncer setupConfigs", + "until-round", round, + "length", len(con.configs), + ) + for r := uint64(len(con.configs)); r <= round; r++ { + cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) + con.configs = append(con.configs, cfg) + con.roundBeginTimes = append( + con.roundBeginTimes, + con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) } } @@ -693,17 +384,15 @@ func (con *Consensus) startAgreement() { if !ok { return } - chainID := b.Position.ChainID func() { con.lock.Lock() defer con.lock.Unlock() - // If round is cut in agreements, do not add blocks with - // round less then cut round. - if b.Position.Round < con.agreementRoundCut { + if len(con.blocks) > 0 && + !b.Position.Newer(con.blocks[0].Position) { return } - con.blocks[chainID] = append(con.blocks[chainID], b) - sort.Sort(con.blocks[chainID]) + con.blocks = append(con.blocks, b) + sort.Sort(con.blocks) }() case h, ok := <-con.pullChan: if !ok { @@ -721,18 +410,14 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { return } // We only have to cache randomness result after cutting round. - if r.Position.Round < func() uint64 { - con.lock.RLock() - defer con.lock.RUnlock() - return con.agreementRoundCut - }() { - return - } - if func() (exists bool) { + if func() bool { con.lock.RLock() defer con.lock.RUnlock() - _, exists = con.randomnessResults[r.BlockHash] - return + if len(con.blocks) > 0 && r.Position.Older(con.blocks[0].Position) { + return true + } + _, exists := con.randomnessResults[r.BlockHash] + return exists }() { return } @@ -740,8 +425,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { if err != nil { con.logger.Error("Unable to get tsig verifier", "hash", r.BlockHash.String()[:6], - "position", &r.Position, - "error", err) + "position", r.Position, + "error", err, + ) return } if !ok { @@ -752,8 +438,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { Type: "bls", Signature: r.Randomness}) { con.logger.Info("Block randomness is not valid", - "position", &r.Position, - "hash", r.BlockHash.String()[:6]) + "position", r.Position, + "hash", r.BlockHash.String()[:6], + ) return } con.lock.Lock() @@ -785,18 +472,19 @@ func (con *Consensus) startNetwork() { if func() bool { con.lock.RLock() defer con.lock.RUnlock() - if pos.ChainID >= uint32(len(con.agreements)) { + if pos.ChainID > 0 { // This error might be easily encountered when the // "latest" parameter of SyncBlocks is turned on too // early. con.logger.Error( "Unknown chainID message received (syncer)", - "position", &pos) + "position", pos, + ) return false } return true }() { - con.agreements[pos.ChainID].inputChan <- val + con.agreementModule.inputChan <- val } case <-con.ctx.Done(): return @@ -817,23 +505,25 @@ func (con *Consensus) startCRSMonitor() { } con.logger.Debug("CRS is ready", "round", round) lastNotifiedRound = round - con.lock.Lock() - defer con.lock.Unlock() - for idx, a := range con.agreements { - loop: - for { - select { - case <-con.ctx.Done(): - break loop - case a.inputChan <- round: - break loop - case <-time.After(500 * time.Millisecond): - con.logger.Debug( - "agreement input channel is full when putting CRS", - "chainID", idx, - "round", round) - } + for func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + if !con.duringBuffering { + return false + } + select { + case <-con.ctx.Done(): + return false + case con.agreementModule.inputChan <- round: + return false + case <-time.After(500 * time.Millisecond): + con.logger.Debug( + "agreement input channel is full when putting CRS", + "round", round, + ) + return true } + }() { } } con.moduleWaitGroup.Add(1) @@ -860,16 +550,10 @@ func (con *Consensus) startCRSMonitor() { } func (con *Consensus) stopAgreement() { - func() { - con.lock.Lock() - defer con.lock.Unlock() - for _, a := range con.agreements { - if a.inputChan != nil { - close(a.inputChan) - a.inputChan = nil - } - } - }() + if con.agreementModule.inputChan != nil { + close(con.agreementModule.inputChan) + con.agreementModule.inputChan = nil + } con.agreementWaitGroup.Wait() close(con.receiveChan) close(con.pullChan) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering-syncer.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering-syncer.go deleted file mode 100644 index 1360611f7..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering-syncer.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "sort" - "sync" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -type totalOrderingSyncer struct { - lock sync.RWMutex - - numChains uint32 - syncHeight map[uint32]uint64 - syncDeliverySetIdx int - pendingBlocks []*types.Block - inPendingBlocks map[common.Hash]struct{} - - bootstrapChain map[uint32]struct{} - - // Data to restore delivery set. - pendingDeliveryBlocks []*types.Block - deliverySet map[int][]*types.Block - mapToDeliverySet map[common.Hash]int -} - -func newTotalOrderingSyncer(numChains uint32) *totalOrderingSyncer { - return &totalOrderingSyncer{ - numChains: numChains, - syncHeight: make(map[uint32]uint64), - syncDeliverySetIdx: -1, - inPendingBlocks: make(map[common.Hash]struct{}), - bootstrapChain: make(map[uint32]struct{}), - deliverySet: make(map[int][]*types.Block), - mapToDeliverySet: make(map[common.Hash]int), - } -} - -func (tos *totalOrderingSyncer) synced() bool { - tos.lock.RLock() - defer tos.lock.RUnlock() - return tos.syncDeliverySetIdx != -1 -} - -func (tos *totalOrderingSyncer) processBlock( - block *types.Block) (delivered []*types.Block) { - if tos.synced() { - if tos.syncHeight[block.Position.ChainID] >= block.Position.Height { - return - } - delivered = append(delivered, block) - return - } - tos.lock.Lock() - defer tos.lock.Unlock() - tos.inPendingBlocks[block.Hash] = struct{}{} - tos.pendingBlocks = append(tos.pendingBlocks, block) - if block.Position.Height == 0 { - tos.bootstrapChain[block.Position.ChainID] = struct{}{} - } - if uint32(len(tos.bootstrapChain)) == tos.numChains { - // Bootstrap mode. - delivered = tos.pendingBlocks - tos.syncDeliverySetIdx = 0 - for i := uint32(0); i < tos.numChains; i++ { - tos.syncHeight[i] = uint64(0) - } - } else { - maxDeliverySetIdx := -1 - // TODO(jimmy-dexon): below for loop can be optimized. - PendingBlockLoop: - for i, block := range tos.pendingBlocks { - idx, exist := tos.mapToDeliverySet[block.Hash] - if !exist { - continue - } - deliverySet := tos.deliverySet[idx] - // Check if all the blocks in deliverySet are in the pendingBlocks. - for _, dBlock := range deliverySet { - if _, exist := tos.inPendingBlocks[dBlock.Hash]; !exist { - continue PendingBlockLoop - } - } - if idx > maxDeliverySetIdx { - maxDeliverySetIdx = idx - } - // Check if all of the chains have delivered. - for _, dBlock := range deliverySet { - if h, exist := tos.syncHeight[dBlock.Position.ChainID]; exist { - if dBlock.Position.Height < h { - continue - } - } - tos.syncHeight[dBlock.Position.ChainID] = dBlock.Position.Height - } - if uint32(len(tos.syncHeight)) != tos.numChains { - continue - } - // Core is fully synced, it can start delivering blocks from idx. - tos.syncDeliverySetIdx = maxDeliverySetIdx - delivered = make([]*types.Block, 0, i) - break - } - if tos.syncDeliverySetIdx == -1 { - return - } - // Generating delivering blocks. - for i := maxDeliverySetIdx; i < len(tos.deliverySet); i++ { - deliverySet := tos.deliverySet[i] - sort.Sort(types.ByHash(deliverySet)) - for _, block := range deliverySet { - if block.Position.Height > tos.syncHeight[block.Position.ChainID] { - tos.syncHeight[block.Position.ChainID] = block.Position.Height - } - delivered = append(delivered, block) - } - } - // Flush remaining blocks. - for _, block := range tos.pendingBlocks { - if _, exist := tos.mapToDeliverySet[block.Hash]; exist { - continue - } - if block.Position.Height > tos.syncHeight[block.Position.ChainID] { - tos.syncHeight[block.Position.ChainID] = block.Position.Height - } - delivered = append(delivered, block) - } - } - // Clean internal data model to save memory. - tos.pendingBlocks = nil - tos.inPendingBlocks = nil - tos.bootstrapChain = nil - tos.pendingDeliveryBlocks = nil - tos.deliverySet = nil - tos.mapToDeliverySet = nil - return -} - -// The finalized block should be passed by the order of consensus height. -func (tos *totalOrderingSyncer) processFinalizedBlock(block *types.Block) { - if tos.synced() { - return - } - tos.lock.Lock() - defer tos.lock.Unlock() - if len(tos.pendingDeliveryBlocks) > 0 { - if block.Hash.Less( - tos.pendingDeliveryBlocks[len(tos.pendingDeliveryBlocks)-1].Hash) { - // pendingDeliveryBlocks forms a deliverySet. - idx := len(tos.deliverySet) - tos.deliverySet[idx] = tos.pendingDeliveryBlocks - for _, block := range tos.pendingDeliveryBlocks { - tos.mapToDeliverySet[block.Hash] = idx - } - tos.pendingDeliveryBlocks = []*types.Block{} - } - } - tos.pendingDeliveryBlocks = append(tos.pendingDeliveryBlocks, block) -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go deleted file mode 100644 index 2e2158e7c..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go +++ /dev/null @@ -1,1321 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package core - -import ( - "errors" - "fmt" - "math" - "sort" - "sync" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -const ( - infinity uint64 = math.MaxUint64 -) - -const ( - // TotalOrderingModeError returns mode error. - TotalOrderingModeError uint32 = iota - // TotalOrderingModeNormal returns mode normal. - TotalOrderingModeNormal - // TotalOrderingModeEarly returns mode early. - TotalOrderingModeEarly - // TotalOrderingModeFlush returns mode flush. - TotalOrderingModeFlush -) - -var ( - // ErrInvalidDAG is reported when block subbmitted to totalOrdering - // didn't form a DAG. - ErrInvalidDAG = errors.New("invalid dag") - // ErrFutureRoundDelivered means some blocks from later rounds are - // delivered, this means program error. - ErrFutureRoundDelivered = errors.New("future round delivered") - // ErrBlockFromPastRound means we receive some block from past round. - ErrBlockFromPastRound = errors.New("block from past round") - // ErrTotalOrderingHangs means total ordering hangs somewhere. - ErrTotalOrderingHangs = errors.New("total ordering hangs") - // ErrForwardAck means a block acking some blocks from newer round. - ErrForwardAck = errors.New("forward ack") - // ErrUnexpected means general (I'm lazy) errors. - ErrUnexpected = errors.New("unexpected") - // ErrTotalOrderingPhiRatio means invalid phi ratio - ErrTotalOrderingPhiRatio = errors.New("invalid total ordering phi ratio") -) - -// totalOrderingConfig is the configuration for total ordering. -type totalOrderingConfig struct { - roundBasedConfig - // k represents the k in 'k-level total ordering'. - // In short, only block height equals to (global minimum height + k) - // would be taken into consideration. - k uint64 - - // phi is a const to control how strong the leading preceding block - // should be. - phi uint64 - - numChains uint32 - isFlushRequired bool -} - -func (config *totalOrderingConfig) fromConfig(round uint64, cfg *types.Config) { - config.k = uint64(cfg.K) - config.numChains = cfg.NumChains - config.phi = uint64(float32(cfg.NumChains-1)*cfg.PhiRatio + 1) - config.setupRoundBasedFields(round, cfg) -} - -func newTotalOrderingConfig( - prev *totalOrderingConfig, cur *types.Config) *totalOrderingConfig { - c := &totalOrderingConfig{} - c.fromConfig(prev.roundID+1, cur) - c.setRoundBeginTime(prev.roundEndTime) - prev.isFlushRequired = c.k != prev.k || - c.phi != prev.phi || - c.numChains != prev.numChains - return c -} - -// totalOrderingWinRecord caches the comparison of candidates calculated by -// their height vector. -type totalOrderingWinRecord struct { - wins []int8 - count uint -} - -func (rec *totalOrderingWinRecord) reset() { - rec.count = 0 - for idx := range rec.wins { - rec.wins[idx] = 0 - } -} - -func newTotalOrderingWinRecord(numChains uint32) *totalOrderingWinRecord { - return &totalOrderingWinRecord{ - wins: make([]int8, numChains), - count: 0, - } -} - -// grade implements the 'grade' potential function in algorithm. -func (rec *totalOrderingWinRecord) grade( - numChains uint32, phi uint64, globalAnsLength uint64) int { - if uint64(rec.count) >= phi { - return 1 - } else if uint64(rec.count) < phi-uint64(numChains)+globalAnsLength { - return 0 - } else { - return -1 - } -} - -// totalOrderingHeightRecord records: -// - the minimum height of block which acks this block. -// - the count of blocks acking this block. -type totalOrderingHeightRecord struct{ minHeight, count uint64 } - -// totalOrderingCache caches objects for reuse and not being colloected by GC. -// Each cached target has "get-" and "put-" functions for getting and reusing -// of objects. -type totalOrderingCache struct { - ackedStatus [][]*totalOrderingHeightRecord - heightVectors [][]uint64 - winRecords [][]*totalOrderingWinRecord - winRecordPool sync.Pool - ackedVectors []map[common.Hash]struct{} - numChains uint32 -} - -// newTotalOrderingObjectCache constructs an totalOrderingCache instance. -func newTotalOrderingObjectCache(numChains uint32) *totalOrderingCache { - return &totalOrderingCache{ - winRecordPool: sync.Pool{ - New: func() interface{} { - return newTotalOrderingWinRecord(numChains) - }, - }, - numChains: numChains, - } -} - -// resize makes sure internal storage of totalOrdering instance can handle -// maximum possible numChains in future configs. -func (cache *totalOrderingCache) resize(numChains uint32) { - // Basically, everything in cache needs to be cleaned. - if cache.numChains >= numChains { - return - } - cache.ackedStatus = nil - cache.heightVectors = nil - cache.winRecords = nil - cache.ackedVectors = nil - cache.numChains = numChains - cache.winRecordPool = sync.Pool{ - New: func() interface{} { - return newTotalOrderingWinRecord(numChains) - }, - } -} - -func (cache *totalOrderingCache) getAckedStatus() ( - acked []*totalOrderingHeightRecord) { - - if len(cache.ackedStatus) == 0 { - acked = make([]*totalOrderingHeightRecord, cache.numChains) - for idx := range acked { - acked[idx] = &totalOrderingHeightRecord{count: 0} - } - } else { - acked = cache.ackedStatus[len(cache.ackedStatus)-1] - cache.ackedStatus = cache.ackedStatus[:len(cache.ackedStatus)-1] - // Reset acked status. - for idx := range acked { - acked[idx].count = 0 - } - } - return -} - -func (cache *totalOrderingCache) putAckedStatus( - acked []*totalOrderingHeightRecord) { - // If the recycled objects supports lower numChains than we required, - // don't recycle it. - if uint32(len(acked)) != cache.numChains { - return - } - cache.ackedStatus = append(cache.ackedStatus, acked) -} - -func (cache *totalOrderingCache) getWinRecord() ( - win *totalOrderingWinRecord) { - win = cache.winRecordPool.Get().(*totalOrderingWinRecord) - win.reset() - return -} - -func (cache *totalOrderingCache) putWinRecord(win *totalOrderingWinRecord) { - if win == nil { - return - } - // If the recycled objects supports lower numChains than we required, - // don't recycle it. - if uint32(len(win.wins)) != cache.numChains { - return - } - cache.winRecordPool.Put(win) -} - -func (cache *totalOrderingCache) getHeightVector() (hv []uint64) { - if len(cache.heightVectors) == 0 { - hv = make([]uint64, cache.numChains) - } else { - hv = cache.heightVectors[len(cache.heightVectors)-1] - cache.heightVectors = cache.heightVectors[:len(cache.heightVectors)-1] - } - for idx := range hv { - hv[idx] = infinity - } - return -} - -func (cache *totalOrderingCache) putHeightVector(hv []uint64) { - if uint32(len(hv)) != cache.numChains { - return - } - cache.heightVectors = append(cache.heightVectors, hv) -} - -func (cache *totalOrderingCache) getWinRecords() (w []*totalOrderingWinRecord) { - if len(cache.winRecords) == 0 { - w = make([]*totalOrderingWinRecord, cache.numChains) - } else { - w = cache.winRecords[len(cache.winRecords)-1] - cache.winRecords = cache.winRecords[:len(cache.winRecords)-1] - for idx := range w { - w[idx] = nil - } - } - return -} - -func (cache *totalOrderingCache) putWinRecords(w []*totalOrderingWinRecord) { - if uint32(len(w)) != cache.numChains { - return - } - cache.winRecords = append(cache.winRecords, w) -} - -func (cache *totalOrderingCache) getAckedVector() ( - acked map[common.Hash]struct{}) { - if len(cache.ackedVectors) == 0 { - acked = make(map[common.Hash]struct{}) - } else { - acked, cache.ackedVectors = - cache.ackedVectors[len(cache.ackedVectors)-1], - cache.ackedVectors[:len(cache.ackedVectors)-1] - for k := range acked { - delete(acked, k) - } - } - return -} - -func (cache *totalOrderingCache) putAckedVector( - acked map[common.Hash]struct{}) { - if acked != nil { - cache.ackedVectors = append(cache.ackedVectors, acked) - } -} - -// totalOrderingCandidateInfo stores proceding status for a candidate including -// - acked status as height records, which keeps the number of blocks from other -// chains acking this candidate. -// - cached height vector, which valids height based on K-level used for -// comparison in 'grade' function. -// - cached result of grade function to other candidates. -// -// Height Record: -// When block A acks block B, all blocks proposed from the same proposer as -// block A with higher height also acks block B. Thus records below is needed -// - the minimum height of acking block from that proposer -// - count of acking blocks from that proposer -// to repsent the acking status for block A. -type totalOrderingCandidateInfo struct { - ackedStatus []*totalOrderingHeightRecord - cachedHeightVector []uint64 - winRecords []*totalOrderingWinRecord - hash common.Hash -} - -// newTotalOrderingCandidateInfo creates an totalOrderingCandidateInfo instance. -func newTotalOrderingCandidateInfo( - candidateHash common.Hash, - objCache *totalOrderingCache) *totalOrderingCandidateInfo { - return &totalOrderingCandidateInfo{ - ackedStatus: objCache.getAckedStatus(), - winRecords: objCache.getWinRecords(), - hash: candidateHash, - } -} - -// clean clears information related to another candidate, which should be called -// when that candidate is selected in deliver set. -func (v *totalOrderingCandidateInfo) clean(otherCandidateChainID uint32) { - v.winRecords[otherCandidateChainID] = nil -} - -// recycle recycles objects for later usage, this eases GC's work. -func (v *totalOrderingCandidateInfo) recycle(objCache *totalOrderingCache) { - if v.winRecords != nil { - for _, win := range v.winRecords { - objCache.putWinRecord(win) - } - objCache.putWinRecords(v.winRecords) - } - if v.cachedHeightVector != nil { - objCache.putHeightVector(v.cachedHeightVector) - } - objCache.putAckedStatus(v.ackedStatus) -} - -// addBlock would update totalOrderingCandidateInfo, it's caller's duty -// to make sure the input block acutally acking the target block. -func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) error { - rec := v.ackedStatus[b.Position.ChainID] - if rec.count == 0 { - rec.minHeight = b.Position.Height - rec.count = 1 - } else { - if b.Position.Height <= rec.minHeight { - return ErrInvalidDAG - } - rec.count++ - } - return nil -} - -// getAckingNodeSetLength returns the size of acking node set. Only heights -// larger than "global minimum height + k" are counted. For example, global -// minimum acking height is 1 and k is 1, only block heights which is larger or -// equal to 2 are added into acking node set. -func (v *totalOrderingCandidateInfo) getAckingNodeSetLength( - global *totalOrderingCandidateInfo, - k uint64, - numChains uint32) (count uint64) { - - var rec *totalOrderingHeightRecord - for idx, gRec := range global.ackedStatus[:numChains] { - if gRec.count == 0 { - continue - } - rec = v.ackedStatus[idx] - if rec.count == 0 { - continue - } - if rec.minHeight+rec.count-1 >= gRec.minHeight+k { - count++ - } - } - return -} - -// updateAckingHeightVector would cached acking height vector. -// -// Only block height equals to (global minimum block height + k) would be -// taken into consideration. -func (v *totalOrderingCandidateInfo) updateAckingHeightVector( - global *totalOrderingCandidateInfo, - k uint64, - dirtyChainIDs []int, - objCache *totalOrderingCache) { - - var ( - idx int - gRec, rec *totalOrderingHeightRecord - ) - // The reason for not merging two loops is that the performance impact of map - // iteration is large if the size is large. Iteration of dirty chains is - // faster the map. - // TODO(mission): merge the code in this if/else if the performance won't be - // downgraded when adding a function for the shared part. - if v.cachedHeightVector == nil { - // Generate height vector from scratch. - v.cachedHeightVector = objCache.getHeightVector() - for idx, gRec = range global.ackedStatus { - if gRec.count <= k { - continue - } - rec = v.ackedStatus[idx] - if rec.count == 0 { - v.cachedHeightVector[idx] = infinity - } else if rec.minHeight <= gRec.minHeight+k { - // This check is sufficient to make sure the block height: - // - // gRec.minHeight + k - // - // would be included in this totalOrderingCandidateInfo. - v.cachedHeightVector[idx] = gRec.minHeight + k - } else { - v.cachedHeightVector[idx] = infinity - } - } - } else { - // Return the cached one, only update dirty fields. - for _, idx = range dirtyChainIDs { - gRec = global.ackedStatus[idx] - if gRec.count == 0 || gRec.count <= k { - v.cachedHeightVector[idx] = infinity - continue - } - rec = v.ackedStatus[idx] - if rec.count == 0 { - v.cachedHeightVector[idx] = infinity - } else if rec.minHeight <= gRec.minHeight+k { - v.cachedHeightVector[idx] = gRec.minHeight + k - } else { - v.cachedHeightVector[idx] = infinity - } - } - } - return -} - -// updateWinRecord setups win records from two candidates. -func (v *totalOrderingCandidateInfo) updateWinRecord( - otherChainID uint32, - other *totalOrderingCandidateInfo, - dirtyChainIDs []int, - objCache *totalOrderingCache, - numChains uint32) { - var ( - idx int - height uint64 - ) - // The reason not to merge two loops is that the iteration of map is - // expensive when chain count is large, iterating of dirty chains is cheaper. - // TODO(mission): merge the code in this if/else if adding a function won't - // affect the performance. - win := v.winRecords[otherChainID] - if win == nil { - win = objCache.getWinRecord() - v.winRecords[otherChainID] = win - for idx, height = range v.cachedHeightVector[:numChains] { - if height == infinity { - continue - } - if other.cachedHeightVector[idx] == infinity { - win.wins[idx] = 1 - win.count++ - } - } - } else { - for _, idx = range dirtyChainIDs { - if v.cachedHeightVector[idx] == infinity { - if win.wins[idx] == 1 { - win.wins[idx] = 0 - win.count-- - } - continue - } - if other.cachedHeightVector[idx] == infinity { - if win.wins[idx] == 0 { - win.wins[idx] = 1 - win.count++ - } - } else { - if win.wins[idx] == 1 { - win.wins[idx] = 0 - win.count-- - } - } - } - } -} - -// totalOrderingBreakpoint is a record of height discontinuity on a chain -type totalOrderingBreakpoint struct { - roundID uint64 - // height of last block. - lastHeight uint64 -} - -// totalOrderingGroupVector keeps global status of current pending set. -type totalOrderingGlobalVector struct { - // blocks stores all blocks grouped by their proposers and sorted by height. - // TODO(mission): slice used here reallocates frequently. - blocks [][]*types.Block - - // breakpoints stores rounds for chains that blocks' height on them are - // not consecutive, for example in chain i - // Round Height - // 0 0 - // 0 1 - // 1 2 - // 1 3 - // 1 4 - // 2 - <- a config change of chain number occured - // 2 - - // 3 - - // 3 - - // 4 0 <- a breakpoint for round 3 is cached here - // 5 - - // 5 - - // 6 0 <- breakpoint again - // breakpoints[i][0] == &totalOrderingBreakpoint{roundID: 4, lastHeight: 4} - // breakpoints[i][1] == &totalOrderingBreakpoint{roundID: 6, lastHeight: 0} - breakpoints [][]*totalOrderingBreakpoint - - // curRound stores the last round ID used for purging breakpoints. - curRound uint64 - - // tips records the last seen block for each chain. - tips []*types.Block - - // Only ackedStatus in cachedCandidateInfo is used. - cachedCandidateInfo *totalOrderingCandidateInfo -} - -func newTotalOrderingGlobalVector( - initRound uint64, numChains uint32) *totalOrderingGlobalVector { - return &totalOrderingGlobalVector{ - blocks: make([][]*types.Block, numChains), - tips: make([]*types.Block, numChains), - breakpoints: make([][]*totalOrderingBreakpoint, numChains), - curRound: initRound, - } -} - -func (global *totalOrderingGlobalVector) resize(numChains uint32) { - if len(global.blocks) >= int(numChains) { - return - } - // Resize blocks. - newBlocks := make([][]*types.Block, numChains) - copy(newBlocks, global.blocks) - global.blocks = newBlocks - // Resize breakpoints. - newBreakPoints := make([][]*totalOrderingBreakpoint, numChains) - copy(newBreakPoints, global.breakpoints) - global.breakpoints = newBreakPoints - // Resize tips. - newTips := make([]*types.Block, numChains) - copy(newTips, global.tips) - global.tips = newTips -} - -func (global *totalOrderingGlobalVector) switchRound(roundID uint64) { - if global.curRound+1 != roundID { - panic(ErrUnexpected) - } - global.curRound = roundID - for chainID, bs := range global.breakpoints { - if len(bs) == 0 { - continue - } - if bs[0].roundID == roundID { - global.breakpoints[chainID] = bs[1:] - } - } -} - -func (global *totalOrderingGlobalVector) prepareHeightRecord( - candidate *types.Block, - info *totalOrderingCandidateInfo, - acked map[common.Hash]struct{}) { - - var ( - chainID = candidate.Position.ChainID - breakpoints = global.breakpoints[chainID] - breakpoint *totalOrderingBreakpoint - rec *totalOrderingHeightRecord - ) - // Setup height record for own chain. - rec = &totalOrderingHeightRecord{ - minHeight: candidate.Position.Height, - } - if len(breakpoints) == 0 { - // If no breakpoint, count is the amount of blocks. - rec.count = uint64(len(global.blocks[chainID])) - } else { - // If there are breakpoints, only the first counts. - rec.count = breakpoints[0].lastHeight - candidate.Position.Height + 1 - } - info.ackedStatus[chainID] = rec - if acked == nil { - return - } - for idx, blocks := range global.blocks { - if idx == int(chainID) { - continue - } - breakpoint = nil - if len(global.breakpoints[idx]) > 0 { - breakpoint = global.breakpoints[idx][0] - } - for i, b := range blocks { - if breakpoint != nil && b.Position.Round >= breakpoint.roundID { - break - } - if _, acked := acked[b.Hash]; !acked { - continue - } - // If this block acks the candidate, all newer blocks from the same chain - // also 'indirectly' acks the candidate. - rec = info.ackedStatus[idx] - rec.minHeight = b.Position.Height - if breakpoint == nil { - rec.count = uint64(len(blocks) - i) - } else { - rec.count = breakpoint.lastHeight - b.Position.Height + 1 - } - break - } - } -} - -func (global *totalOrderingGlobalVector) addBlock( - b *types.Block) (isOldest bool, pending bool, err error) { - // isOldest implies the block is the oldest in global vector - chainID := b.Position.ChainID - tip := global.tips[chainID] - isOldest = len(global.blocks[chainID]) == 0 - if tip != nil { - // Perform light weight sanity check based on tip. - if tip.Position.Round > b.Position.Round { - err = ErrInvalidDAG - return - } - if DiffUint64(tip.Position.Round, b.Position.Round) > 1 { - if b.Position.Height != 0 { - err = ErrInvalidDAG - return - } - // Add breakpoint. - if b.Position.Round > global.curRound { - global.breakpoints[chainID] = append( - global.breakpoints[chainID], - &totalOrderingBreakpoint{ - roundID: b.Position.Round, - lastHeight: tip.Position.Height, - }) - } - } else { - if b.Position.Height != tip.Position.Height+1 { - err = ErrInvalidDAG - return - } - } - } else { - if b.Position.Round < global.curRound { - err = ErrBlockFromPastRound - return - } - if b.Position.Round > global.curRound { - // Add breakpoint. - bp := &totalOrderingBreakpoint{ - roundID: b.Position.Round, - lastHeight: 0, - } - global.breakpoints[chainID] = append(global.breakpoints[chainID], bp) - } - } - bps := global.breakpoints[chainID] - pending = len(bps) > 0 && bps[0].roundID <= b.Position.Round - global.blocks[chainID] = append(global.blocks[chainID], b) - global.tips[chainID] = b - return -} - -// updateCandidateInfo udpates cached candidate info. -func (global *totalOrderingGlobalVector) updateCandidateInfo( - dirtyChainIDs []int, objCache *totalOrderingCache) { - var ( - idx int - blocks []*types.Block - block *types.Block - info *totalOrderingCandidateInfo - rec *totalOrderingHeightRecord - breakpoint *totalOrderingBreakpoint - ) - if global.cachedCandidateInfo == nil { - info = newTotalOrderingCandidateInfo(common.Hash{}, objCache) - for idx, blocks = range global.blocks { - if len(blocks) == 0 { - continue - } - rec = info.ackedStatus[idx] - if len(global.breakpoints[idx]) > 0 { - breakpoint = global.breakpoints[idx][0] - block = blocks[0] - if block.Position.Round >= breakpoint.roundID { - continue - } - rec.minHeight = block.Position.Height - rec.count = breakpoint.lastHeight - block.Position.Height + 1 - } else { - rec.minHeight = blocks[0].Position.Height - rec.count = uint64(len(blocks)) - } - } - global.cachedCandidateInfo = info - } else { - info = global.cachedCandidateInfo - for _, idx = range dirtyChainIDs { - blocks = global.blocks[idx] - if len(blocks) == 0 { - info.ackedStatus[idx].count = 0 - continue - } - rec = info.ackedStatus[idx] - if len(global.breakpoints[idx]) > 0 { - breakpoint = global.breakpoints[idx][0] - block = blocks[0] - if block.Position.Round >= breakpoint.roundID { - continue - } - rec.minHeight = block.Position.Height - rec.count = breakpoint.lastHeight - block.Position.Height + 1 - } else { - rec.minHeight = blocks[0].Position.Height - rec.count = uint64(len(blocks)) - } - } - } - return -} - -// totalOrdering represent a process unit to handle total ordering for blocks. -type totalOrdering struct { - // pendings stores blocks awaiting to be ordered. - pendings map[common.Hash]*types.Block - - // The round of config used when performing total ordering. - curRound uint64 - - // duringFlush is a flag to switch the flush mode and normal mode. - duringFlush bool - - // flushReadyChains checks if the last block of that chain arrived. Once - // last blocks from all chains in current config are arrived, we can - // perform flush. - flushReadyChains map[uint32]struct{} - - // flushed is a map of flushed blocks. - flushed map[uint32]struct{} - - // globalVector group all pending blocks by proposers and - // sort them by block height. This structure is helpful when: - // - // - build global height vector - // - picking candidates next round - globalVector *totalOrderingGlobalVector - - // candidates caches result of potential function during generating preceding - // set. - candidates []*totalOrderingCandidateInfo - - // acked stores the 'block A acked by block B' by acked[A.Hash][B.Hash] - acked map[common.Hash]map[common.Hash]struct{} - - // dirtyChainIDs stores chainIDs that is "dirty", i.e. needed updating all - // cached statuses (win record, acking status). - dirtyChainIDs []int - - // objCache caches allocated objects, like map. - objCache *totalOrderingCache - - // candidateChainMapping keeps a mapping from candidate's hash to - // their chain IDs. - candidateChainMapping map[uint32]common.Hash - - // candidateChainIDs records chain ID of all candidates. - candidateChainIDs []uint32 - - // configs keeps configuration for each round in continuous way. - configs []*totalOrderingConfig -} - -// newTotalOrdering constructs an totalOrdering instance. -func newTotalOrdering( - dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering { - config := &totalOrderingConfig{} - config.fromConfig(round, cfg) - config.setRoundBeginTime(dMoment) - candidates := make([]*totalOrderingCandidateInfo, config.numChains) - to := &totalOrdering{ - pendings: make(map[common.Hash]*types.Block), - dirtyChainIDs: make([]int, 0, config.numChains), - acked: make(map[common.Hash]map[common.Hash]struct{}), - objCache: newTotalOrderingObjectCache(config.numChains), - candidateChainMapping: make(map[uint32]common.Hash), - candidates: candidates, - candidateChainIDs: make([]uint32, 0, config.numChains), - curRound: config.roundID, - globalVector: newTotalOrderingGlobalVector( - config.roundID, config.numChains), - } - to.configs = []*totalOrderingConfig{config} - return to -} - -// appendConfig add new configs for upcoming rounds. If you add a config for -// round R, next time you can only add the config for round R+1. -func (to *totalOrdering) appendConfig( - round uint64, config *types.Config) error { - - if round != uint64(len(to.configs))+to.configs[0].roundID { - return ErrRoundNotIncreasing - } - if config.PhiRatio < 0.5 || config.PhiRatio > 1.0 { - return ErrTotalOrderingPhiRatio - } - to.configs = append( - to.configs, - newTotalOrderingConfig(to.configs[len(to.configs)-1], config)) - // Resize internal structures. - to.globalVector.resize(config.NumChains) - to.objCache.resize(config.NumChains) - if int(config.NumChains) > len(to.candidates) { - newCandidates := make([]*totalOrderingCandidateInfo, config.NumChains) - copy(newCandidates, to.candidates) - to.candidates = newCandidates - } - return nil -} - -func (to *totalOrdering) switchRound() { - to.curRound++ - to.globalVector.switchRound(to.curRound) -} - -// buildBlockRelation update all its indirect acks recursively. -func (to *totalOrdering) buildBlockRelation(b *types.Block) { - var ( - curBlock, nextBlock *types.Block - ack common.Hash - acked map[common.Hash]struct{} - exists, alreadyPopulated bool - toCheck = []*types.Block{b} - ) - for len(toCheck) != 0 { - curBlock, toCheck = toCheck[len(toCheck)-1], toCheck[:len(toCheck)-1] - if curBlock.Position.Round > b.Position.Round { - // It's illegal for a block to ack some blocks in future round. - panic(ErrForwardAck) - } - for _, ack = range curBlock.Acks { - if acked, exists = to.acked[ack]; !exists { - acked = to.objCache.getAckedVector() - to.acked[ack] = acked - } - // Check if the block is handled. - if _, alreadyPopulated = acked[b.Hash]; alreadyPopulated { - continue - } - acked[b.Hash] = struct{}{} - // See if we need to do this recursively. - if nextBlock, exists = to.pendings[ack]; exists { - toCheck = append(toCheck, nextBlock) - } - } - } -} - -// clean a block from working set. This behaviour would prevent -// our memory usage growing infinity. -func (to *totalOrdering) clean(b *types.Block) { - var ( - h = b.Hash - chainID = b.Position.ChainID - ) - to.objCache.putAckedVector(to.acked[h]) - delete(to.acked, h) - delete(to.pendings, h) - to.candidates[chainID].recycle(to.objCache) - to.candidates[chainID] = nil - delete(to.candidateChainMapping, chainID) - // Remove this candidate from candidate IDs. - to.candidateChainIDs = - removeFromSortedUint32Slice(to.candidateChainIDs, chainID) - // Clear records of this candidate from other candidates. - for _, idx := range to.candidateChainIDs { - to.candidates[idx].clean(chainID) - } -} - -// updateVectors is a helper function to update all cached vectors. -func (to *totalOrdering) updateVectors( - b *types.Block) (isOldest bool, err error) { - var ( - candidateHash common.Hash - chainID uint32 - acked bool - pending bool - ) - // Update global height vector - if isOldest, pending, err = to.globalVector.addBlock(b); err != nil { - return - } - if to.duringFlush { - // It makes no sense to calculate potential functions of total ordering - // when flushing would be happened. - return - } - if pending { - // The chain of this block contains breakpoints, which means their - // height are not continuous. This implementation of DEXON total - // ordering algorithm assumes the height of blocks in working set should - // be continuous. - // - // To workaround this issue, when block arrived after breakpoints, - // their information would not be contributed to current working set. - // This mechanism works because we switch rounds by flushing and - // reset the whole working set. - // This works because forward acking blocks are rejected. - return - } - // Update candidates' acking status. - for chainID, candidateHash = range to.candidateChainMapping { - if _, acked = to.acked[candidateHash][b.Hash]; !acked { - continue - } - if err = to.candidates[chainID].addBlock(b); err != nil { - return - } - } - return -} - -// prepareCandidate builds totalOrderingCandidateInfo for a new candidate. -func (to *totalOrdering) prepareCandidate(b *types.Block) { - var ( - info = newTotalOrderingCandidateInfo(b.Hash, to.objCache) - chainID = b.Position.ChainID - ) - to.candidates[chainID] = info - to.candidateChainMapping[chainID] = b.Hash - // Add index to slot to allocated list, make sure the modified list is sorted. - to.candidateChainIDs = append(to.candidateChainIDs, chainID) - sort.Slice(to.candidateChainIDs, func(i, j int) bool { - return to.candidateChainIDs[i] < to.candidateChainIDs[j] - }) - to.globalVector.prepareHeightRecord(b, info, to.acked[b.Hash]) - return -} - -// isCandidate checks if a block only contains acks to delivered blocks. -func (to *totalOrdering) isCandidate(b *types.Block) bool { - for _, ack := range b.Acks { - if _, exists := to.pendings[ack]; exists { - return false - } - } - return true -} - -// output finishes the delivery of preceding set. -func (to *totalOrdering) output( - precedings map[common.Hash]struct{}, - numChains uint32) (ret []*types.Block) { - - for p := range precedings { - // Remove the first element from corresponding blockVector. - b := to.pendings[p] - chainID := b.Position.ChainID - // TODO(mission): frequent reallocation here. - to.globalVector.blocks[chainID] = to.globalVector.blocks[chainID][1:] - ret = append(ret, b) - // Remove block relations. - to.clean(b) - to.dirtyChainIDs = append(to.dirtyChainIDs, int(chainID)) - } - sort.Sort(types.ByHash(ret)) - // Find new candidates from global vector's tips. - // The complexity here is O(N^2logN). - // TODO(mission): only tips which acking some blocks in the devliered set - // should be checked. This improvement related to the latency introduced by K. - for chainID, blocks := range to.globalVector.blocks[:numChains] { - if len(blocks) == 0 { - continue - } - if _, picked := to.candidateChainMapping[uint32(chainID)]; picked { - continue - } - if !to.isCandidate(blocks[0]) { - continue - } - // Build totalOrderingCandidateInfo for new candidate. - to.prepareCandidate(blocks[0]) - } - return -} - -// generateDeliverSet generates preceding set and checks if the preceding set -// is deliverable by potential function. -func (to *totalOrdering) generateDeliverSet() ( - delivered map[common.Hash]struct{}, mode uint32) { - - var ( - chainID, otherChainID uint32 - info, otherInfo *totalOrderingCandidateInfo - precedings = make(map[uint32]struct{}) - cfg = to.getCurrentConfig() - ) - mode = TotalOrderingModeNormal - to.globalVector.updateCandidateInfo(to.dirtyChainIDs, to.objCache) - globalInfo := to.globalVector.cachedCandidateInfo - for _, chainID = range to.candidateChainIDs { - to.candidates[chainID].updateAckingHeightVector( - globalInfo, cfg.k, to.dirtyChainIDs, to.objCache) - } - // Update winning records for each candidate. - // TODO(mission): It's not reasonable to request one routine for each - // candidate, the context switch rate would be high. - var wg sync.WaitGroup - wg.Add(len(to.candidateChainIDs)) - for _, chainID := range to.candidateChainIDs { - info = to.candidates[chainID] - go func(can uint32, canInfo *totalOrderingCandidateInfo) { - defer wg.Done() - for _, otherChainID := range to.candidateChainIDs { - if can == otherChainID { - continue - } - canInfo.updateWinRecord( - otherChainID, - to.candidates[otherChainID], - to.dirtyChainIDs, - to.objCache, - cfg.numChains) - } - }(chainID, info) - } - wg.Wait() - // Reset dirty chains. - to.dirtyChainIDs = to.dirtyChainIDs[:0] - globalAnsLength := globalInfo.getAckingNodeSetLength( - globalInfo, cfg.k, cfg.numChains) -CheckNextCandidateLoop: - for _, chainID = range to.candidateChainIDs { - info = to.candidates[chainID] - for _, otherChainID = range to.candidateChainIDs { - if chainID == otherChainID { - continue - } - otherInfo = to.candidates[otherChainID] - // TODO(mission): grade should be bounded by current numChains. - if otherInfo.winRecords[chainID].grade( - cfg.numChains, cfg.phi, globalAnsLength) != 0 { - continue CheckNextCandidateLoop - } - } - precedings[chainID] = struct{}{} - } - if len(precedings) == 0 { - return - } - // internal is a helper function to verify internal stability. - internal := func() bool { - var ( - isPreceding, beaten bool - p uint32 - ) - for _, chainID = range to.candidateChainIDs { - if _, isPreceding = precedings[chainID]; isPreceding { - continue - } - beaten = false - for p = range precedings { - // TODO(mission): grade should be bound by current numChains. - if beaten = to.candidates[p].winRecords[chainID].grade( - cfg.numChains, cfg.phi, globalAnsLength) == 1; beaten { - break - } - } - if !beaten { - return false - } - } - return true - } - // checkAHV is a helper function to verify external stability. - // It would make sure some preceding block is strong enough - // to lead the whole preceding set. - checkAHV := func() bool { - var ( - height, count uint64 - p uint32 - ) - for p = range precedings { - count = 0 - info = to.candidates[p] - for _, height = range info.cachedHeightVector { - if height != infinity { - count++ - if count > cfg.phi { - return true - } - } - } - } - return false - } - // checkANS is a helper function to verify external stability. - // It would make sure all preceding blocks are strong enough - // to be delivered. - checkANS := func() bool { - var chainAnsLength uint64 - for p := range precedings { - chainAnsLength = to.candidates[p].getAckingNodeSetLength( - globalInfo, cfg.k, cfg.numChains) - if uint64(chainAnsLength) < uint64(cfg.numChains)-cfg.phi { - return false - } - } - return true - } - // If all chains propose enough blocks, we should force - // to deliver since the whole picture of the DAG is revealed. - if globalAnsLength != uint64(cfg.numChains) { - // Check internal stability first. - if !internal() { - return - } - - // The whole picture is not ready, we need to check if - // exteranl stability is met, and we can deliver earlier. - if checkAHV() && checkANS() { - mode = TotalOrderingModeEarly - } else { - return - } - } - delivered = make(map[common.Hash]struct{}) - for p := range precedings { - delivered[to.candidates[p].hash] = struct{}{} - } - return -} - -// flushBlocks flushes blocks. -func (to *totalOrdering) flushBlocks() ( - flushed []*types.Block, mode uint32, err error) { - mode = TotalOrderingModeFlush - cfg := to.getCurrentConfig() - - // Flush blocks until last blocks from all chains appeared. - if len(to.flushReadyChains) < int(cfg.numChains) { - return - } - if len(to.flushReadyChains) > int(cfg.numChains) { - // This case should never be occured. - err = ErrFutureRoundDelivered - return - } - // Dump all blocks in this round. - for len(to.flushed) != int(cfg.numChains) { - // Dump all candidates without checking potential function. - flushedHashes := make(map[common.Hash]struct{}) - for _, chainID := range to.candidateChainIDs { - candidateBlock := to.pendings[to.candidates[chainID].hash] - if candidateBlock.Position.Round > to.curRound { - continue - } - flushedHashes[candidateBlock.Hash] = struct{}{} - } - if len(flushedHashes) == 0 { - err = ErrTotalOrderingHangs - return - } - flushedBlocks := to.output(flushedHashes, cfg.numChains) - for _, b := range flushedBlocks { - if cfg.isLastBlock(b) { - to.flushed[b.Position.ChainID] = struct{}{} - } - } - flushed = append(flushed, flushedBlocks...) - } - // Switch back to non-flushing mode. - to.duringFlush = false - to.flushed = make(map[uint32]struct{}) - to.flushReadyChains = make(map[uint32]struct{}) - // Clean all cached intermediate stats. - for idx := range to.candidates { - if to.candidates[idx] == nil { - continue - } - to.candidates[idx].recycle(to.objCache) - to.candidates[idx] = nil - } - to.dirtyChainIDs = nil - to.candidateChainMapping = make(map[uint32]common.Hash) - to.candidateChainIDs = nil - to.globalVector.cachedCandidateInfo = nil - to.switchRound() - // Force picking new candidates. - numChains := to.getCurrentConfig().numChains - to.output(map[common.Hash]struct{}{}, numChains) - return -} - -// deliverBlocks delivers blocks by DEXON total ordering algorithm. -func (to *totalOrdering) deliverBlocks() ( - delivered []*types.Block, mode uint32, err error) { - - hashes, mode := to.generateDeliverSet() - cfg := to.getCurrentConfig() - // Output precedings. - delivered = to.output(hashes, cfg.numChains) - // Check if any block in delivered set is the last block in this round, if - // there is, perform flush or round-switch. - for _, b := range delivered { - if b.Position.Round > to.curRound { - err = ErrFutureRoundDelivered - return - } - if !cfg.isLastBlock(b) { - continue - } - // Code reaches here if a last block is processed. This triggers - // "duringFlush" mode if config changes. - if cfg.isFlushRequired { - // Switch to flush mode. - to.duringFlush = true - to.flushReadyChains = make(map[uint32]struct{}) - to.flushed = make(map[uint32]struct{}) - } else { - // Switch round directly. - to.switchRound() - } - break - } - if to.duringFlush { - // Collect last blocks until all last blocks appears and function - // flushBlocks will be called. - for _, b := range delivered { - if cfg.isLastBlock(b) { - to.flushed[b.Position.ChainID] = struct{}{} - } - } - // Some last blocks for the round to be flushed might not be delivered - // yet. - for _, tip := range to.globalVector.tips[:cfg.numChains] { - if tip.Position.Round > to.curRound || cfg.isLastBlock(tip) { - to.flushReadyChains[tip.Position.ChainID] = struct{}{} - } - } - } - return -} - -func (to *totalOrdering) getCurrentConfig() *totalOrderingConfig { - cfgIdx := to.curRound - to.configs[0].roundID - if cfgIdx >= uint64(len(to.configs)) { - panic(fmt.Errorf("total ordering config is not ready: %v, %v, %v", - to.curRound, to.configs[0].roundID, len(to.configs))) - } - return to.configs[cfgIdx] -} - -// addBlock adds a block to the working set of total ordering module. -func (to *totalOrdering) addBlock(b *types.Block) error { - // NOTE: Block b is assumed to be in topologically sorted, i.e., all its - // acking blocks are during or after total ordering stage. - cfg := to.getCurrentConfig() - to.pendings[b.Hash] = b - to.buildBlockRelation(b) - isOldest, err := to.updateVectors(b) - if err != nil { - return err - } - // Mark the proposer of incoming block as dirty. - if b.Position.ChainID < cfg.numChains { - to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID)) - _, exists := to.candidateChainMapping[b.Position.ChainID] - if isOldest && !exists && to.isCandidate(b) { - // isOldest means b is the oldest block in global vector, and isCandidate - // is still needed here due to round change. For example: - // o o o <- genesis block for round change, isCandidate returns true - // | | but isOldest is false - // o o - // | | - // o o o <- isOldest is true but isCandidate returns false - // | | / - // o o - to.prepareCandidate(b) - } - } - if to.duringFlush && cfg.isLastBlock(b) { - to.flushReadyChains[b.Position.ChainID] = struct{}{} - } - return nil -} - -// extractBlocks check if there is any deliverable set. -func (to *totalOrdering) extractBlocks() ([]*types.Block, uint32, error) { - if to.duringFlush { - return to.flushBlocks() - } - return to.deliverBlocks() -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go index 65cb635ca..74360c735 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go @@ -34,7 +34,7 @@ type AgreementResult struct { func (r *AgreementResult) String() string { return fmt.Sprintf("agreementResult{Hash:%s %s}", - r.BlockHash.String()[:6], &r.Position) + r.BlockHash.String()[:6], r.Position) } // BlockRandomnessResult describes a block randomness result @@ -46,7 +46,7 @@ type BlockRandomnessResult struct { func (r *BlockRandomnessResult) String() string { return fmt.Sprintf("blockRandomness{Block:%s Pos:%s Rand:%s}", - r.BlockHash.String()[:6], &r.Position, + r.BlockHash.String()[:6], r.Position, hex.EncodeToString(r.Randomness)[:6], ) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block.go index b2a8f57f8..a2b697ce0 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block.go @@ -198,7 +198,7 @@ func (b *Block) DecodeRLP(s *rlp.Stream) error { } func (b *Block) String() string { - return fmt.Sprintf("Block{Hash:%v %s}", b.Hash.String()[:6], &b.Position) + return fmt.Sprintf("Block{Hash:%v %s}", b.Hash.String()[:6], b.Position) } // Clone returns a deep copy of a block. @@ -263,62 +263,62 @@ func (b ByHash) Swap(i int, j int) { b[i], b[j] = b[j], b[i] } -// ByPosition is the helper type for sorting slice of blocks by position. -type ByPosition []*Block +// BlocksByPosition is the helper type for sorting slice of blocks by position. +type BlocksByPosition []*Block // Len implements Len method in sort.Sort interface. -func (bs ByPosition) Len() int { +func (bs BlocksByPosition) Len() int { return len(bs) } // Less implements Less method in sort.Sort interface. -func (bs ByPosition) Less(i int, j int) bool { - return bs[j].Position.Newer(&bs[i].Position) +func (bs BlocksByPosition) Less(i int, j int) bool { + return bs[j].Position.Newer(bs[i].Position) } // Swap implements Swap method in sort.Sort interface. -func (bs ByPosition) Swap(i int, j int) { +func (bs BlocksByPosition) Swap(i int, j int) { bs[i], bs[j] = bs[j], bs[i] } // Push implements Push method in heap interface. -func (bs *ByPosition) Push(x interface{}) { +func (bs *BlocksByPosition) Push(x interface{}) { *bs = append(*bs, x.(*Block)) } // Pop implements Pop method in heap interface. -func (bs *ByPosition) Pop() (ret interface{}) { +func (bs *BlocksByPosition) Pop() (ret interface{}) { n := len(*bs) *bs, ret = (*bs)[0:n-1], (*bs)[n-1] return } -// ByFinalizationHeight is the helper type for sorting slice of blocks by +// BlocksByFinalizationHeight is the helper type for sorting slice of blocks by // finalization height. -type ByFinalizationHeight []*Block +type BlocksByFinalizationHeight []*Block // Len implements Len method in sort.Sort interface. -func (bs ByFinalizationHeight) Len() int { +func (bs BlocksByFinalizationHeight) Len() int { return len(bs) } // Less implements Less method in sort.Sort interface. -func (bs ByFinalizationHeight) Less(i int, j int) bool { +func (bs BlocksByFinalizationHeight) Less(i int, j int) bool { return bs[i].Finalization.Height < bs[j].Finalization.Height } // Swap implements Swap method in sort.Sort interface. -func (bs ByFinalizationHeight) Swap(i int, j int) { +func (bs BlocksByFinalizationHeight) Swap(i int, j int) { bs[i], bs[j] = bs[j], bs[i] } // Push implements Push method in heap interface. -func (bs *ByFinalizationHeight) Push(x interface{}) { +func (bs *BlocksByFinalizationHeight) Push(x interface{}) { *bs = append(*bs, x.(*Block)) } // Pop implements Pop method in heap interface. -func (bs *ByFinalizationHeight) Pop() (ret interface{}) { +func (bs *BlocksByFinalizationHeight) Pop() (ret interface{}) { n := len(*bs) *bs, ret = (*bs)[0:n-1], (*bs)[n-1] return diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/dkg/dkg.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/dkg/dkg.go index efe846755..db58168a2 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/dkg/dkg.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/dkg/dkg.go @@ -163,6 +163,64 @@ func (c *Complaint) Equal(other *Complaint) bool { bytes.Compare(c.Signature.Signature, other.Signature.Signature) == 0 } +type rlpComplaint struct { + ProposerID types.NodeID + Round uint64 + IsNack bool + PrivateShare []byte + Signature crypto.Signature +} + +// EncodeRLP implements rlp.Encoder +func (c *Complaint) EncodeRLP(w io.Writer) error { + if c.IsNack() { + return rlp.Encode(w, rlpComplaint{ + ProposerID: c.ProposerID, + Round: c.Round, + IsNack: true, + PrivateShare: c.PrivateShare.ProposerID.Hash[:], + Signature: c.Signature, + }) + } + prvShare, err := rlp.EncodeToBytes(&c.PrivateShare) + if err != nil { + return err + } + return rlp.Encode(w, rlpComplaint{ + ProposerID: c.ProposerID, + Round: c.Round, + IsNack: false, + PrivateShare: prvShare, + Signature: c.Signature, + }) +} + +// DecodeRLP implements rlp.Decoder +func (c *Complaint) DecodeRLP(s *rlp.Stream) error { + var dec rlpComplaint + if err := s.Decode(&dec); err != nil { + return err + } + + var prvShare PrivateShare + if dec.IsNack { + copy(prvShare.ProposerID.Hash[:], dec.PrivateShare) + prvShare.Round = dec.Round + } else { + if err := rlp.DecodeBytes(dec.PrivateShare, &prvShare); err != nil { + return err + } + } + + *c = Complaint{ + ProposerID: dec.ProposerID, + Round: dec.Round, + PrivateShare: prvShare, + Signature: dec.Signature, + } + return nil +} + // PartialSignature describe a partial signature in DKG protocol. type PartialSignature struct { ProposerID types.NodeID `json:"proposer_id"` diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go index 21a1e528e..fccfbb6aa 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go @@ -85,11 +85,8 @@ func NewNodeSetFromMap(nodes map[NodeID]struct{}) *NodeSet { } // NewNotarySetTarget is the target for getting Notary Set. -func NewNotarySetTarget(crs common.Hash, chainID uint32) *SubSetTarget { - binaryChainID := make([]byte, 4) - binary.LittleEndian.PutUint32(binaryChainID, chainID) - - return newTarget(targetNotarySet, crs[:], binaryChainID) +func NewNotarySetTarget(crs common.Hash) *SubSetTarget { + return newTarget(targetNotarySet, crs[:]) } // NewDKGSetTarget is the target for getting DKG Set. @@ -98,15 +95,10 @@ func NewDKGSetTarget(crs common.Hash) *SubSetTarget { } // NewNodeLeaderTarget is the target for getting leader of fast BA. -func NewNodeLeaderTarget(crs common.Hash, pos Position) *SubSetTarget { - binaryRoundID := make([]byte, 8) - binary.LittleEndian.PutUint64(binaryRoundID, pos.Round) - binaryChainID := make([]byte, 4) - binary.LittleEndian.PutUint32(binaryChainID, pos.ChainID) +func NewNodeLeaderTarget(crs common.Hash, height uint64) *SubSetTarget { binaryHeight := make([]byte, 8) - binary.LittleEndian.PutUint64(binaryHeight, pos.Height) - return newTarget(targetNodeLeader, crs[:], - binaryRoundID, binaryChainID, binaryHeight) + binary.LittleEndian.PutUint64(binaryHeight, height) + return newTarget(targetNodeLeader, crs[:], binaryHeight) } // Add a NodeID to the set. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/position.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/position.go index 8822f6ea9..902a55fec 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/position.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/position.go @@ -28,14 +28,14 @@ type Position struct { Height uint64 `json:"height"` } -func (pos *Position) String() string { +func (pos Position) String() string { return fmt.Sprintf("Position{Round:%d Chain:%d Height:%d}", pos.Round, pos.ChainID, pos.Height) } // Equal checks if two positions are equal, it panics when their chainIDs // are different. -func (pos *Position) Equal(other *Position) bool { +func (pos Position) Equal(other Position) bool { if pos.ChainID != other.ChainID { panic(fmt.Errorf("unexpected chainID %d, should be %d", other.ChainID, pos.ChainID)) @@ -45,7 +45,7 @@ func (pos *Position) Equal(other *Position) bool { // Newer checks if one block is newer than another one on the same chain. // If two blocks on different chain compared by this function, it would panic. -func (pos *Position) Newer(other *Position) bool { +func (pos Position) Newer(other Position) bool { if pos.ChainID != other.ChainID { panic(fmt.Errorf("unexpected chainID %d, should be %d", other.ChainID, pos.ChainID)) @@ -56,7 +56,7 @@ func (pos *Position) Newer(other *Position) bool { // Older checks if one block is older than another one on the same chain. // If two blocks on different chain compared by this function, it would panic. -func (pos *Position) Older(other *Position) bool { +func (pos Position) Older(other Position) bool { if pos.ChainID != other.ChainID { panic(fmt.Errorf("unexpected chainID %d, should be %d", other.ChainID, pos.ChainID)) @@ -64,12 +64,3 @@ func (pos *Position) Older(other *Position) bool { return pos.Round < other.Round || (pos.Round == other.Round && pos.Height < other.Height) } - -// Clone a position instance. -func (pos *Position) Clone() *Position { - return &Position{ - ChainID: pos.ChainID, - Round: pos.Round, - Height: pos.Height, - } -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go index 46ea1dfb0..6481eb46d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go @@ -68,7 +68,7 @@ type Vote struct { func (v *Vote) String() string { return fmt.Sprintf("Vote{BP:%s %s Period:%d Type:%d Hash:%s}", v.ProposerID.String()[:6], - &v.Position, v.Period, v.Type, v.BlockHash.String()[:6]) + v.Position, v.Period, v.Type, v.BlockHash.String()[:6]) } // NewVote constructs a Vote instance with header fields. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go index 8a07c9d2b..83541283b 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go @@ -19,6 +19,7 @@ package utils import ( "errors" + "fmt" "sync" "github.com/dexon-foundation/dexon-consensus/common" @@ -40,9 +41,9 @@ var ( type sets struct { crs common.Hash nodeSet *types.NodeSet - notarySet []map[types.NodeID]struct{} + notarySet map[types.NodeID]struct{} dkgSet map[types.NodeID]struct{} - leaderNode []map[uint64]types.NodeID + leaderNode map[uint64]types.NodeID } // NodeSetCacheInterface interface specifies interface used by NodeSetCache. @@ -112,30 +113,29 @@ func (cache *NodeSetCache) GetPublicKey( } // GetNodeSet returns IDs of nodes set of this round as map. -func (cache *NodeSetCache) GetNodeSet( - round uint64) (nIDs *types.NodeSet, err error) { - +func (cache *NodeSetCache) GetNodeSet(round uint64) (*types.NodeSet, error) { IDs, exists := cache.get(round) if !exists { + var err error if IDs, err = cache.update(round); err != nil { - return + return nil, err } } - nIDs = IDs.nodeSet.Clone() - return + return IDs.nodeSet.Clone(), nil } // GetNotarySet returns of notary set of this round. +// TODO(mission): remove chainID parameter. func (cache *NodeSetCache) GetNotarySet( round uint64, chainID uint32) (map[types.NodeID]struct{}, error) { + if chainID != 0 { + panic(fmt.Errorf("non-zero chainID found: %d", chainID)) + } IDs, err := cache.getOrUpdate(round) if err != nil { return nil, err } - if chainID >= uint32(len(IDs.notarySet)) { - return nil, ErrInvalidChainID - } - return cache.cloneMap(IDs.notarySet[chainID]), nil + return cache.cloneMap(IDs.notarySet), nil } // GetDKGSet returns of DKG set of this round. @@ -155,24 +155,21 @@ func (cache *NodeSetCache) GetLeaderNode(pos types.Position) ( if err != nil { return types.NodeID{}, err } - if pos.ChainID >= uint32(len(IDs.leaderNode)) { - return types.NodeID{}, ErrInvalidChainID - } cache.lock.Lock() defer cache.lock.Unlock() - if _, exist := IDs.leaderNode[pos.ChainID][pos.Height]; !exist { - notarySet := types.NewNodeSetFromMap(IDs.notarySet[pos.ChainID]) - leader := - notarySet.GetSubSet(1, types.NewNodeLeaderTarget(IDs.crs, pos)) + if _, exist := IDs.leaderNode[pos.Height]; !exist { + notarySet := types.NewNodeSetFromMap(IDs.notarySet) + leader := notarySet.GetSubSet(1, types.NewNodeLeaderTarget( + IDs.crs, pos.Height)) if len(leader) != 1 { panic(errors.New("length of leader is not one")) } for nID := range leader { - IDs.leaderNode[pos.ChainID][pos.Height] = nID + IDs.leaderNode[pos.Height] = nID break } } - return IDs.leaderNode[pos.ChainID][pos.Height], nil + return IDs.leaderNode[pos.Height], nil } func (cache *NodeSetCache) cloneMap( @@ -235,23 +232,17 @@ func (cache *NodeSetCache) update( err = ErrConfigurationNotReady return } + nodesPerChain := cfg.RoundInterval / cfg.MinBlockInterval nIDs = &sets{ crs: crs, nodeSet: nodeSet, - notarySet: make([]map[types.NodeID]struct{}, cfg.NumChains), + notarySet: make(map[types.NodeID]struct{}), dkgSet: nodeSet.GetSubSet( int(cfg.DKGSetSize), types.NewDKGSetTarget(crs)), - leaderNode: make([]map[uint64]types.NodeID, cfg.NumChains), - } - for i := range nIDs.notarySet { - nIDs.notarySet[i] = nodeSet.GetSubSet( - int(cfg.NotarySetSize), types.NewNotarySetTarget(crs, uint32(i))) + leaderNode: make(map[uint64]types.NodeID, nodesPerChain), } - nodesPerChain := cfg.RoundInterval / cfg.MinBlockInterval - for i := range nIDs.leaderNode { - nIDs.leaderNode[i] = make(map[uint64]types.NodeID, nodesPerChain) - } - + nIDs.notarySet = nodeSet.GetSubSet( + int(cfg.NotarySetSize), types.NewNotarySetTarget(crs)) cache.rounds[round] = nIDs // Purge older rounds. for rID, nIDs := range cache.rounds { -- cgit v1.2.3