diff options
21 files changed, 891 insertions, 344 deletions
diff --git a/dex/backend.go b/dex/backend.go index a000d5fbc..b16bdc493 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -163,6 +163,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { engine.SetGovStateFetcher(dex.governance) dMoment := time.Unix(int64(chainConfig.DMoment), 0) + log.Info("Consensus DMoment", "dMoment", dMoment) // Force starting with full sync mode if this node is a bootstrap proposer. if config.BlockProposerEnabled && dMoment.After(time.Now()) { diff --git a/params/config.go b/params/config.go index 5074b9a82..cee8bdf22 100644 --- a/params/config.go +++ b/params/config.go @@ -83,7 +83,7 @@ var ( // TestnetChainConfig contains the chain parameters to run a node on the Taiwan test network. TestnetChainConfig = &ChainConfig{ ChainID: big.NewInt(238), - DMoment: 1547471400, + DMoment: 1547608200, HomesteadBlock: big.NewInt(0), DAOForkBlock: nil, DAOForkSupport: true, diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index e8cafbd68..e468e9c2e 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -22,6 +22,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -41,7 +42,7 @@ func genValidLeader( if block.Timestamp.After(time.Now()) { return false, nil } - if err := mgr.lattice.SanityCheck(block); err != nil { + if err := mgr.lattice.SanityCheck(block, true); err != nil { if err == ErrRetrySanityCheckLater { return false, nil } @@ -180,12 +181,23 @@ func (mgr *agreementMgr) appendConfig( consensus: mgr.con, chainID: i, restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, } + recv.roundValue.Store(uint64(0)) agrModule := newAgreement( mgr.con.ID, recv, newLeaderSelector(genValidLeader(mgr), mgr.logger), - mgr.signer) + mgr.signer, + mgr.logger) + // Hacky way to initialize first notarySet. + nodes, err := mgr.cache.GetNodeSet(round) + if err != nil { + return err + } + agrModule.notarySet = nodes.GetSubSet( + int(config.NotarySetSize), + types.NewNotarySetTarget(crs, i)) // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) @@ -266,7 +278,11 @@ func (mgr *agreementMgr) processAgreementResult( return err } } - agreement.restart(nIDs, result.Position, crs) + leader, err := mgr.cache.GetLeaderNode(result.Position) + if err != nil { + return err + } + agreement.restart(nIDs, result.Position, leader, crs) } return nil } @@ -332,14 +348,12 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { return } // Check if this node in notary set of this chain in this round. - nodeSet, err := mgr.cache.GetNodeSet(nextRound) + notarySet, err := mgr.cache.GetNotarySet(nextRound, chainID) if err != nil { panic(err) } setting.crs = config.crs - setting.notarySet = nodeSet.GetSubSet( - int(config.notarySetSize), - types.NewNotarySetTarget(config.crs, chainID)) + setting.notarySet = notarySet _, isNotary = setting.notarySet[mgr.ID] if isNotary { mgr.logger.Info("selected as notary set", @@ -396,7 +410,7 @@ Loop: <-setting.ticker.Tick() } // Run BA for this round. - recv.round = currentRound + recv.roundValue.Store(currentRound) recv.changeNotaryTime = roundEndTime recv.restartNotary <- types.Position{ChainID: math.MaxUint32} if err := mgr.baRoutineForOneRound(&setting); err != nil { @@ -435,12 +449,14 @@ Loop: } } var nextHeight uint64 + var nextTime time.Time for { - nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) + nextHeight, nextTime, err = + mgr.lattice.NextBlock(recv.round(), setting.chainID) if err != nil { mgr.logger.Debug("Error getting next height", "error", err, - "round", recv.round, + "round", recv.round(), "chainID", setting.chainID) err = nil nextHeight = oldPos.Height @@ -456,12 +472,18 @@ Loop: time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round, + Round: recv.round(), ChainID: setting.chainID, Height: nextHeight, } oldPos = nextPos - agr.restart(setting.notarySet, nextPos, setting.crs) + leader, err := mgr.cache.GetLeaderNode(nextPos) + if err != nil { + return err + } + time.Sleep(nextTime.Sub(time.Now())) + setting.ticker.Restart() + agr.restart(setting.notarySet, nextPos, leader, setting.crs) default: } if agr.pullVotes() { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go index 77f293376..266e44294 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go @@ -35,7 +35,10 @@ type agreementStateType int // agreementStateType enum. const ( - stateInitial agreementStateType = iota + stateFast agreementStateType = iota + stateFastVote + stateFastRollback + stateInitial statePreCommit stateCommit stateForward @@ -43,7 +46,7 @@ const ( stateSleep ) -var nullBlockHash = common.Hash{} +var nullBlockHash common.Hash var skipBlockHash common.Hash func init() { @@ -58,6 +61,63 @@ type agreementState interface { clocks() int } +//----- FastState ----- +type fastState struct { + a *agreementData +} + +func newFastState(a *agreementData) *fastState { + return &fastState{a: a} +} + +func (s *fastState) state() agreementStateType { return stateFast } +func (s *fastState) clocks() int { return 0 } +func (s *fastState) nextState() (agreementState, error) { + if func() bool { + s.a.lock.Lock() + defer s.a.lock.Unlock() + return s.a.isLeader + }() { + hash := s.a.recv.ProposeBlock() + if hash != nullBlockHash { + s.a.lock.Lock() + defer s.a.lock.Unlock() + s.a.recv.ProposeVote(types.NewVote(types.VoteFast, hash, s.a.period)) + } + } + return newFastVoteState(s.a), nil +} + +//----- FastVoteState ----- +type fastVoteState struct { + a *agreementData +} + +func newFastVoteState(a *agreementData) *fastVoteState { + return &fastVoteState{a: a} +} + +func (s *fastVoteState) state() agreementStateType { return stateFastVote } +func (s *fastVoteState) clocks() int { return 2 } +func (s *fastVoteState) nextState() (agreementState, error) { + return newFastRollbackState(s.a), nil +} + +//----- FastRollbackState ----- +type fastRollbackState struct { + a *agreementData +} + +func newFastRollbackState(a *agreementData) *fastRollbackState { + return &fastRollbackState{a: a} +} + +func (s *fastRollbackState) state() agreementStateType { return stateFastRollback } +func (s *fastRollbackState) clocks() int { return 1 } +func (s *fastRollbackState) nextState() (agreementState, error) { + return newInitialState(s.a), nil +} + //----- InitialState ----- type initialState struct { a *agreementData @@ -70,10 +130,17 @@ func newInitialState(a *agreementData) *initialState { func (s *initialState) state() agreementStateType { return stateInitial } func (s *initialState) clocks() int { return 0 } func (s *initialState) nextState() (agreementState, error) { - hash := s.a.recv.ProposeBlock() - s.a.lock.Lock() - defer s.a.lock.Unlock() - s.a.recv.ProposeVote(types.NewVote(types.VoteInit, hash, s.a.period)) + if func() bool { + s.a.lock.Lock() + defer s.a.lock.Unlock() + return !s.a.isLeader + }() { + // Leader already proposed block in fastState. + hash := s.a.recv.ProposeBlock() + s.a.lock.Lock() + defer s.a.lock.Unlock() + s.a.recv.ProposeVote(types.NewVote(types.VoteInit, hash, s.a.period)) + } return newPreCommitState(s.a), nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go index eead4628a..62bbe250f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go @@ -31,6 +31,7 @@ import ( // Errors for agreement module. var ( + ErrInvalidVote = fmt.Errorf("invalid vote") ErrNotInNotarySet = fmt.Errorf("not in notary set") ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature") ) @@ -73,6 +74,8 @@ type agreementReceiver interface { // agreement module. ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote) PullBlocks(common.Hashes) + ReportForkVote(v1, v2 *types.Vote) + ReportForkBlock(b1, b2 *types.Block) } type pendingBlock struct { @@ -90,6 +93,7 @@ type agreementData struct { recv agreementReceiver ID types.NodeID + isLeader bool leader *leaderSelector lockValue common.Hash lockRound uint64 @@ -114,6 +118,7 @@ type agreement struct { candidateBlock map[common.Hash]*types.Block fastForward chan uint64 signer *utils.Signer + logger common.Logger } // newAgreement creates a agreement instance. @@ -121,7 +126,8 @@ func newAgreement( ID types.NodeID, recv agreementReceiver, leader *leaderSelector, - signer *utils.Signer) *agreement { + signer *utils.Signer, + logger common.Logger) *agreement { agreement := &agreement{ data: &agreementData{ recv: recv, @@ -132,6 +138,7 @@ func newAgreement( candidateBlock: make(map[common.Hash]*types.Block), fastForward: make(chan uint64, 1), signer: signer, + logger: logger, } agreement.stop() return agreement @@ -139,8 +146,8 @@ func newAgreement( // restart the agreement func (a *agreement) restart( - notarySet map[types.NodeID]struct{}, aID types.Position, crs common.Hash) { - + notarySet map[types.NodeID]struct{}, aID types.Position, leader types.NodeID, + crs common.Hash) { if !func() bool { a.lock.Lock() defer a.lock.Unlock() @@ -162,12 +169,16 @@ func (a *agreement) restart( a.data.leader.restart(crs) a.data.lockValue = nullBlockHash a.data.lockRound = 0 + a.data.isLeader = a.data.ID == leader a.fastForward = make(chan uint64, 1) a.hasOutput = false - a.state = newInitialState(a.data) + a.state = newFastState(a.data) a.notarySet = notarySet a.candidateBlock = make(map[common.Hash]*types.Block) - a.aID.Store(aID) + a.aID.Store(struct { + pos types.Position + leader types.NodeID + }{aID, leader}) return true }() { return @@ -213,18 +224,24 @@ func (a *agreement) restart( }() for _, block := range replayBlock { - a.processBlock(block) + if err := a.processBlock(block); err != nil { + a.logger.Error("failed to process block when restarting agreement", + "block", block) + } } for _, vote := range replayVote { - a.processVote(vote) + if err := a.processVote(vote); err != nil { + a.logger.Error("failed to process vote when restarting agreement", + "vote", vote) + } } } func (a *agreement) stop() { a.restart(make(map[types.NodeID]struct{}), types.Position{ ChainID: math.MaxUint32, - }, common.Hash{}) + }, types.NodeID{}, common.Hash{}) } func isStop(aID types.Position) bool { @@ -235,7 +252,12 @@ func isStop(aID types.Position) bool { func (a *agreement) clocks() int { a.data.lock.RLock() defer a.data.lock.RUnlock() - return a.state.clocks() + scale := int(a.data.period) + // 10 is a magic number derived from many years of experience. + if scale > 10 { + scale = 10 + } + return a.state.clocks() * scale } // pullVotes returns if current agreement requires more votes to continue. @@ -243,21 +265,31 @@ func (a *agreement) pullVotes() bool { a.data.lock.RLock() defer a.data.lock.RUnlock() return a.state.state() == statePullVote || + a.state.state() == stateFastRollback || (a.state.state() == statePreCommit && (a.data.period%3) == 0) } // agreementID returns the current agreementID. func (a *agreement) agreementID() types.Position { - return a.aID.Load().(types.Position) + return a.aID.Load().(struct { + pos types.Position + leader types.NodeID + }).pos +} + +// leader returns the current leader. +func (a *agreement) leader() types.NodeID { + return a.aID.Load().(struct { + pos types.Position + leader types.NodeID + }).leader } // nextState is called at the specific clock time. func (a *agreement) nextState() (err error) { - if func() bool { - a.lock.RLock() - defer a.lock.RUnlock() - return a.hasOutput - }() { + a.lock.Lock() + defer a.lock.Unlock() + if a.hasOutput { a.state = newSleepState(a.data) return } @@ -266,6 +298,9 @@ func (a *agreement) nextState() (err error) { } func (a *agreement) sanityCheck(vote *types.Vote) error { + if vote.Type >= types.MaxVoteType { + return ErrInvalidVote + } if _, exist := a.notarySet[vote.ProposerID]; !exist { return ErrNotInNotarySet } @@ -279,22 +314,21 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { return nil } -func (a *agreement) checkForkVote(vote *types.Vote) error { - if err := func() error { - a.data.lock.RLock() - defer a.data.lock.RUnlock() - if votes, exist := a.data.votes[vote.Period]; exist { - if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist { - if vote.BlockHash != oldVote.BlockHash { - return &ErrForkVote{vote.ProposerID, oldVote, vote} - } +func (a *agreement) checkForkVote(vote *types.Vote) ( + alreadyExist bool, err error) { + a.data.lock.RLock() + defer a.data.lock.RUnlock() + if votes, exist := a.data.votes[vote.Period]; exist { + if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist { + alreadyExist = true + if vote.BlockHash != oldVote.BlockHash { + a.data.recv.ReportForkVote(oldVote, vote) + err = &ErrForkVote{vote.ProposerID, oldVote, vote} + return } } - return nil - }(); err != nil { - return err } - return nil + return } // prepareVote prepares a vote. @@ -314,6 +348,13 @@ func (a *agreement) processVote(vote *types.Vote) error { aID := a.agreementID() // Agreement module has stopped. if isStop(aID) { + // Hacky way to not drop first votes for height 0. + if vote.Position.Height == uint64(0) { + a.pendingVote = append(a.pendingVote, pendingVote{ + vote: vote, + receivedTime: time.Now().UTC(), + }) + } return nil } if vote.Position != aID { @@ -326,9 +367,13 @@ func (a *agreement) processVote(vote *types.Vote) error { }) return nil } - if err := a.checkForkVote(vote); err != nil { + exist, err := a.checkForkVote(vote) + if err != nil { return err } + if exist { + return nil + } a.data.lock.Lock() defer a.data.lock.Unlock() @@ -336,12 +381,13 @@ func (a *agreement) processVote(vote *types.Vote) error { a.data.votes[vote.Period] = newVoteListMap() } a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote - if !a.hasOutput && vote.Type == types.VoteCom { + if !a.hasOutput && + (vote.Type == types.VoteCom || vote.Type == types.VoteFast) { if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && hash != skipBlockHash { a.hasOutput = true a.data.recv.ConfirmBlock(hash, - a.data.votes[vote.Period][types.VoteCom]) + a.data.votes[vote.Period][vote.Type]) return nil } } else if a.hasOutput { @@ -443,6 +489,7 @@ func (a *agreement) processBlock(block *types.Block) error { } if b, exist := a.data.blocks[block.ProposerID]; exist { if b.Hash != block.Hash { + a.data.recv.ReportForkBlock(b, block) return &ErrFork{block.ProposerID, b.Hash, block.Hash} } return nil @@ -452,6 +499,39 @@ func (a *agreement) processBlock(block *types.Block) error { } a.data.blocks[block.ProposerID] = block a.addCandidateBlockNoLock(block) + if (a.state.state() == stateFast || a.state.state() == stateFastVote) && + block.ProposerID == a.leader() { + go func() { + for func() bool { + a.lock.RLock() + defer a.lock.RUnlock() + if a.state.state() != stateFast && a.state.state() != stateFastVote { + return false + } + block, exist := a.data.blocks[a.leader()] + if !exist { + return true + } + a.data.lock.RLock() + defer a.data.lock.RUnlock() + ok, err := a.data.leader.validLeader(block) + if err != nil { + fmt.Println("Error checking validLeader for Fast BA", + "error", err, "block", block) + return false + } + if ok { + a.data.recv.ProposeVote( + types.NewVote(types.VoteFast, block.Hash, a.data.period)) + return false + } + return true + }() { + // TODO(jimmy): retry interval should be related to configurations. + time.Sleep(250 * time.Millisecond) + } + }() + } return nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 0e5a1fbdb..0754e800f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "fmt" "sync" + "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -69,11 +70,15 @@ type consensusBAReceiver struct { agreementModule *agreement chainID uint32 changeNotaryTime time.Time - round uint64 + roundValue *atomic.Value isNotary bool restartNotary chan types.Position } +func (recv *consensusBAReceiver) round() uint64 { + return recv.roundValue.Load().(uint64) +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return @@ -99,17 +104,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { if !recv.isNotary { return common.Hash{} } - block := recv.consensus.proposeBlock(recv.chainID, recv.round) + block := recv.consensus.proposeBlock(recv.chainID, recv.round()) if block == nil { recv.consensus.logger.Error("unable to propose block") return nullBlockHash } - if err := recv.consensus.preProcessBlock(block); err != nil { - recv.consensus.logger.Error("Failed to pre-process block", "error", err) - return common.Hash{} - } - recv.consensus.logger.Debug("Calling Network.BroadcastBlock", "block", block) - recv.consensus.network.BroadcastBlock(block) + go func() { + if err := recv.consensus.preProcessBlock(block); err != nil { + recv.consensus.logger.Error("Failed to pre-process block", "error", err) + return + } + recv.consensus.logger.Debug("Calling Network.BroadcastBlock", + "block", block) + recv.consensus.network.BroadcastBlock(block) + }() return block.Hash } @@ -122,7 +130,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.logger.Info("Empty block is confirmed", "position", &aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID) + block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID) if err != nil { recv.consensus.logger.Error("Propose empty block failed", "error", err) return @@ -140,9 +148,19 @@ func (recv *consensusBAReceiver) ConfirmBlock( defer recv.consensus.lock.Unlock() recv.consensus.baConfirmedBlock[hash] = ch }() - recv.consensus.network.PullBlocks(common.Hashes{hash}) go func() { - block = <-ch + hashes := common.Hashes{hash} + PullBlockLoop: + for { + recv.consensus.logger.Debug("Calling Network.PullBlock for BA block", + "hash", hash) + recv.consensus.network.PullBlocks(hashes) + select { + case block = <-ch: + break PullBlockLoop + case <-time.After(1 * time.Second): + } + } recv.consensus.logger.Info("Receive unknown block", "hash", hash.String()[:6], "position", &block.Position, @@ -242,7 +260,7 @@ CleanChannelLoop: } newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { - recv.round++ + recv.roundValue.Store(recv.round() + 1) newPos.Round++ } recv.restartNotary <- newPos @@ -256,6 +274,14 @@ func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { recv.consensus.network.PullBlocks(hashes) } +func (recv *consensusBAReceiver) ReportForkVote(v1, v2 *types.Vote) { + recv.consensus.gov.ReportForkVote(v1, v2) +} + +func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) { + recv.consensus.gov.ReportForkBlock(b1, b2) +} + // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { ID types.NodeID @@ -388,6 +414,14 @@ type Consensus struct { logger common.Logger nonFinalizedBlockDelivered bool resetRandomnessTicker chan struct{} + resetDeliveryGuardTicker chan struct{} + msgChan chan interface{} + waitGroup sync.WaitGroup + + // Context of Dummy receiver during switching from syncer. + dummyCancel context.CancelFunc + dummyFinished <-chan struct{} + dummyMsgBuffer []interface{} } // NewConsensus construct an Consensus instance. @@ -423,6 +457,9 @@ func NewConsensusForSimulation( // You need to provide the initial block for this newly created Consensus // instance to bootstrap with. A proper choice is the last finalized block you // delivered to syncer. +// +// NOTE: those confirmed blocks should be organized by chainID and sorted by +// their positions, in ascending order. func NewConsensusFromSyncer( initBlock *types.Block, initRoundBeginTime time.Time, @@ -432,17 +469,43 @@ func NewConsensusFromSyncer( networkModule Network, prv crypto.PrivateKey, latticeModule *Lattice, - blocks []*types.Block, + confirmedBlocks [][]*types.Block, randomnessResults []*types.BlockRandomnessResult, + cachedMessages []interface{}, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db, networkModule, prv, logger, latticeModule, true) - // Dump all BA-confirmed blocks to the consensus instance. - for _, b := range blocks { - con.ccModule.registerBlock(b) - if err := con.processBlock(b); err != nil { - return nil, err + // Launch a dummy receiver before we start receiving from network module. + con.dummyMsgBuffer = cachedMessages + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + con.ctx, networkModule.ReceiveChan(), func(msg interface{}) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) + // Dump all BA-confirmed blocks to the consensus instance, make sure these + // added blocks forming a DAG. + for { + updated := false + for idx, bs := range confirmedBlocks { + for bIdx, b := range bs { + // Only when its parent block is already added to lattice, we can + // then add this block. If not, our pulling mechanism would stop at + // the block we added, and lost its parent block forever. + if !latticeModule.Exist(b.ParentHash) { + logger.Debug("Skip discontinuous confirmed block", + "from", b, + "until", bs[len(bs)-1]) + confirmedBlocks[idx] = bs[bIdx:] + break + } + con.ccModule.registerBlock(b) + if err := con.processBlock(b); err != nil { + return nil, err + } + } + } + if !updated { + break } } // Dump all randomness result to the consensus instance. @@ -504,22 +567,25 @@ func newConsensusForRound( } // Construct Consensus instance. con := &Consensus{ - ID: ID, - ccModule: newCompactionChain(gov), - lattice: latticeModule, - app: appModule, - debugApp: debugApp, - gov: gov, - db: db, - network: network, - baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, - dMoment: initRoundBeginTime, - nodeSetCache: nodeSetCache, - signer: signer, - event: common.NewEvent(), - logger: logger, + ID: ID, + ccModule: newCompactionChain(gov), + lattice: latticeModule, + app: appModule, + debugApp: debugApp, + gov: gov, + db: db, + network: network, + baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), + dkgReady: sync.NewCond(&sync.Mutex{}), + cfgModule: cfgModule, + dMoment: initRoundBeginTime, + nodeSetCache: nodeSetCache, + signer: signer, + event: common.NewEvent(), + logger: logger, + resetRandomnessTicker: make(chan struct{}), + resetDeliveryGuardTicker: make(chan struct{}), + msgChan: make(chan interface{}, 1024), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime) @@ -585,12 +651,40 @@ func (con *Consensus) Run() { con.baMgr.run() // Launch network handler. con.logger.Debug("Calling Network.ReceiveChan") - go con.processMsg(con.network.ReceiveChan()) + con.waitGroup.Add(1) + go con.deliverNetworkMsg() + con.waitGroup.Add(1) + go con.processMsg() // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) // Take some time to bootstrap. time.Sleep(3 * time.Second) + con.waitGroup.Add(1) go con.pullRandomness() + // Stop dummy receiver if launched. + if con.dummyCancel != nil { + con.logger.Trace("Stop dummy receiver") + con.dummyCancel() + <-con.dummyFinished + // Replay those cached messages. + con.logger.Trace("Dummy receiver stoped, start dumping cached messages", + "count", len(con.dummyMsgBuffer)) + for _, msg := range con.dummyMsgBuffer { + loop: + for { + select { + case con.msgChan <- msg: + break loop + case <-time.After(50 * time.Millisecond): + con.logger.Debug( + "internal message channel is full when syncing") + } + } + } + con.logger.Trace("Finish dumping cached messages") + } + con.waitGroup.Add(1) + go con.deliveryGuard() // Block until done. select { case <-con.ctx.Done(): @@ -785,18 +879,51 @@ func (con *Consensus) Stop() { con.ctxCancel() con.baMgr.stop() con.event.Reset() + con.waitGroup.Wait() } -func (con *Consensus) processMsg(msgChan <-chan interface{}) { +func (con *Consensus) deliverNetworkMsg() { + defer con.waitGroup.Done() + recv := con.network.ReceiveChan() + for { + select { + case <-con.ctx.Done(): + return + default: + } + select { + case msg := <-recv: + innerLoop: + for { + select { + case con.msgChan <- msg: + break innerLoop + case <-time.After(500 * time.Millisecond): + con.logger.Debug("internal message channel is full", + "pending", msg) + } + } + case <-con.ctx.Done(): + return + } + } +} + +func (con *Consensus) processMsg() { + defer con.waitGroup.Done() MessageLoop: for { + select { + case <-con.ctx.Done(): + return + default: + } var msg interface{} select { - case msg = <-msgChan: + case msg = <-con.msgChan: case <-con.ctx.Done(): return } - switch val := msg.(type) { case *types.Block: if ch, exist := func() (chan<- *types.Block, bool) { @@ -805,7 +932,7 @@ MessageLoop: ch, e := con.baConfirmedBlock[val.Hash] return ch, e }(); exist { - if err := con.lattice.SanityCheck(val); err != nil { + if err := con.lattice.SanityCheck(val, false); err != nil { if err == ErrRetrySanityCheckLater { err = nil } else { @@ -913,6 +1040,7 @@ func (con *Consensus) ProcessAgreementResult( if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { return err } + con.lattice.AddShallowBlock(rand.BlockHash, rand.Position) // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { return err @@ -1007,10 +1135,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { } func (con *Consensus) pullRandomness() { + defer con.waitGroup.Done() for { select { case <-con.ctx.Done(): return + default: + } + select { + case <-con.ctx.Done(): + return case <-con.resetRandomnessTicker: case <-time.After(1500 * time.Millisecond): // TODO(jimmy): pulling period should be related to lambdaBA. @@ -1024,12 +1158,38 @@ func (con *Consensus) pullRandomness() { } } +func (con *Consensus) deliveryGuard() { + defer con.waitGroup.Done() + time.Sleep(con.dMoment.Sub(time.Now())) + // Node takes time to start. + time.Sleep(60 * time.Second) + for { + select { + case <-con.ctx.Done(): + return + default: + } + select { + case <-con.ctx.Done(): + return + case <-con.resetDeliveryGuardTicker: + case <-time.After(60 * time.Second): + con.logger.Error("no blocks delivered for too long", "ID", con.ID) + panic(fmt.Errorf("no blocks delivered for too long")) + } + } +} + // deliverBlock deliver a block to application layer. func (con *Consensus) deliverBlock(b *types.Block) { select { case con.resetRandomnessTicker <- struct{}{}: default: } + select { + case con.resetDeliveryGuardTicker <- struct{}{}: + default: + } if err := con.db.UpdateBlock(*b); err != nil { panic(err) } @@ -1086,9 +1246,7 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) { for _, b := range deliveredBlocks { con.deliverBlock(b) } - if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil { - return - } + err = con.lattice.PurgeBlocks(deliveredBlocks) return } @@ -1150,9 +1308,7 @@ func (con *Consensus) prepareBlock(b *types.Block, err = ErrCRSNotReady return } - if err = con.signer.SignCRS(b, crs); err != nil { - return - } + err = con.signer.SignCRS(b, crs) return } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go index 75c30372f..1fe29fa10 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go @@ -72,11 +72,7 @@ func (lvl *LevelDBBackedDB) HasBlock(hash common.Hash) bool { } func (lvl *LevelDBBackedDB) internalHasBlock(key []byte) (bool, error) { - exists, err := lvl.db.Has(key, nil) - if err != nil { - return false, err - } - return exists, nil + return lvl.db.Has(key, nil) } // GetBlock implements the Reader.GetBlock method. @@ -90,9 +86,6 @@ func (lvl *LevelDBBackedDB) GetBlock( return } err = rlp.DecodeBytes(queried, &block) - if err != nil { - return - } return } @@ -113,9 +106,7 @@ func (lvl *LevelDBBackedDB) UpdateBlock(block types.Block) (err error) { err = ErrBlockDoesNotExist return } - if err = lvl.db.Put(blockKey, marshaled, nil); err != nil { - return - } + err = lvl.db.Put(blockKey, marshaled, nil) return } @@ -134,9 +125,7 @@ func (lvl *LevelDBBackedDB) PutBlock(block types.Block) (err error) { err = ErrBlockExists return } - if err = lvl.db.Put(blockKey, marshaled, nil); err != nil { - return - } + err = lvl.db.Put(blockKey, marshaled, nil) return } @@ -166,10 +155,7 @@ func (lvl *LevelDBBackedDB) PutCompactionChainTipInfo( if info.Height+1 != height { return ErrInvalidCompactionChainTipHeight } - if err = lvl.db.Put(compactionChainTipInfoKey, marshaled, nil); err != nil { - return err - } - return nil + return lvl.db.Put(compactionChainTipInfoKey, marshaled, nil) } func (lvl *LevelDBBackedDB) internalGetCompactionChainTipInfo() ( @@ -181,9 +167,7 @@ func (lvl *LevelDBBackedDB) internalGetCompactionChainTipInfo() ( } return } - if err = rlp.DecodeBytes(queried, &info); err != nil { - return - } + err = rlp.DecodeBytes(queried, &info) return } @@ -201,11 +185,7 @@ func (lvl *LevelDBBackedDB) GetCompactionChainTipInfo() ( // HasDKGPrivateKey check existence of DKG private key of one round. func (lvl *LevelDBBackedDB) HasDKGPrivateKey(round uint64) (bool, error) { - exists, err := lvl.db.Has(lvl.getDKGPrivateKeyKey(round), nil) - if err != nil { - return false, err - } - return exists, nil + return lvl.db.Has(lvl.getDKGPrivateKeyKey(round), nil) } // GetDKGPrivateKey get DKG private key of one round. @@ -218,9 +198,7 @@ func (lvl *LevelDBBackedDB) GetDKGPrivateKey(round uint64) ( } return } - if err = rlp.DecodeBytes(queried, &prv); err != nil { - return - } + err = rlp.DecodeBytes(queried, &prv) return } @@ -239,11 +217,8 @@ func (lvl *LevelDBBackedDB) PutDKGPrivateKey( if err != nil { return err } - if err := lvl.db.Put( - lvl.getDKGPrivateKeyKey(round), marshaled, nil); err != nil { - return err - } - return nil + return lvl.db.Put( + lvl.getDKGPrivateKeyKey(round), marshaled, nil) } func (lvl *LevelDBBackedDB) getBlockKey(hash common.Hash) (ret []byte) { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index aa87e38f7..a77ec9385 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -159,4 +159,7 @@ type Ticker interface { // Stop the ticker. Stop() + + // Retart the ticker and clear all internal data. + Restart() } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go index e55c0dbfc..cf81a1161 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go @@ -55,7 +55,7 @@ type ErrAckingBlockNotExists struct { } func (e ErrAckingBlockNotExists) Error() string { - return fmt.Sprintf("acking block %s not exists", e.hash) + return fmt.Sprintf("acking block %s not exists", e.hash.String()[:6]) } // Errors for method usage @@ -113,6 +113,8 @@ type latticeData struct { blockByHash map[common.Hash]*types.Block // This stores configuration for each round. configs []*latticeDataConfig + // shallowBlocks stores the hash of blocks that their body is not receive yet. + shallowBlocks map[common.Hash]types.Position } // newLatticeData creates a new latticeData instance. @@ -126,10 +128,11 @@ func newLatticeData( genesisConfig.fromConfig(round, config) genesisConfig.setRoundBeginTime(dMoment) data = &latticeData{ - db: db, - chains: make([]*chainStatus, genesisConfig.numChains), - blockByHash: make(map[common.Hash]*types.Block), - configs: []*latticeDataConfig{genesisConfig}, + db: db, + chains: make([]*chainStatus, genesisConfig.numChains), + blockByHash: make(map[common.Hash]*types.Block), + configs: []*latticeDataConfig{genesisConfig}, + shallowBlocks: make(map[common.Hash]types.Position), } for i := range data.chains { data.chains[i] = &chainStatus{ @@ -141,15 +144,35 @@ func newLatticeData( return } -func (data *latticeData) checkAckingRelations(b *types.Block) error { +func (data *latticeData) addShallowBlock(hash common.Hash, pos types.Position) { + // We don't care other errors here. This `if` is to prevent being spammed by + // very old blocks. + if _, err := data.findBlock(hash); err != db.ErrBlockDoesNotExist { + return + } + data.shallowBlocks[hash] = pos +} + +func (data *latticeData) checkAckingRelations( + b *types.Block, allowShallow bool) error { acksByChainID := make(map[uint32]struct{}, len(data.chains)) for _, hash := range b.Acks { bAck, err := data.findBlock(hash) if err != nil { if err == db.ErrBlockDoesNotExist { - return &ErrAckingBlockNotExists{hash} + err = &ErrAckingBlockNotExists{hash} + if allowShallow { + if pos, exist := data.shallowBlocks[hash]; exist { + bAck = &types.Block{ + Position: pos, + } + err = nil + } + } + } + if err != nil { + return err } - return err } // Check if it acks blocks from old rounds, the allowed round difference // is 1. @@ -172,7 +195,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error { return nil } -func (data *latticeData) sanityCheck(b *types.Block) error { +func (data *latticeData) sanityCheck(b *types.Block, allowShallow bool) error { // TODO(mission): Check if its proposer is in validator set, lattice has no // knowledge about node set. config := data.getConfig(b.Position.Round) @@ -196,7 +219,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { if !config.isValidGenesisBlockTime(b) { return ErrIncorrectBlockTime } - return data.checkAckingRelations(b) + return data.checkAckingRelations(b, allowShallow) } // Check parent block if parent hash is specified. if !b.ParentHash.Equal(common.Hash{}) { @@ -257,10 +280,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { return ErrNotAckParent } } - if err := data.checkAckingRelations(b); err != nil { - return err - } - return nil + return data.checkAckingRelations(b, allowShallow) } // addBlock processes blocks. It does sanity check, inserts block into lattice @@ -504,19 +524,21 @@ func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) { } // TODO(mission): make more abstraction for this method. -// nextHeight returns the next height of a chain. -func (data *latticeData) nextHeight( - round uint64, chainID uint32) (uint64, error) { +// nextBlock returns the next height and timestamp of a chain. +func (data *latticeData) nextBlock( + round uint64, chainID uint32) (uint64, time.Time, error) { chainTip := data.chains[chainID].tip bindTip, err := data.isBindTip( types.Position{Round: round, ChainID: chainID}, chainTip) if err != nil { - return 0, err + return 0, time.Time{}, err } + config := data.getConfig(round) if bindTip { - return chainTip.Position.Height + 1, nil + return chainTip.Position.Height + 1, + chainTip.Timestamp.Add(config.minBlockTimeInterval), nil } - return 0, nil + return 0, config.roundBeginTime, nil } // findBlock seeks blocks in memory or db. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go index b30306aef..d531639b9 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go @@ -90,9 +90,7 @@ func (l *Lattice) PrepareBlock( if b.Witness, err = l.app.PrepareWitness(b.Witness.Height); err != nil { return } - if err = l.signer.SignBlock(b); err != nil { - return - } + err = l.signer.SignBlock(b) return } @@ -103,17 +101,23 @@ func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) { if err = l.data.prepareEmptyBlock(b); err != nil { return } - if b.Hash, err = utils.HashBlock(b); err != nil { - return - } + b.Hash, err = utils.HashBlock(b) return } +// AddShallowBlock adds a hash of a block that is confirmed by other nodes but +// the content is not arrived yet. +func (l *Lattice) AddShallowBlock(hash common.Hash, pos types.Position) { + l.lock.Lock() + defer l.lock.Unlock() + l.data.addShallowBlock(hash, pos) +} + // SanityCheck checks the validity of a block. // // If any acking block of this block does not exist, Lattice caches this block // and retries when Lattice.ProcessBlock is called. -func (l *Lattice) SanityCheck(b *types.Block) (err error) { +func (l *Lattice) SanityCheck(b *types.Block, allowShallow bool) (err error) { if b.IsEmpty() { // Only need to verify block's hash. var hash common.Hash @@ -139,18 +143,13 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) { return } } - if err = func() (err error) { - l.lock.RLock() - defer l.lock.RUnlock() - if err = l.data.sanityCheck(b); err != nil { - if _, ok := err.(*ErrAckingBlockNotExists); ok { - err = ErrRetrySanityCheckLater - } - return + l.lock.RLock() + defer l.lock.RUnlock() + if err = l.data.sanityCheck(b, allowShallow); err != nil { + if _, ok := err.(*ErrAckingBlockNotExists); ok { + err = ErrRetrySanityCheckLater } return - }(); err != nil { - return } return } @@ -159,10 +158,8 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) { func (l *Lattice) Exist(hash common.Hash) bool { l.lock.RLock() defer l.lock.RUnlock() - if _, err := l.data.findBlock(hash); err != nil { - return false - } - return true + _, err := l.data.findBlock(hash) + return err == nil } // addBlockToLattice adds a block into lattice, and delivers blocks with the @@ -189,7 +186,7 @@ func (l *Lattice) addBlockToLattice( if tip = l.pool.tip(i); tip == nil { continue } - err = l.data.sanityCheck(tip) + err = l.data.sanityCheck(tip, false) if err == nil { var output []*types.Block if output, err = l.data.addBlock(tip); err != nil { @@ -199,6 +196,7 @@ func (l *Lattice) addBlockToLattice( "block", tip, "error", err) panic(err) } + delete(l.data.shallowBlocks, tip.Hash) hasOutput = true outputBlocks = append(outputBlocks, output...) l.pool.removeTip(i) @@ -267,11 +265,11 @@ func (l *Lattice) ProcessBlock( if len(toDelivered) == 0 { break } - hashes := make(common.Hashes, len(toDelivered)) - for idx := range toDelivered { - hashes[idx] = toDelivered[idx].Hash - } if l.debug != nil { + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } l.debug.TotalOrderingDelivered(hashes, deliveredMode) } // Perform consensus timestamp module. @@ -283,12 +281,13 @@ func (l *Lattice) ProcessBlock( return } -// NextHeight returns expected height of incoming block for specified chain and -// given round. -func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) { +// NextBlock returns expected height and timestamp of incoming block for +// specified chain and given round. +func (l *Lattice) NextBlock(round uint64, chainID uint32) ( + uint64, time.Time, error) { l.lock.RLock() defer l.lock.RUnlock() - return l.data.nextHeight(round, chainID) + return l.data.nextBlock(round, chainID) } // PurgeBlocks purges blocks' cache in memory, this is called when the caller diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go index 94d28faeb..bcfa57fe6 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go @@ -150,10 +150,7 @@ func (l *leaderSelector) processBlock(block *types.Block) error { func (l *leaderSelector) potentialLeader(block *types.Block) (bool, *big.Int) { dist := l.distance(block.CRSSignature) cmp := l.minCRSBlock.Cmp(dist) - if cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash)) { - return true, dist - } - return false, dist + return (cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash))), dist } func (l *leaderSelector) updateLeader(block *types.Block, dist *big.Int) { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go index 9b351eabc..acc4f1c6c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go @@ -1,4 +1,4 @@ -// Copyright 2018 The dexon-consensus-core Authors +// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus-core library. // // The dexon-consensus-core library is free software: you can redistribute it @@ -18,6 +18,9 @@ package syncer import ( + "context" + "time" + "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core" "github.com/dexon-foundation/dexon-consensus/core/types" @@ -37,16 +40,14 @@ type agreement struct { pendings map[uint64]map[common.Hash]*types.AgreementResult logger common.Logger confirmedBlocks map[common.Hash]struct{} + ctx context.Context + ctxCancel context.CancelFunc } // newAgreement creates a new agreement instance. -func newAgreement( - ch chan<- *types.Block, - pullChan chan<- common.Hash, - cache *utils.NodeSetCache, - logger common.Logger) *agreement { - - return &agreement{ +func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash, + cache *utils.NodeSetCache, logger common.Logger) *agreement { + a := &agreement{ cache: cache, inputChan: make(chan interface{}, 1000), outputChan: ch, @@ -58,11 +59,14 @@ func newAgreement( map[uint64]map[common.Hash]*types.AgreementResult), confirmedBlocks: make(map[common.Hash]struct{}), } + a.ctx, a.ctxCancel = context.WithCancel(context.Background()) + return a } // run starts the agreement, this does not start a new routine, go a new // routine explicitly in the caller. func (a *agreement) run() { + defer a.ctxCancel() for { select { case val, ok := <-a.inputChan: @@ -119,22 +123,35 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { return } if r.IsEmptyBlock { - // Empty block is also confirmed. b := &types.Block{ Position: r.Position, } + // Empty blocks should be confirmed directly, they won't be sent over + // the wire. a.confirm(b) - } else { - needPull := true - if bs, exist := a.blocks[r.Position]; exist { - if b, exist := bs[r.BlockHash]; exist { - a.confirm(b) - needPull = false - } + return + } + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + a.confirm(b) + return } - if needPull { - a.agreementResults[r.BlockHash] = struct{}{} - a.pullChan <- r.BlockHash + } + a.agreementResults[r.BlockHash] = struct{}{} +loop: + for { + select { + case a.pullChan <- r.BlockHash: + break loop + case <-a.ctx.Done(): + a.logger.Error("pull request is not sent", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("pull request is unable to send", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) } } } @@ -168,7 +185,18 @@ func (a *agreement) confirm(b *types.Block) { if _, exist := a.confirmedBlocks[b.Hash]; !exist { delete(a.blocks, b.Position) delete(a.agreementResults, b.Hash) - a.outputChan <- b + loop: + for { + select { + case a.outputChan <- b: + break loop + case <-a.ctx.Done(): + a.logger.Error("confirmed block is not sent", "block", b) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("agreement output channel is full", "block", b) + } + } a.confirmedBlocks[b.Hash] = struct{}{} } } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 92f8fd8d0..75c106793 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -18,7 +18,6 @@ package syncer import ( - "bytes" "context" "fmt" "sort" @@ -40,7 +39,8 @@ var ( ErrNotSynced = fmt.Errorf("not synced yet") // ErrGenesisBlockReached is reported when genesis block reached. ErrGenesisBlockReached = fmt.Errorf("genesis block reached") - // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. + // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered + // blocks. ErrInvalidBlockOrder = fmt.Errorf("invalid block order") // ErrMismatchBlockHashSequence means the delivering sequence is not // correct, compared to finalized blocks. @@ -61,6 +61,7 @@ type Consensus struct { prv crypto.PrivateKey network core.Network nodeSetCache *utils.NodeSetCache + tsigVerifier *core.TSigVerifierCache lattice *core.Lattice validatedChains map[uint32]struct{} @@ -83,6 +84,9 @@ type Consensus struct { ctxCancel context.CancelFunc syncedLastBlock *types.Block syncedConsensus *core.Consensus + dummyCancel context.CancelFunc + dummyFinished <-chan struct{} + dummyMsgBuffer []interface{} } // NewConsensus creates an instance for Consensus (syncer consensus). @@ -102,6 +106,7 @@ func NewConsensus( db: db, network: network, nodeSetCache: utils.NewNodeSetCache(gov), + tsigVerifier: core.NewTSigVerifierCache(gov, 7), prv: prv, logger: logger, validatedChains: make(map[uint32]struct{}), @@ -118,17 +123,15 @@ func NewConsensus( } func (con *Consensus) initConsensusObj(initBlock *types.Block) { - var cfg *types.Config func() { con.lock.Lock() defer con.lock.Unlock() con.latticeLastRound = initBlock.Position.Round - cfg = con.configs[con.latticeLastRound] debugApp, _ := con.app.(core.Debug) con.lattice = core.NewLattice( con.roundBeginTimes[con.latticeLastRound], con.latticeLastRound, - cfg, + con.configs[con.latticeLastRound], utils.NewSigner(con.prv), con.app, debugApp, @@ -136,37 +139,49 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) { con.logger, ) }() - con.startAgreement(cfg.NumChains) + con.startAgreement() con.startNetwork() con.startCRSMonitor() } -func (con *Consensus) checkIfValidated() bool { +func (con *Consensus) checkIfValidated() (validated bool) { con.lock.RLock() defer con.lock.RUnlock() - var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains - var validatedChainCount uint32 + var ( + round = con.blocks[0][0].Position.Round + numChains = con.configs[round].NumChains + validatedChainCount uint32 + ) // Make sure we validate some block in all chains. for chainID := range con.validatedChains { if chainID < numChains { validatedChainCount++ } } - if validatedChainCount == numChains { - return true - } - con.logger.Debug("not validated yet", "validated-chain", validatedChainCount) - return false + validated = validatedChainCount == numChains + con.logger.Debug("syncer chain-validation status", + "validated-chain", validatedChainCount, + "round", round, + "valid", validated) + return } -func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { +func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { con.lock.RLock() defer con.lock.RUnlock() var ( - numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + round = con.blocks[0][0].Position.Round + numChains = con.configs[round].NumChains compactionTips = make([]*types.Block, numChains) overlapCount = uint32(0) ) + defer func() { + con.logger.Debug("syncer synced status", + "overlap-count", overlapCount, + "num-chain", numChains, + "last-block", blocks[len(blocks)-1], + "synced", synced) + }() // Find tips (newset blocks) of each chain in compaction chain. b := blocks[len(blocks)-1] for tipCount := uint32(0); tipCount < numChains; { @@ -178,7 +193,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { } } if (b.Finalization.ParentHash == common.Hash{}) { - return false + return } b1, err := con.db.GetBlock(b.Finalization.ParentHash) if err != nil { @@ -196,14 +211,8 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { } } } - if overlapCount == numChains { - return true - } - con.logger.Debug("not synced yet", - "overlap-count", overlapCount, - "num-chain", numChains, - "last-block", blocks[len(blocks)-1]) - return false + synced = overlapCount == numChains + return } // ensureAgreementOverlapRound ensures the oldest blocks in each chain in @@ -212,6 +221,10 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { func (con *Consensus) ensureAgreementOverlapRound() bool { con.lock.Lock() defer con.lock.Unlock() + defer func() { + con.logger.Debug("ensureAgreementOverlapRound returned", + "round", con.agreementRoundCut) + }() if con.agreementRoundCut > 0 { return true } @@ -267,7 +280,6 @@ func (con *Consensus) ensureAgreementOverlapRound() bool { "configs", len(con.configs)) if tipRoundMap[r] == con.configs[r].NumChains { con.agreementRoundCut = r - con.logger.Info("agreement round cut found, round", r) return true } } @@ -278,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock( blocks []*types.Block) (*types.Block, error) { lastBlock := blocks[len(blocks)-1] round := lastBlock.Position.Round + isConfigChanged := func(prev, cur *types.Config) bool { + return prev.K != cur.K || + prev.NumChains != cur.NumChains || + prev.PhiRatio != cur.PhiRatio + } for { // Find round r which r-1, r, r+1 are all in same total ordering config. for { - sameAsPrevRound := round == 0 || !con.isConfigChanged( + sameAsPrevRound := round == 0 || !isConfigChanged( con.configs[round-1], con.configs[round]) - sameAsNextRound := !con.isConfigChanged( + sameAsNextRound := !isConfigChanged( con.configs[round], con.configs[round+1]) if sameAsPrevRound && sameAsNextRound { break @@ -306,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock( lastBlock = &b } // Find the deliver set by hash for two times. Blocks in a deliver set - // returned by total ordering is sorted by hash. If a block's parent hash - // is greater than its hash means there is a cut between deliver sets. + // returned by total ordering is sorted by hash. If a block's parent + // hash is greater than its hash means there is a cut between deliver + // sets. var curBlock, prevBlock *types.Block var deliverSetFirstBlock, deliverSetLastBlock *types.Block curBlock = lastBlock @@ -374,11 +392,13 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error { if con.lattice == nil { return nil } - con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) delivered, err := con.lattice.ProcessFinalizedBlock(block) if err != nil { return err } + con.lock.Lock() + defer con.lock.Unlock() + con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) for idx, b := range delivered { if con.finalizedBlockHashes[idx] != b.Hash { return ErrMismatchBlockHashSequence @@ -393,18 +413,27 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error { // regards the blocks are the latest ones. Notice that latest can be true for // many times. // NOTICE: parameter "blocks" should be consecutive in compaction height. +// NOTICE: this method is not expected to be called concurrently. func (con *Consensus) SyncBlocks( - blocks []*types.Block, latest bool) (bool, error) { + blocks []*types.Block, latest bool) (synced bool, err error) { + defer func() { + con.logger.Debug("SyncBlocks returned", + "synced", synced, + "error", err, + "last-block", con.syncedLastBlock) + }() if con.syncedLastBlock != nil { - return true, ErrAlreadySynced + synced, err = true, ErrAlreadySynced + return } if len(blocks) == 0 { - return false, nil + return } // Check if blocks are consecutive. for i := 1; i < len(blocks); i++ { if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 { - return false, ErrInvalidBlockOrder + err = ErrInvalidBlockOrder + return } } // Make sure the first block is the next block of current compaction chain @@ -414,7 +443,8 @@ func (con *Consensus) SyncBlocks( con.logger.Error("mismatched finalization height", "now", blocks[0].Finalization.Height, "expected", tipHeight+1) - return false, ErrInvalidSyncingFinalizationHeight + err = ErrInvalidSyncingFinalizationHeight + return } con.logger.Trace("syncBlocks", "position", &blocks[0].Position, @@ -425,30 +455,35 @@ func (con *Consensus) SyncBlocks( con.setupConfigs(blocks) for _, b := range blocks { // TODO(haoping) remove this if lattice puts blocks into db. - if err := con.db.PutBlock(*b); err != nil { + if err = con.db.PutBlock(*b); err != nil { // A block might be put into db when confirmed by BA, but not // finalized yet. if err == db.ErrBlockExists { err = con.db.UpdateBlock(*b) } if err != nil { - return false, err + return } } - if err := con.db.PutCompactionChainTipInfo( + if err = con.db.PutCompactionChainTipInfo( b.Hash, b.Finalization.Height); err != nil { - return false, err + return } - if err := con.processFinalizedBlock(b); err != nil { - return false, err + if err = con.processFinalizedBlock(b); err != nil { + return } } if latest && con.lattice == nil { - // New Lattice and find the deliver set of total ordering when "latest" is - // true for first time. Deliver set is found by block hashes. - syncBlock, err := con.findLatticeSyncBlock(blocks) + // New Lattice and find the deliver set of total ordering when "latest" + // is true for first time. Deliver set is found by block hashes. + var syncBlock *types.Block + syncBlock, err = con.findLatticeSyncBlock(blocks) if err != nil { - return false, err + if err == ErrGenesisBlockReached { + con.logger.Debug("SyncBlocks skip error", "error", err) + err = nil + } + return } if syncBlock != nil { con.logger.Debug("deliver set found", "block", syncBlock) @@ -457,7 +492,8 @@ func (con *Consensus) SyncBlocks( con.setupConfigs(blocks) // Process blocks from syncBlock to blocks' last block. b := blocks[len(blocks)-1] - blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1 + blocksCount := + b.Finalization.Height - syncBlock.Finalization.Height + 1 blocksToProcess := make([]*types.Block, blocksCount) for { blocksToProcess[blocksCount-1] = b @@ -465,15 +501,16 @@ func (con *Consensus) SyncBlocks( if b.Hash == syncBlock.Hash { break } - b1, err := con.db.GetBlock(b.Finalization.ParentHash) + var b1 types.Block + b1, err = con.db.GetBlock(b.Finalization.ParentHash) if err != nil { - return false, err + return } b = &b1 } for _, b := range blocksToProcess { - if err := con.processFinalizedBlock(b); err != nil { - return false, err + if err = con.processFinalizedBlock(b); err != nil { + return } } } @@ -483,19 +520,25 @@ func (con *Consensus) SyncBlocks( // overlapping of compaction chain and BA's oldest blocks means the // syncing is done. if con.checkIfValidated() && con.checkIfSynced(blocks) { - if err := con.Stop(); err != nil { - return false, err + if err = con.Stop(); err != nil { + return } + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + context.Background(), con.network.ReceiveChan(), + func(msg interface{}) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) con.syncedLastBlock = blocks[len(blocks)-1] - con.logger.Info("syncer.Consensus synced", - "last-block", con.syncedLastBlock) + synced = true } } - return con.syncedLastBlock != nil, nil + return } // GetSyncedConsensus returns the core.Consensus instance after synced. func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { + con.lock.Lock() + defer con.lock.Unlock() if con.syncedConsensus != nil { return con.syncedConsensus, nil } @@ -504,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { } // flush all blocks in con.blocks into core.Consensus, and build // core.Consensus from syncer. - confirmedBlocks := []*types.Block{} + confirmedBlocks := make([][]*types.Block, len(con.blocks)) + for i, bs := range con.blocks { + confirmedBlocks[i] = []*types.Block(bs) + } randomnessResults := []*types.BlockRandomnessResult{} - func() { - con.lock.Lock() - defer con.lock.Unlock() - for _, bs := range con.blocks { - confirmedBlocks = append(confirmedBlocks, bs...) - } - for _, r := range con.randomnessResults { - randomnessResults = append(randomnessResults, r) - } - }() + for _, r := range con.randomnessResults { + randomnessResults = append(randomnessResults, r) + } + con.dummyCancel() + <-con.dummyFinished var err error con.syncedConsensus, err = core.NewConsensusFromSyncer( con.syncedLastBlock, @@ -528,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { con.lattice, confirmedBlocks, randomnessResults, + con.dummyMsgBuffer, con.logger) return con.syncedConsensus, err } @@ -535,13 +577,17 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { // Stop the syncer. // // This method is mainly for caller to stop the syncer before synced, the syncer -// would call this method automatically after synced. +// would call this method automatically after being synced. func (con *Consensus) Stop() error { + con.logger.Trace("syncer is about to stop") // Stop network and CRS routines, wait until they are all stoped. con.ctxCancel() + con.logger.Trace("stop syncer modules") con.moduleWaitGroup.Wait() // Stop agreements. + con.logger.Trace("stop syncer agreement modules") con.stopAgreement() + con.logger.Trace("syncer stopped") return nil } @@ -566,6 +612,10 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) { func() { con.lock.Lock() defer con.lock.Unlock() + con.logger.Debug("syncer setupConfigs", + "until-round", round, + "length", len(con.configs), + "lattice", con.latticeLastRound) for r := uint64(len(con.configs)); r <= round; r++ { cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) con.configs = append(con.configs, cfg) @@ -589,6 +639,7 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) { } }() con.resizeByNumChains(curMaxNumChains) + con.logger.Trace("setupConfgis finished", "round", round) } // setupConfigs is called by SyncBlocks with blocks from compaction chain. In @@ -601,9 +652,6 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) { maxRound = b.Position.Round } } - con.logger.Debug("syncer setupConfigs", - "max", maxRound, - "lattice", con.latticeLastRound) // Get configs from governance. // // In fullnode, the notification of new round is yet another TX, which @@ -623,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) { // Resize the pool of blocks. con.blocks = append(con.blocks, types.ByPosition{}) // Resize agreement modules. - a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) + a := newAgreement( + con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) con.agreements = append(con.agreements, a) con.agreementWaitGroup.Add(1) go func() { @@ -635,7 +684,7 @@ func (con *Consensus) resizeByNumChains(numChains uint32) { } // startAgreement starts agreements for receiving votes and agreements. -func (con *Consensus) startAgreement(numChains uint32) { +func (con *Consensus) startAgreement() { // Start a routine for listening receive channel and pull block channel. go func() { for { @@ -648,8 +697,8 @@ func (con *Consensus) startAgreement(numChains uint32) { func() { con.lock.Lock() defer con.lock.Unlock() - // If round is cut in agreements, do not add blocks with round less - // then cut round. + // If round is cut in agreements, do not add blocks with + // round less then cut round. if b.Position.Round < con.agreementRoundCut { return } @@ -667,6 +716,10 @@ func (con *Consensus) startAgreement(numChains uint32) { } func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { + // There is no block randomness at round-0. + if r.Position.Round == 0 { + return + } // We only have to cache randomness result after cutting round. if r.Position.Round < func() uint64 { con.lock.RLock() @@ -675,23 +728,43 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { }() { return } - con.lock.Lock() - defer con.lock.Unlock() - if old, exists := con.randomnessResults[r.BlockHash]; exists { - if bytes.Compare(old.Randomness, r.Randomness) != 0 { - panic(fmt.Errorf("receive different randomness result: %s, %s", - r.BlockHash.String()[:6], &r.Position)) - } - // We don't have to assign the map again. + if func() (exists bool) { + con.lock.RLock() + defer con.lock.RUnlock() + _, exists = con.randomnessResults[r.BlockHash] + return + }() { + return + } + v, ok, err := con.tsigVerifier.UpdateAndGet(r.Position.Round) + if err != nil { + con.logger.Error("Unable to get tsig verifier", + "hash", r.BlockHash.String()[:6], + "position", &r.Position, + "error", err) + return + } + if !ok { + con.logger.Error("Tsig is not ready", "position", &r.Position) return } + if !v.VerifySignature(r.BlockHash, crypto.Signature{ + Type: "bls", + Signature: r.Randomness}) { + con.logger.Info("Block randomness is not valid", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + return + } + con.lock.Lock() + defer con.lock.Unlock() con.randomnessResults[r.BlockHash] = r } // startNetwork starts network for receiving blocks and agreement results. func (con *Consensus) startNetwork() { + con.moduleWaitGroup.Add(1) go func() { - con.moduleWaitGroup.Add(1) defer con.moduleWaitGroup.Done() Loop: for { @@ -709,15 +782,22 @@ func (con *Consensus) startNetwork() { default: continue Loop } - func() { + if func() bool { con.lock.RLock() defer con.lock.RUnlock() if pos.ChainID >= uint32(len(con.agreements)) { - con.logger.Error("Unknown chainID message received (syncer)", + // This error might be easily encountered when the + // "latest" parameter of SyncBlocks is turned on too + // early. + con.logger.Error( + "Unknown chainID message received (syncer)", "position", &pos) + return false } - }() - con.agreements[pos.ChainID].inputChan <- val + return true + }() { + con.agreements[pos.ChainID].inputChan <- val + } case <-con.ctx.Done(): return } @@ -732,19 +812,32 @@ func (con *Consensus) startCRSMonitor() { // Notify all agreements for new CRS. notifyNewCRS := func(round uint64) { con.setupConfigsUntilRound(round) - con.lock.Lock() - defer con.lock.Unlock() if round == lastNotifiedRound { return } con.logger.Debug("CRS is ready", "round", round) lastNotifiedRound = round - for _, a := range con.agreements { - a.inputChan <- round + con.lock.Lock() + defer con.lock.Unlock() + for idx, a := range con.agreements { + loop: + for { + select { + case <-con.ctx.Done(): + break loop + case a.inputChan <- round: + break loop + case <-time.After(500 * time.Millisecond): + con.logger.Debug( + "agreement input channel is full when putting CRS", + "chainID", idx, + "round", round) + } + } } } + con.moduleWaitGroup.Add(1) go func() { - con.moduleWaitGroup.Add(1) defer con.moduleWaitGroup.Done() for { select { @@ -781,9 +874,3 @@ func (con *Consensus) stopAgreement() { close(con.receiveChan) close(con.pullChan) } - -func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool { - return prev.K != cur.K || - prev.NumChains != cur.NumChains || - prev.PhiRatio != cur.PhiRatio -} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go index f8d0c67d9..ffd5ab45d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go @@ -36,12 +36,16 @@ const ( // defaultTicker is a wrapper to implement ticker interface based on // time.Ticker. type defaultTicker struct { - ticker *time.Ticker + ticker *time.Ticker + duration time.Duration } // newDefaultTicker constructs an defaultTicker instance by giving an interval. func newDefaultTicker(lambda time.Duration) *defaultTicker { - return &defaultTicker{ticker: time.NewTicker(lambda)} + return &defaultTicker{ + ticker: time.NewTicker(lambda), + duration: lambda, + } } // Tick implements Tick method of ticker interface. @@ -54,6 +58,12 @@ func (t *defaultTicker) Stop() { t.ticker.Stop() } +// Restart implements Stop method of ticker interface. +func (t *defaultTicker) Restart() { + t.ticker.Stop() + t.ticker = time.NewTicker(t.duration) +} + // newTicker is a helper to setup a ticker by giving an Governance. If // the governace object implements a ticker generator, a ticker from that // generator would be returned, else constructs a default one. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go index de8dd0bb7..2e2158e7c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go @@ -652,12 +652,14 @@ func (global *totalOrderingGlobalVector) addBlock( return } // Add breakpoint. - global.breakpoints[chainID] = append( - global.breakpoints[chainID], - &totalOrderingBreakpoint{ - roundID: b.Position.Round, - lastHeight: tip.Position.Height, - }) + if b.Position.Round > global.curRound { + global.breakpoints[chainID] = append( + global.breakpoints[chainID], + &totalOrderingBreakpoint{ + roundID: b.Position.Round, + lastHeight: tip.Position.Height, + }) + } } else { if b.Position.Height != tip.Position.Height+1 { err = ErrInvalidDAG @@ -1052,7 +1054,6 @@ func (to *totalOrdering) generateDeliverSet() ( wg.Wait() // Reset dirty chains. to.dirtyChainIDs = to.dirtyChainIDs[:0] - // TODO(mission): ANS should be bounded by current numChains. globalAnsLength := globalInfo.getAckingNodeSetLength( globalInfo, cfg.k, cfg.numChains) CheckNextCandidateLoop: @@ -1126,7 +1127,6 @@ CheckNextCandidateLoop: checkANS := func() bool { var chainAnsLength uint64 for p := range precedings { - // TODO(mission): ANS should be bound by current numChains. chainAnsLength = to.candidates[p].getAckingNodeSetLength( globalInfo, cfg.k, cfg.numChains) if uint64(chainAnsLength) < uint64(cfg.numChains)-cfg.phi { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go index da615e1f0..21a1e528e 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go @@ -41,6 +41,7 @@ type subSetTargetType byte const ( targetNotarySet subSetTargetType = iota targetDKGSet + targetNodeLeader ) type nodeRank struct { @@ -72,6 +73,17 @@ func NewNodeSet() *NodeSet { } } +// NewNodeSetFromMap creates a new NodeSet from NodeID map. +func NewNodeSetFromMap(nodes map[NodeID]struct{}) *NodeSet { + nIDs := make(map[NodeID]struct{}, len(nodes)) + for nID := range nodes { + nIDs[nID] = struct{}{} + } + return &NodeSet{ + IDs: nIDs, + } +} + // NewNotarySetTarget is the target for getting Notary Set. func NewNotarySetTarget(crs common.Hash, chainID uint32) *SubSetTarget { binaryChainID := make([]byte, 4) @@ -80,11 +92,23 @@ func NewNotarySetTarget(crs common.Hash, chainID uint32) *SubSetTarget { return newTarget(targetNotarySet, crs[:], binaryChainID) } -// NewDKGSetTarget is the target for getting DKG Set. +// NewDKGSetTarget is the target for getting DKG Set. func NewDKGSetTarget(crs common.Hash) *SubSetTarget { return newTarget(targetDKGSet, crs[:]) } +// NewNodeLeaderTarget is the target for getting leader of fast BA. +func NewNodeLeaderTarget(crs common.Hash, pos Position) *SubSetTarget { + binaryRoundID := make([]byte, 8) + binary.LittleEndian.PutUint64(binaryRoundID, pos.Round) + binaryChainID := make([]byte, 4) + binary.LittleEndian.PutUint32(binaryChainID, pos.ChainID) + binaryHeight := make([]byte, 8) + binary.LittleEndian.PutUint64(binaryHeight, pos.Height) + return newTarget(targetNodeLeader, crs[:], + binaryRoundID, binaryChainID, binaryHeight) +} + // Add a NodeID to the set. func (ns *NodeSet) Add(ID NodeID) { ns.IDs[ID] = struct{}{} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go index 7601542ae..97044f5aa 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go @@ -32,6 +32,7 @@ const ( VoteInit VoteType = iota VotePreCom VoteCom + VoteFast // Do not add any type below MaxVoteType. MaxVoteType ) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go index 0c2d15588..3b1069eb8 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go @@ -159,8 +159,10 @@ func VerifyAgreementResult( if len(res.Votes) < len(notarySet)/3*2+1 { return ErrNotEnoughVotes } - if len(res.Votes) > len(notarySet) { - return ErrIncorrectVoteProposer + voted := make(map[types.NodeID]struct{}, len(notarySet)) + voteType := res.Votes[0].Type + if voteType != types.VoteFast && voteType != types.VoteCom { + return ErrIncorrectVoteType } for _, vote := range res.Votes { if res.IsEmptyBlock { @@ -172,7 +174,7 @@ func VerifyAgreementResult( return ErrIncorrectVoteBlockHash } } - if vote.Type != types.VoteCom { + if vote.Type != voteType { return ErrIncorrectVoteType } if vote.Position != res.Position { @@ -188,6 +190,10 @@ func VerifyAgreementResult( if !ok { return ErrIncorrectVoteSignature } + voted[vote.ProposerID] = struct{}{} + } + if len(voted) < len(notarySet)/3*2+1 { + return ErrNotEnoughVotes } return nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go index 6d4f7b0ba..8a07c9d2b 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go @@ -38,9 +38,11 @@ var ( ) type sets struct { - nodeSet *types.NodeSet - notarySet []map[types.NodeID]struct{} - dkgSet map[types.NodeID]struct{} + crs common.Hash + nodeSet *types.NodeSet + notarySet []map[types.NodeID]struct{} + dkgSet map[types.NodeID]struct{} + leaderNode []map[uint64]types.NodeID } // NodeSetCacheInterface interface specifies interface used by NodeSetCache. @@ -146,6 +148,33 @@ func (cache *NodeSetCache) GetDKGSet( return cache.cloneMap(IDs.dkgSet), nil } +// GetLeaderNode returns the BA leader of the position. +func (cache *NodeSetCache) GetLeaderNode(pos types.Position) ( + types.NodeID, error) { + IDs, err := cache.getOrUpdate(pos.Round) + if err != nil { + return types.NodeID{}, err + } + if pos.ChainID >= uint32(len(IDs.leaderNode)) { + return types.NodeID{}, ErrInvalidChainID + } + cache.lock.Lock() + defer cache.lock.Unlock() + if _, exist := IDs.leaderNode[pos.ChainID][pos.Height]; !exist { + notarySet := types.NewNodeSetFromMap(IDs.notarySet[pos.ChainID]) + leader := + notarySet.GetSubSet(1, types.NewNodeLeaderTarget(IDs.crs, pos)) + if len(leader) != 1 { + panic(errors.New("length of leader is not one")) + } + for nID := range leader { + IDs.leaderNode[pos.ChainID][pos.Height] = nID + break + } + } + return IDs.leaderNode[pos.ChainID][pos.Height], nil +} + func (cache *NodeSetCache) cloneMap( nIDs map[types.NodeID]struct{}) map[types.NodeID]struct{} { nIDsCopy := make(map[types.NodeID]struct{}, len(nIDs)) @@ -207,15 +236,21 @@ func (cache *NodeSetCache) update( return } nIDs = &sets{ + crs: crs, nodeSet: nodeSet, notarySet: make([]map[types.NodeID]struct{}, cfg.NumChains), dkgSet: nodeSet.GetSubSet( int(cfg.DKGSetSize), types.NewDKGSetTarget(crs)), + leaderNode: make([]map[uint64]types.NodeID, cfg.NumChains), } for i := range nIDs.notarySet { nIDs.notarySet[i] = nodeSet.GetSubSet( int(cfg.NotarySetSize), types.NewNotarySetTarget(crs, uint32(i))) } + nodesPerChain := cfg.RoundInterval / cfg.MinBlockInterval + for i := range nIDs.leaderNode { + nIDs.leaderNode[i] = make(map[uint64]types.NodeID, nodesPerChain) + } cache.rounds[round] = nIDs // Purge older rounds. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go index 8c9f77a69..8486d2854 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go @@ -18,6 +18,7 @@ package utils import ( + "context" "fmt" "github.com/dexon-foundation/dexon-consensus/common" @@ -92,3 +93,36 @@ func VerifyDKGComplaint( } return ok, nil } + +// LaunchDummyReceiver launches a go routine to receive from the receive +// channel of a network module. An context is required to stop the go routine +// automatically. An optinal message handler could be provided. +func LaunchDummyReceiver( + ctx context.Context, recv <-chan interface{}, handler func(interface{})) ( + context.CancelFunc, <-chan struct{}) { + var ( + dummyCtx, dummyCancel = context.WithCancel(ctx) + finishedChan = make(chan struct{}, 1) + ) + go func() { + defer func() { + finishedChan <- struct{}{} + }() + loop: + for { + select { + case <-dummyCtx.Done(): + break loop + case v, ok := <-recv: + if !ok { + panic(fmt.Errorf( + "receive channel is closed before dummy receiver")) + } + if handler != nil { + handler(v) + } + } + } + }() + return dummyCancel, finishedChan +} diff --git a/vendor/vendor.json b/vendor/vendor.json index f2da6e117..6d367a737 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -141,16 +141,16 @@ { "checksumSHA1": "ZUuiRqS6PnoNIvBmLStVQiyhkOM=", "path": "github.com/dexon-foundation/dexon-consensus/common", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { - "checksumSHA1": "aqhVp5CBDq52ytHUH3HatpWhTDQ=", + "checksumSHA1": "TKAPWiezlt9t0oZca1Cq9S388lI=", "path": "github.com/dexon-foundation/dexon-consensus/core", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, @@ -165,64 +165,64 @@ { "checksumSHA1": "tQSbYCu5P00lUhKsx3IbBZCuSLY=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { "checksumSHA1": "W2P7pkuJ+26BpJg03K4Y0nB5obI=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { "checksumSHA1": "6Pf6caC8LTNCI7IflFmglKYnxYo=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { - "checksumSHA1": "PJXR1OuWwVVYrdJMK3skPr1/8ls=", + "checksumSHA1": "zpuCdMT8MGsy4pLgHKpg/Wd4izU=", "path": "github.com/dexon-foundation/dexon-consensus/core/db", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { - "checksumSHA1": "h674l/hugVujbZUy/NSeDmio3/U=", + "checksumSHA1": "eq19vhMpc90UUJ7I91ti5P2CkQ0=", "path": "github.com/dexon-foundation/dexon-consensus/core/syncer", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { - "checksumSHA1": "GRiBmU5T1LAoGHs5g1owGE1tNNo=", + "checksumSHA1": "zkrt3MOtqHPB3BmZtppZ9uesw3s=", "path": "github.com/dexon-foundation/dexon-consensus/core/types", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { "checksumSHA1": "rmv8uxwrqMhJAeA3RPvwYP8mFro=", "path": "github.com/dexon-foundation/dexon-consensus/core/types/dkg", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, { - "checksumSHA1": "NCAEGRVPfM0jCKdrBN2yvEXkeIo=", + "checksumSHA1": "FUHa68Hif8F8YHmx4h0sQIUNp40=", "path": "github.com/dexon-foundation/dexon-consensus/core/utils", - "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903", - "revisionTime": "2019-01-05T09:58:34Z", + "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc", + "revisionTime": "2019-01-16T02:21:07Z", "version": "master", "versionExact": "master" }, |