aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go
diff options
context:
space:
mode:
authorWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:31:08 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-09-17 16:57:29 +0800
commitac088de6322fc16ebe75c2e5554be73754bf1fe2 (patch)
tree086b7827d46a4d07b834cd94be73beaabb77b734 /vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go
parent67d565f3f0e398e99bef96827f729e3e4b0edf31 (diff)
downloadgo-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.tar
go-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.tar.gz
go-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.tar.bz2
go-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.tar.lz
go-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.tar.xz
go-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.tar.zst
go-tangerine-ac088de6322fc16ebe75c2e5554be73754bf1fe2.zip
Rebrand as tangerine-network/go-tangerine
Diffstat (limited to 'vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go')
-rw-r--r--vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go676
1 files changed, 676 insertions, 0 deletions
diff --git a/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go
new file mode 100644
index 000000000..cdbfadf13
--- /dev/null
+++ b/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go
@@ -0,0 +1,676 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "context"
+ "errors"
+ "math"
+ "sync"
+ "time"
+
+ lru "github.com/hashicorp/golang-lru"
+
+ "github.com/byzantine-lab/dexon-consensus/common"
+ "github.com/byzantine-lab/dexon-consensus/core/types"
+ typesDKG "github.com/byzantine-lab/dexon-consensus/core/types/dkg"
+ "github.com/byzantine-lab/dexon-consensus/core/utils"
+)
+
+// Errors returned from BA modules
+var (
+ ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished")
+ ErrRoundOutOfRange = errors.New("round out of range")
+ ErrInvalidBlock = errors.New("invalid block")
+ ErrNoValidLeader = errors.New("no valid leader")
+ ErrIncorrectCRSSignature = errors.New("incorrect CRS signature")
+ ErrBlockTooOld = errors.New("block too old")
+)
+
+const maxResultCache = 100
+const settingLimit = 3
+
+// genValidLeader generate a validLeader function for agreement modules.
+func genValidLeader(
+ mgr *agreementMgr) validLeaderFn {
+ return func(block *types.Block, crs common.Hash) (bool, error) {
+ if block.Timestamp.After(time.Now()) {
+ return false, nil
+ }
+ if block.Position.Round >= DKGDelayRound {
+ if mgr.recv.npks == nil {
+ return false, nil
+ }
+ if block.Position.Round > mgr.recv.npks.Round {
+ return false, nil
+ }
+ if block.Position.Round < mgr.recv.npks.Round {
+ return false, ErrBlockTooOld
+ }
+ }
+ if !utils.VerifyCRSSignature(block, crs, mgr.recv.npks) {
+ return false, ErrIncorrectCRSSignature
+ }
+ if err := mgr.bcModule.sanityCheck(block); err != nil {
+ if err == ErrRetrySanityCheckLater {
+ return false, nil
+ }
+ return false, err
+ }
+ mgr.logger.Debug("Calling Application.VerifyBlock", "block", block)
+ switch mgr.app.VerifyBlock(block) {
+ case types.VerifyInvalidBlock:
+ return false, ErrInvalidBlock
+ case types.VerifyRetryLater:
+ return false, nil
+ default:
+ }
+ return true, nil
+ }
+}
+
+type agreementMgrConfig struct {
+ utils.RoundBasedConfig
+
+ notarySetSize uint32
+ lambdaBA time.Duration
+ crs common.Hash
+}
+
+func (c *agreementMgrConfig) from(
+ round uint64, config *types.Config, crs common.Hash) {
+ c.notarySetSize = config.NotarySetSize
+ c.lambdaBA = config.LambdaBA
+ c.crs = crs
+ c.SetupRoundBasedFields(round, config)
+}
+
+func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config,
+ crs common.Hash) (c agreementMgrConfig) {
+ c = agreementMgrConfig{}
+ c.from(prev.RoundID()+1, config, crs)
+ c.AppendTo(prev.RoundBasedConfig)
+ return
+}
+
+type baRoundSetting struct {
+ round uint64
+ dkgSet map[types.NodeID]struct{}
+ threshold int
+ ticker Ticker
+ crs common.Hash
+}
+
+type agreementMgr struct {
+ // TODO(mission): unbound Consensus instance from this module.
+ con *Consensus
+ ID types.NodeID
+ app Application
+ gov Governance
+ network Network
+ logger common.Logger
+ cache *utils.NodeSetCache
+ signer *utils.Signer
+ bcModule *blockChain
+ ctx context.Context
+ configs []agreementMgrConfig
+ baModule *agreement
+ recv *consensusBAReceiver
+ processedBAResult map[types.Position]struct{}
+ voteFilter *utils.VoteFilter
+ settingCache *lru.Cache
+ curRoundSetting *baRoundSetting
+ waitGroup sync.WaitGroup
+ isRunning bool
+ lock sync.RWMutex
+}
+
+func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) {
+ settingCache, _ := lru.New(settingLimit)
+ mgr = &agreementMgr{
+ con: con,
+ ID: con.ID,
+ app: con.app,
+ gov: con.gov,
+ network: con.network,
+ logger: con.logger,
+ cache: con.nodeSetCache,
+ signer: con.signer,
+ bcModule: con.bcModule,
+ ctx: con.ctx,
+ processedBAResult: make(map[types.Position]struct{}, maxResultCache),
+ voteFilter: utils.NewVoteFilter(),
+ settingCache: settingCache,
+ }
+ mgr.recv = &consensusBAReceiver{
+ consensus: con,
+ restartNotary: make(chan types.Position, 1),
+ }
+ return mgr, nil
+}
+
+func (mgr *agreementMgr) prepare() {
+ round := mgr.bcModule.tipRound()
+ agr := newAgreement(
+ mgr.ID,
+ mgr.recv,
+ newLeaderSelector(genValidLeader(mgr), mgr.logger),
+ mgr.signer,
+ mgr.logger)
+ setting := mgr.generateSetting(round)
+ if setting == nil {
+ mgr.logger.Warn("Unable to prepare init setting", "round", round)
+ return
+ }
+ mgr.curRoundSetting = setting
+ agr.notarySet = mgr.curRoundSetting.dkgSet
+ // Hacky way to make agreement module self contained.
+ mgr.recv.agreementModule = agr
+ mgr.baModule = agr
+ if round >= DKGDelayRound {
+ if _, exist := setting.dkgSet[mgr.ID]; exist {
+ mgr.logger.Debug("Preparing signer and npks.", "round", round)
+ npk, signer, err := mgr.con.cfgModule.getDKGInfo(round, false)
+ if err != nil {
+ mgr.logger.Error("Failed to prepare signer and npks.",
+ "round", round,
+ "error", err)
+ }
+ mgr.logger.Debug("Prepared signer and npks.",
+ "round", round, "signer", signer != nil, "npks", npk != nil)
+ }
+ }
+ return
+}
+
+func (mgr *agreementMgr) run() {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ if mgr.isRunning {
+ return
+ }
+ mgr.isRunning = true
+ mgr.waitGroup.Add(1)
+ go func() {
+ defer mgr.waitGroup.Done()
+ mgr.runBA(mgr.bcModule.tipRound())
+ }()
+}
+
+func (mgr *agreementMgr) calcLeader(
+ dkgSet map[types.NodeID]struct{},
+ crs common.Hash, pos types.Position) (
+ types.NodeID, error) {
+ nodeSet := types.NewNodeSetFromMap(dkgSet)
+ leader := nodeSet.GetSubSet(1, types.NewNodeLeaderTarget(
+ crs, pos.Height))
+ for nID := range leader {
+ return nID, nil
+ }
+ return types.NodeID{}, ErrNoValidLeader
+}
+
+func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig {
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ if round < mgr.configs[0].RoundID() {
+ panic(ErrRoundOutOfRange)
+ }
+ roundIndex := round - mgr.configs[0].RoundID()
+ if roundIndex >= uint64(len(mgr.configs)) {
+ return nil
+ }
+ return &mgr.configs[roundIndex]
+}
+
+func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ apply := func(e utils.RoundEventParam) error {
+ if len(mgr.configs) > 0 {
+ lastCfg := mgr.configs[len(mgr.configs)-1]
+ if e.BeginHeight != lastCfg.RoundEndHeight() {
+ return ErrInvalidBlockHeight
+ }
+ if lastCfg.RoundID() == e.Round {
+ mgr.configs[len(mgr.configs)-1].ExtendLength()
+ } else if lastCfg.RoundID()+1 == e.Round {
+ mgr.configs = append(mgr.configs, newAgreementMgrConfig(
+ lastCfg, e.Config, e.CRS))
+ } else {
+ return ErrInvalidRoundID
+ }
+ } else {
+ c := agreementMgrConfig{}
+ c.from(e.Round, e.Config, e.CRS)
+ c.SetRoundBeginHeight(e.BeginHeight)
+ mgr.configs = append(mgr.configs, c)
+ }
+ return nil
+ }
+ for _, e := range evts {
+ if err := apply(e); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (mgr *agreementMgr) checkProposer(
+ round uint64, proposerID types.NodeID) error {
+ if round == mgr.curRoundSetting.round {
+ if _, exist := mgr.curRoundSetting.dkgSet[proposerID]; !exist {
+ return ErrNotInNotarySet
+ }
+ } else if round == mgr.curRoundSetting.round+1 {
+ setting := mgr.generateSetting(round)
+ if setting == nil {
+ return ErrConfigurationNotReady
+ }
+ if _, exist := setting.dkgSet[proposerID]; !exist {
+ return ErrNotInNotarySet
+ }
+ }
+ return nil
+}
+
+func (mgr *agreementMgr) processVote(v *types.Vote) (err error) {
+ if !mgr.recv.isNotary {
+ return nil
+ }
+ if mgr.voteFilter.Filter(v) {
+ return nil
+ }
+ if err := mgr.checkProposer(v.Position.Round, v.ProposerID); err != nil {
+ return err
+ }
+ if err = mgr.baModule.processVote(v); err == nil {
+ mgr.baModule.updateFilter(mgr.voteFilter)
+ mgr.voteFilter.AddVote(v)
+ }
+ if err == ErrSkipButNoError {
+ err = nil
+ }
+ return
+}
+
+func (mgr *agreementMgr) processBlock(b *types.Block) error {
+ if err := mgr.checkProposer(b.Position.Round, b.ProposerID); err != nil {
+ return err
+ }
+ return mgr.baModule.processBlock(b)
+}
+
+func (mgr *agreementMgr) touchAgreementResult(
+ result *types.AgreementResult) (first bool) {
+ // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!!
+ if _, exist := mgr.processedBAResult[result.Position]; !exist {
+ first = true
+ if len(mgr.processedBAResult) > maxResultCache {
+ for k := range mgr.processedBAResult {
+ // Randomly drop one element.
+ delete(mgr.processedBAResult, k)
+ break
+ }
+ }
+ mgr.processedBAResult[result.Position] = struct{}{}
+ }
+ return
+}
+
+func (mgr *agreementMgr) untouchAgreementResult(
+ result *types.AgreementResult) {
+ // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!!
+ delete(mgr.processedBAResult, result.Position)
+}
+
+func (mgr *agreementMgr) processAgreementResult(
+ result *types.AgreementResult) error {
+ aID := mgr.baModule.agreementID()
+ if isStop(aID) {
+ return nil
+ }
+ if result.Position == aID && !mgr.baModule.confirmed() {
+ mgr.logger.Info("Syncing BA", "position", result.Position)
+ if result.Position.Round >= DKGDelayRound {
+ return mgr.baModule.processAgreementResult(result)
+ }
+ for key := range result.Votes {
+ if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
+ return err
+ }
+ }
+ } else if result.Position.Newer(aID) {
+ mgr.logger.Info("Fast syncing BA", "position", result.Position)
+ if result.Position.Round < DKGDelayRound {
+ mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA",
+ "hash", result.BlockHash)
+ mgr.network.PullBlocks(common.Hashes{result.BlockHash})
+ for key := range result.Votes {
+ if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
+ return err
+ }
+ }
+ }
+ setting := mgr.generateSetting(result.Position.Round)
+ if setting == nil {
+ mgr.logger.Warn("unable to get setting", "round",
+ result.Position.Round)
+ return ErrConfigurationNotReady
+ }
+ mgr.curRoundSetting = setting
+ leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position)
+ if err != nil {
+ return err
+ }
+ mgr.baModule.restart(
+ setting.dkgSet, setting.threshold,
+ result.Position, leader, setting.crs)
+ if result.Position.Round >= DKGDelayRound {
+ return mgr.baModule.processAgreementResult(result)
+ }
+ }
+ return nil
+}
+
+func (mgr *agreementMgr) processFinalizedBlock(block *types.Block) error {
+ aID := mgr.baModule.agreementID()
+ if block.Position.Older(aID) {
+ return nil
+ }
+ mgr.baModule.processFinalizedBlock(block)
+ return nil
+}
+
+func (mgr *agreementMgr) stop() {
+ // Stop all running agreement modules.
+ func() {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ mgr.baModule.stop()
+ }()
+ // Block until all routines are done.
+ mgr.waitGroup.Wait()
+}
+
+func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting {
+ if setting, exist := mgr.settingCache.Get(round); exist {
+ return setting.(*baRoundSetting)
+ }
+ curConfig := mgr.config(round)
+ if curConfig == nil {
+ return nil
+ }
+ var dkgSet map[types.NodeID]struct{}
+ if round >= DKGDelayRound {
+ _, qualidifed, err := typesDKG.CalcQualifyNodes(
+ mgr.gov.DKGMasterPublicKeys(round),
+ mgr.gov.DKGComplaints(round),
+ utils.GetDKGThreshold(mgr.gov.Configuration(round)),
+ )
+ if err != nil {
+ mgr.logger.Error("Failed to get gpk", "round", round, "error", err)
+ return nil
+ }
+ dkgSet = qualidifed
+ }
+ if len(dkgSet) == 0 {
+ var err error
+ dkgSet, err = mgr.cache.GetNotarySet(round)
+ if err != nil {
+ mgr.logger.Error("Failed to get notarySet", "round", round, "error", err)
+ return nil
+ }
+ }
+ setting := &baRoundSetting{
+ crs: curConfig.crs,
+ dkgSet: dkgSet,
+ round: round,
+ threshold: utils.GetBAThreshold(&types.Config{
+ NotarySetSize: curConfig.notarySetSize}),
+ }
+ mgr.settingCache.Add(round, setting)
+ return setting
+}
+
+func (mgr *agreementMgr) runBA(initRound uint64) {
+ // These are round based variables.
+ var (
+ currentRound uint64
+ nextRound = initRound
+ curConfig = mgr.config(initRound)
+ setting = &baRoundSetting{}
+ tickDuration time.Duration
+ ticker Ticker
+ )
+
+ // Check if this routine needs to awake in this round and prepare essential
+ // variables when yes.
+ checkRound := func() (isDKG bool) {
+ defer func() {
+ currentRound = nextRound
+ nextRound++
+ }()
+ // Wait until the configuartion for next round is ready.
+ for {
+ if setting = mgr.generateSetting(nextRound); setting != nil {
+ break
+ } else {
+ mgr.logger.Debug("Round is not ready", "round", nextRound)
+ time.Sleep(1 * time.Second)
+ }
+ }
+ _, isDKG = setting.dkgSet[mgr.ID]
+ if isDKG {
+ mgr.logger.Info("Selected as dkg set",
+ "ID", mgr.ID,
+ "round", nextRound)
+ } else {
+ mgr.logger.Info("Not selected as dkg set",
+ "ID", mgr.ID,
+ "round", nextRound)
+ }
+ // Setup ticker
+ if tickDuration != curConfig.lambdaBA {
+ if ticker != nil {
+ ticker.Stop()
+ }
+ ticker = newTicker(mgr.gov, nextRound, TickerBA)
+ tickDuration = curConfig.lambdaBA
+ }
+ setting.ticker = ticker
+ return
+ }
+Loop:
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break Loop
+ default:
+ }
+ mgr.recv.isNotary = checkRound()
+ mgr.voteFilter = utils.NewVoteFilter()
+ mgr.voteFilter.Position.Round = currentRound
+ mgr.recv.emptyBlockHashMap = &sync.Map{}
+ if currentRound >= DKGDelayRound && mgr.recv.isNotary {
+ var err error
+ mgr.recv.npks, mgr.recv.psigSigner, err =
+ mgr.con.cfgModule.getDKGInfo(currentRound, false)
+ if err != nil {
+ mgr.logger.Warn("cannot get dkg info",
+ "round", currentRound, "error", err)
+ }
+ } else {
+ mgr.recv.npks = nil
+ mgr.recv.psigSigner = nil
+ }
+ // Run BA for this round.
+ mgr.recv.restartNotary <- types.Position{
+ Round: currentRound,
+ Height: math.MaxUint64,
+ }
+ if err := mgr.baRoutineForOneRound(setting); err != nil {
+ mgr.logger.Error("BA routine failed",
+ "error", err,
+ "nodeID", mgr.ID)
+ break Loop
+ }
+ }
+}
+
+func (mgr *agreementMgr) baRoutineForOneRound(
+ setting *baRoundSetting) (err error) {
+ agr := mgr.baModule
+ recv := mgr.recv
+ oldPos := agr.agreementID()
+ restart := func(restartPos types.Position) (breakLoop bool, err error) {
+ if !isStop(restartPos) {
+ if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() {
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break
+ default:
+ }
+ tipRound := mgr.bcModule.tipRound()
+ if tipRound > setting.round {
+ break
+ } else {
+ mgr.logger.Debug("Waiting blockChain to change round...",
+ "curRound", setting.round,
+ "tipRound", tipRound)
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ // This round is finished.
+ breakLoop = true
+ return
+ }
+ if restartPos.Older(oldPos) {
+ // The restartNotary event is triggered by 'BlockConfirmed'
+ // of some older block.
+ return
+ }
+ }
+ var nextHeight uint64
+ var nextTime time.Time
+ for {
+ // Make sure we are stoppable.
+ select {
+ case <-mgr.ctx.Done():
+ breakLoop = true
+ return
+ default:
+ }
+ nextHeight, nextTime = mgr.bcModule.nextBlock()
+ if nextHeight != notReadyHeight {
+ if isStop(restartPos) {
+ break
+ }
+ if nextHeight > restartPos.Height {
+ break
+ }
+ }
+ mgr.logger.Debug("BlockChain not ready!!!",
+ "old", oldPos, "restart", restartPos, "next", nextHeight)
+ time.Sleep(100 * time.Millisecond)
+ }
+ nextPos := types.Position{
+ Round: setting.round,
+ Height: nextHeight,
+ }
+ oldPos = nextPos
+ var leader types.NodeID
+ leader, err = mgr.calcLeader(setting.dkgSet, setting.crs, nextPos)
+ if err != nil {
+ return
+ }
+ time.Sleep(nextTime.Sub(time.Now()))
+ setting.ticker.Restart()
+ agr.restart(setting.dkgSet, setting.threshold, nextPos, leader, setting.crs)
+ return
+ }
+Loop:
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break Loop
+ default:
+ }
+ if agr.confirmed() {
+ // Block until receive restartPos
+ select {
+ case restartPos := <-recv.restartNotary:
+ breakLoop, err := restart(restartPos)
+ if err != nil {
+ return err
+ }
+ if breakLoop {
+ break Loop
+ }
+ case <-mgr.ctx.Done():
+ break Loop
+ }
+ }
+ select {
+ case restartPos := <-recv.restartNotary:
+ breakLoop, err := restart(restartPos)
+ if err != nil {
+ return err
+ }
+ if breakLoop {
+ break Loop
+ }
+ default:
+ }
+ if !mgr.recv.isNotary {
+ select {
+ case <-setting.ticker.Tick():
+ continue Loop
+ case <-mgr.ctx.Done():
+ break Loop
+ }
+ }
+ if err = agr.nextState(); err != nil {
+ mgr.logger.Error("Failed to proceed to next state",
+ "nodeID", mgr.ID.String(),
+ "error", err)
+ break Loop
+ }
+ if agr.pullVotes() {
+ pos := agr.agreementID()
+ mgr.logger.Debug("Calling Network.PullVotes for syncing votes",
+ "position", pos)
+ mgr.network.PullVotes(pos)
+ }
+ for i := 0; i < agr.clocks(); i++ {
+ // Priority select for agreement.done().
+ select {
+ case <-agr.done():
+ continue Loop
+ default:
+ }
+ select {
+ case <-agr.done():
+ continue Loop
+ case <-setting.ticker.Tick():
+ }
+ }
+ }
+ return nil
+}