From 2d92fa035aa8705d9f81a20dd454c6b8eeb25882 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 26 Mar 2019 10:38:00 +0800 Subject: vendor: sync to latest core (#300) --- .../dexon-consensus/core/agreement-mgr.go | 6 +- .../dexon-consensus/core/agreement.go | 6 +- .../dexon-consensus/core/blockchain.go | 4 + .../dexon-consensus/core/configuration-chain.go | 204 +++++++++++---------- .../dexon-consensus/core/consensus.go | 63 +++---- .../dexon-consensus/core/dkg-tsig-protocol.go | 7 + .../dexon-consensus/core/syncer/agreement.go | 18 +- .../dexon-consensus/core/syncer/consensus.go | 88 +++------ .../dexon-foundation/dexon-consensus/core/utils.go | 2 +- .../dexon-consensus/core/utils/round-event.go | 15 +- 10 files changed, 199 insertions(+), 214 deletions(-) (limited to 'vendor/github.com/dexon-foundation') diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index 14aa3857b..d29863df5 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -337,7 +337,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) { if curConfig = mgr.config(nextRound); curConfig != nil { break } else { - mgr.logger.Debug("round is not ready", "round", nextRound) + mgr.logger.Debug("Round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } @@ -350,11 +350,11 @@ func (mgr *agreementMgr) runBA(initRound uint64) { setting.notarySet = notarySet _, isNotary = setting.notarySet[mgr.ID] if isNotary { - mgr.logger.Info("selected as notary set", + mgr.logger.Info("Selected as notary set", "ID", mgr.ID, "round", nextRound) } else { - mgr.logger.Info("not selected as notary set", + mgr.logger.Info("Not selected as notary set", "ID", mgr.ID, "round", nextRound) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go index b0c773429..16f36bccd 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go @@ -174,7 +174,7 @@ func (a *agreement) restart( a.data.votes[1] = newVoteListMap() a.data.period = 2 a.data.blocks = make(map[types.NodeID]*types.Block) - a.data.requiredVote = len(notarySet)/3*2 + 1 + a.data.requiredVote = len(notarySet)*2/3 + 1 a.data.leader.restart(crs) a.data.lockValue = types.NullBlockHash a.data.lockIter = 0 @@ -239,14 +239,14 @@ func (a *agreement) restart( for _, block := range replayBlock { if err := a.processBlock(block); err != nil { - a.logger.Error("failed to process block when restarting agreement", + a.logger.Error("Failed to process block when restarting agreement", "block", block) } } for _, vote := range replayVote { if err := a.processVote(vote); err != nil { - a.logger.Error("failed to process vote when restarting agreement", + a.logger.Error("Failed to process vote when restarting agreement", "vote", vote) } } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go index c5a22b628..610ab28bd 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go @@ -122,6 +122,7 @@ func newBlockChainConfig(prev blockChainConfig, config *types.Config) ( type tsigVerifierGetter interface { UpdateAndGet(uint64) (TSigVerifier, bool, error) + Purge(uint64) } type blockChain struct { @@ -196,6 +197,9 @@ func (bc *blockChain) notifyRoundEvents(evts []utils.RoundEventParam) error { } bc.configs = append(bc.configs, c) } + if e.Reset != 0 { + bc.vGetter.Purge(e.Round + 1) + } return nil } for _, e := range evts { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go index 084f9d0c9..48b0f2a89 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go @@ -18,6 +18,7 @@ package core import ( + "context" "fmt" "sync" "time" @@ -57,7 +58,7 @@ func (e ErrMismatchDKG) Error() string { e.expectRound, e.expectReset, e.actualRound, e.actualReset) } -type dkgStepFn func(round uint64, reset uint64) (chan<- struct{}, error) +type dkgStepFn func(round uint64, reset uint64) error type configurationChain struct { ID types.NodeID @@ -70,7 +71,6 @@ type configurationChain struct { dkgSigner map[uint64]*dkgShareSecret npks map[uint64]*typesDKG.NodePublicKeys dkgResult sync.RWMutex - abortDKGCh chan chan<- struct{} tsig map[common.Hash]*tsigProtocol tsigTouched map[common.Hash]struct{} tsigReady *sync.Cond @@ -80,8 +80,11 @@ type configurationChain struct { mpkReady bool pendingPrvShare map[types.NodeID]*typesDKG.PrivateShare // TODO(jimmy-dexon): add timeout to pending psig. - pendingPsig map[common.Hash][]*typesDKG.PartialSignature - prevHash common.Hash + pendingPsig map[common.Hash][]*typesDKG.PartialSignature + prevHash common.Hash + dkgCtx context.Context + dkgCtxCancel context.CancelFunc + dkgRunning bool } func newConfigurationChain( @@ -98,7 +101,6 @@ func newConfigurationChain( logger: logger, dkgSigner: make(map[uint64]*dkgShareSecret), npks: make(map[uint64]*typesDKG.NodePublicKeys), - abortDKGCh: make(chan chan<- struct{}, 1), tsig: make(map[common.Hash]*tsigProtocol), tsigTouched: make(map[common.Hash]struct{}), tsigReady: sync.NewCond(&sync.Mutex{}), @@ -110,18 +112,23 @@ func newConfigurationChain( return configurationChain } -func (cc *configurationChain) abortDKG(round, reset uint64) { +func (cc *configurationChain) abortDKG( + parentCtx context.Context, + round, reset uint64) bool { cc.dkgLock.Lock() defer cc.dkgLock.Unlock() if cc.dkg != nil { - cc.abortDKGNoLock(round, reset) + return cc.abortDKGNoLock(parentCtx, round, reset) } + return false } -func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool { +func (cc *configurationChain) abortDKGNoLock( + ctx context.Context, + round, reset uint64) bool { if cc.dkg.round > round || - (cc.dkg.round == round && cc.dkg.reset >= reset) { - cc.logger.Error("newer DKG already is registered", + (cc.dkg.round == round && cc.dkg.reset > reset) { + cc.logger.Error("Newer DKG already is registered", "round", round, "reset", reset) return false @@ -132,30 +139,50 @@ func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool { "previous-round", cc.dkg.round, "previous-reset", cc.dkg.reset) // Abort DKG routine in previous round. - aborted := make(chan struct{}, 1) cc.logger.Error("Aborting DKG in previous round", "round", round, "previous-round", cc.dkg.round) - cc.dkgLock.Unlock() // Notify current running DKG protocol to abort. - cc.abortDKGCh <- aborted + if cc.dkgCtxCancel != nil { + cc.dkgCtxCancel() + } + cc.dkgLock.Unlock() // Wait for current running DKG protocol aborting. - <-aborted + for { + cc.dkgLock.Lock() + if cc.dkgRunning == false { + cc.dkg = nil + break + } + select { + case <-ctx.Done(): + return false + case <-time.After(100 * time.Millisecond): + } + cc.dkgLock.Unlock() + } cc.logger.Error("Previous DKG aborted", "round", round, "reset", reset) - cc.dkgLock.Lock() - return true + return cc.dkg == nil } -func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) { +func (cc *configurationChain) registerDKG( + parentCtx context.Context, + round, reset uint64, + threshold int) { cc.dkgLock.Lock() defer cc.dkgLock.Unlock() if cc.dkg != nil { // Make sure we only proceed when cc.dkg is nil. - if !cc.abortDKGNoLock(round, reset) { + if !cc.abortDKGNoLock(parentCtx, round, reset) { return } + select { + case <-parentCtx.Done(): + return + default: + } if cc.dkg != nil { // This panic would only raise when multiple attampts to register // a DKG protocol at the same time. @@ -176,6 +203,7 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) { cc.pendingPrvShare = make(map[types.NodeID]*typesDKG.PrivateShare) cc.mpkReady = false cc.dkg, err = recoverDKGProtocol(cc.ID, cc.recv, round, reset, cc.db) + cc.dkgCtx, cc.dkgCtxCancel = context.WithCancel(parentCtx) if err != nil { panic(err) } @@ -207,45 +235,39 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) { }() } -func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) ( - abortCh chan<- struct{}, err error) { +func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) error { if cc.dkg.round < round || (cc.dkg.round == round && cc.dkg.reset < reset) { - err = ErrDKGNotRegistered - return + return ErrDKGNotRegistered } if cc.dkg.round != round || cc.dkg.reset != reset { cc.logger.Warn("DKG canceled", "round", round, "reset", reset) - err = ErrSkipButNoError - return + return ErrSkipButNoError } cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round) if cc.gov.IsDKGFinal(round) { cc.logger.Warn("DKG already final", "round", round) - err = ErrSkipButNoError - return + return ErrSkipButNoError } cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round) - for abortCh == nil && !cc.gov.IsDKGMPKReady(round) { + var err error + for err == nil && !cc.gov.IsDKGMPKReady(round) { cc.dkgLock.Unlock() cc.logger.Debug("DKG MPKs are not ready yet. Try again later...", "nodeID", cc.ID, "round", round) select { - case abortCh = <-cc.abortDKGCh: + case <-cc.dkgCtx.Done(): err = ErrDKGAborted case <-time.After(500 * time.Millisecond): } cc.dkgLock.Lock() } - if abortCh != nil { - return - } - return + return err } func (cc *configurationChain) runDKGPhaseTwoAndThree( - round uint64, reset uint64) (chan<- struct{}, error) { + round uint64, reset uint64) error { // Check if this node successfully join the protocol. cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round) mpks := cc.gov.DKGMasterPublicKeys(round) @@ -260,7 +282,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree( cc.logger.Warn("Failed to join DKG protocol", "round", round, "reset", reset) - return nil, ErrSkipButNoError + return ErrSkipButNoError } // Phase 2(T = 0): Exchange DKG secret key share. if err := cc.dkg.processMasterPublicKeys(mpks); err != nil { @@ -273,8 +295,8 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree( // The time to process private share might be long, check aborting before // get into that loop. select { - case abortCh := <-cc.abortDKGCh: - return abortCh, ErrDKGAborted + case <-cc.dkgCtx.Done(): + return ErrDKGAborted default: } for _, prvShare := range cc.pendingPrvShare { @@ -288,7 +310,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree( // Phase 3(T = 0~λ): Propose complaint. // Propose complaint is done in `processMasterPublicKeys`. - return nil, nil + return nil } func (cc *configurationChain) runDKGPhaseFour() { @@ -322,27 +344,27 @@ func (cc *configurationChain) runDKGPhaseEight() { cc.dkg.proposeFinalize() } -func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) ( - abortCh chan<- struct{}, err error) { +func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error { // Phase 9(T = 6λ): DKG is ready. // Normally, IsDKGFinal would return true here. Use this for in case of // unexpected network fluctuation and ensure the robustness of DKG protocol. cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round) - for abortCh == nil && !cc.gov.IsDKGFinal(round) { + var err error + for err == nil && !cc.gov.IsDKGFinal(round) { cc.dkgLock.Unlock() cc.logger.Debug("DKG is not ready yet. Try again later...", "nodeID", cc.ID.String()[:6], "round", round, "reset", reset) select { - case abortCh = <-cc.abortDKGCh: + case <-cc.dkgCtx.Done(): err = ErrDKGAborted case <-time.After(500 * time.Millisecond): } cc.dkgLock.Lock() } - if abortCh != nil { - return + if err != nil { + return err } cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round) cc.logger.Debug("Calling Governance.DKGComplaints", "round", round) @@ -351,7 +373,7 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) ( cc.gov.DKGComplaints(round), cc.dkg.threshold) if err != nil { - return + return err } qualifies := "" for nID := range npks.QualifyNodeIDs { @@ -367,28 +389,29 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) ( cc.logger.Warn("Self is not in Qualify Nodes", "round", round, "reset", reset) - return + return nil } signer, err := cc.dkg.recoverShareSecret(npks.QualifyIDs) if err != nil { - return + return err } // Save private shares to DB. if err = cc.db.PutDKGPrivateKey(round, *signer.privateKey); err != nil { - return + return err } cc.dkgResult.Lock() defer cc.dkgResult.Unlock() cc.dkgSigner[round] = signer cc.npks[round] = npks - return + return nil } -func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) { +func (cc *configurationChain) runTick(ticker Ticker) (aborted bool) { cc.dkgLock.Unlock() defer cc.dkgLock.Lock() select { - case abortCh = <-cc.abortDKGCh: + case <-cc.dkgCtx.Done(): + aborted = true case <-ticker.Tick(): } return @@ -396,69 +419,42 @@ func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) { func (cc *configurationChain) initDKGPhasesFunc() { cc.dkgRunPhases = []dkgStepFn{ - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { return cc.runDKGPhaseOne(round, reset) }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { return cc.runDKGPhaseTwoAndThree(round, reset) }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { cc.runDKGPhaseFour() - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { cc.runDKGPhaseFiveAndSix(round, reset) - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { complaints := cc.gov.DKGComplaints(round) cc.runDKGPhaseSeven(complaints) - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { cc.runDKGPhaseEight() - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { return cc.runDKGPhaseNine(round, reset) }, } } func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) { - // Check if corresponding DKG signer is ready. - if _, _, err = cc.getDKGInfo(round); err == nil { - return ErrSkipButNoError - } cc.dkgLock.Lock() defer cc.dkgLock.Unlock() - var ( - ticker Ticker - abortCh chan<- struct{} - ) - defer func() { - if ticker != nil { - ticker.Stop() - } - // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done. - cc.dkg = nil - if abortCh == nil { - select { - case abortCh = <-cc.abortDKGCh: - // The previous DKG finishes its job, don't overwrite its error - // with "aborted" here. - default: - } - } - if abortCh != nil { - abortCh <- struct{}{} - } - }() - tickStartAt := 1 - if cc.dkg == nil { return ErrDKGNotRegistered } + // Make sure the existed dkgProtocol is expected one. if cc.dkg.round != round || cc.dkg.reset != reset { return ErrMismatchDKG{ expectRound: round, @@ -467,19 +463,37 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) { actualReset: cc.dkg.reset, } } + if cc.dkgRunning { + panic(fmt.Errorf("duplicated call to runDKG: %d %d", round, reset)) + } + cc.dkgRunning = true + var ticker Ticker + defer func() { + if ticker != nil { + ticker.Stop() + } + // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done. + if cc.dkg != nil { + cc.dkg = nil + } + cc.dkgRunning = false + }() + // Check if corresponding DKG signer is ready. + if _, _, err = cc.getDKGInfo(round); err == nil { + return ErrSkipButNoError + } + tickStartAt := 1 for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ { if i >= tickStartAt && ticker == nil { ticker = newTicker(cc.gov, round, TickerDKG) } - if ticker != nil { - if abortCh = cc.runTick(ticker); abortCh != nil { - return - } + if ticker != nil && cc.runTick(ticker) { + return } - switch abortCh, err = cc.dkgRunPhases[i](round, reset); err { + switch err = cc.dkgRunPhases[i](round, reset); err { case ErrSkipButNoError, nil: cc.dkg.step = i + 1 err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo()) @@ -633,7 +647,7 @@ func (cc *configurationChain) runTSig( go func() { for _, psig := range pendingPsig { if err := cc.processPartialSignature(psig); err != nil { - cc.logger.Error("failed to process partial signature", + cc.logger.Error("Failed to process partial signature", "nodeID", cc.ID, "error", err) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 83727ec58..d74a4a290 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -99,7 +99,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { } block, err := recv.consensus.proposeBlock(recv.agreementModule.agreementID()) if err != nil || block == nil { - recv.consensus.logger.Error("unable to propose block", "error", err) + recv.consensus.logger.Error("Unable to propose block", "error", err) return types.NullBlockHash } go func() { @@ -429,7 +429,7 @@ func NewConsensus( prv crypto.PrivateKey, logger common.Logger) *Consensus { return newConsensusForRound( - nil, 0, dMoment, app, gov, db, network, prv, logger, true) + nil, dMoment, app, gov, db, network, prv, logger, true) } // NewConsensusForSimulation creates an instance of Consensus for simulation, @@ -443,7 +443,7 @@ func NewConsensusForSimulation( prv crypto.PrivateKey, logger common.Logger) *Consensus { return newConsensusForRound( - nil, 0, dMoment, app, gov, db, network, prv, logger, false) + nil, dMoment, app, gov, db, network, prv, logger, false) } // NewConsensusFromSyncer constructs an Consensus instance from information @@ -457,7 +457,6 @@ func NewConsensusForSimulation( // their positions, in ascending order. func NewConsensusFromSyncer( initBlock *types.Block, - initRoundBeginHeight uint64, startWithEmpty bool, dMoment time.Time, app Application, @@ -470,8 +469,8 @@ func NewConsensusFromSyncer( cachedMessages []interface{}, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. - con := newConsensusForRound(initBlock, initRoundBeginHeight, dMoment, app, - gov, db, networkModule, prv, logger, true) + con := newConsensusForRound(initBlock, dMoment, app, gov, db, + networkModule, prv, logger, true) // Launch a dummy receiver before we start receiving from network module. con.dummyMsgBuffer = cachedMessages con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( @@ -525,7 +524,6 @@ func NewConsensusFromSyncer( // TODO(mission): remove dMoment, it's no longer one part of consensus. func newConsensusForRound( initBlock *types.Block, - initRoundBeginHeight uint64, dMoment time.Time, app Application, gov Governance, @@ -594,18 +592,19 @@ func newConsensusForRound( } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) var err error - if con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initRound, - initRoundBeginHeight, initBlockHeight, ConfigRoundShift); err != nil { + con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initRound, + initBlockHeight, ConfigRoundShift) + if err != nil { panic(err) } baConfig := agreementMgrConfig{} baConfig.from(initRound, initConfig, initCRS) - baConfig.SetRoundBeginHeight(initRoundBeginHeight) + baConfig.SetRoundBeginHeight(gov.GetRoundHeight(initRound)) con.baMgr, err = newAgreementMgr(con, initRound, baConfig) if err != nil { panic(err) } - if err = con.prepare(initRoundBeginHeight, initBlock); err != nil { + if err = con.prepare(initBlock); err != nil { panic(err) } return con @@ -615,8 +614,7 @@ func newConsensusForRound( // 'initBlock' could be either: // - nil // - the last finalized block -func (con *Consensus) prepare( - initRoundBeginHeight uint64, initBlock *types.Block) (err error) { +func (con *Consensus) prepare(initBlock *types.Block) (err error) { // Trigger the round validation method for the next round of the first // round. // The block past from full node should be delivered already or known by @@ -633,11 +631,11 @@ func (con *Consensus) prepare( // Measure time elapse for each handler of round events. elapse := func(what string, lastE utils.RoundEventParam) func() { start := time.Now() - con.logger.Info("handle round event", + con.logger.Info("Handle round event", "what", what, "event", lastE) return func() { - con.logger.Info("finish round event", + con.logger.Info("Finish round event", "what", what, "event", lastE, "elapse", time.Since(start)) @@ -647,7 +645,7 @@ func (con *Consensus) prepare( // modules see the up-to-date node set, we need to make sure this action // should be taken as the first one. con.roundEvent.Register(func(evts []utils.RoundEventParam) { - defer elapse("purge node set", evts[len(evts)-1])() + defer elapse("purge-node-set", evts[len(evts)-1])() for _, e := range evts { if e.Reset == 0 { continue @@ -659,19 +657,25 @@ func (con *Consensus) prepare( con.roundEvent.Register(func(evts []utils.RoundEventParam) { e := evts[len(evts)-1] go func() { - defer elapse("abort DKG", e)() - con.cfgModule.abortDKG(e.Round+1, e.Reset) + defer elapse("abort-DKG", e)() + if e.Reset > 0 { + aborted := con.cfgModule.abortDKG(con.ctx, e.Round+1, e.Reset-1) + con.logger.Info("DKG aborting result", + "round", e.Round+1, + "reset", e.Reset-1, + "aborted", aborted) + } }() }) // Register round event handler to update BA and BC modules. con.roundEvent.Register(func(evts []utils.RoundEventParam) { - defer elapse("append config", evts[len(evts)-1])() + defer elapse("append-config", evts[len(evts)-1])() // Always updates newer configs to the later modules first in the flow. if err := con.bcModule.notifyRoundEvents(evts); err != nil { panic(err) } // The init config is provided to baModule when construction. - if evts[len(evts)-1].BeginHeight != initRoundBeginHeight { + if evts[len(evts)-1].BeginHeight != con.gov.GetRoundHeight(initRound) { if err := con.baMgr.notifyRoundEvents(evts); err != nil { panic(err) } @@ -681,7 +685,7 @@ func (con *Consensus) prepare( // failed to setup. con.roundEvent.Register(func(evts []utils.RoundEventParam) { e := evts[len(evts)-1] - defer elapse("reset DKG", e)() + defer elapse("reset-DKG", e)() nextRound := e.Round + 1 if nextRound < DKGDelayRound { return @@ -723,10 +727,7 @@ func (con *Consensus) prepare( return } // Aborting all previous running DKG protocol instance if any. - go func() { - con.cfgModule.abortDKG(nextRound, e.Reset) - con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true) - }() + go con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true) }) }) // Register round event handler to propose new CRS. @@ -734,7 +735,7 @@ func (con *Consensus) prepare( // We don't have to propose new CRS during DKG reset, the reset of DKG // would be done by the DKG set in previous round. e := evts[len(evts)-1] - defer elapse("propose CRS", e)() + defer elapse("propose-CRS", e)() if e.Reset != 0 || e.Round < DKGDelayRound { return } @@ -761,7 +762,7 @@ func (con *Consensus) prepare( // Touch nodeSetCache for next round. con.roundEvent.Register(func(evts []utils.RoundEventParam) { e := evts[len(evts)-1] - defer elapse("touch node set cache", e)() + defer elapse("touch-NodeSetCache", e)() if e.Reset != 0 { return } @@ -790,7 +791,7 @@ func (con *Consensus) prepare( // Trigger round validation method for next period. con.roundEvent.Register(func(evts []utils.RoundEventParam) { e := evts[len(evts)-1] - defer elapse("next round", e)() + defer elapse("next-round", e)() // Register a routine to trigger round events. con.event.RegisterHeight(e.NextRoundValidationHeight(), utils.RoundEventRetryHandlerGenerator(con.roundEvent, con.event)) @@ -832,7 +833,7 @@ func (con *Consensus) prepare( "reset", e.Reset) nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger) - con.cfgModule.registerDKG(nextRound, e.Reset, + con.cfgModule.registerDKG(con.ctx, nextRound, e.Reset, utils.GetDKGThreshold(nextConfig)) con.event.RegisterHeight(e.NextDKGPreparationHeight(), func(uint64) { @@ -1249,8 +1250,8 @@ func (con *Consensus) deliveryGuard() { return case <-con.resetDeliveryGuardTicker: case <-time.After(60 * time.Second): - con.logger.Error("no blocks delivered for too long", "ID", con.ID) - panic(fmt.Errorf("no blocks delivered for too long")) + con.logger.Error("No blocks delivered for too long", "ID", con.ID) + panic(fmt.Errorf("No blocks delivered for too long")) } } } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go index 50c3a0bff..9a71bea1f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/dkg-tsig-protocol.go @@ -555,6 +555,13 @@ func (tc *TSigVerifierCache) UpdateAndGet(round uint64) ( return v, ok, nil } +// Purge the cache and returns if success. +func (tc *TSigVerifierCache) Purge(round uint64) { + tc.lock.Lock() + defer tc.lock.Unlock() + delete(tc.verifier, round) +} + // Update the cache and returns if success. func (tc *TSigVerifierCache) Update(round uint64) (bool, error) { tc.lock.Lock() diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go index 9f1abcaf5..f172b3b4c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go @@ -103,7 +103,7 @@ func (a *agreement) processBlock(b *types.Block) { func (a *agreement) processAgreementResult(r *types.AgreementResult) { // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[r.BlockHash]; exists { - a.logger.Trace("agreement result already confirmed", "result", r) + a.logger.Trace("Agreement result already confirmed", "result", r) return } if r.Position.Round > a.latestCRSRound { @@ -113,11 +113,11 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { a.pendings[r.Position.Round] = pendingsForRound } pendingsForRound[r.BlockHash] = r - a.logger.Trace("agreement result cached", "result", r) + a.logger.Trace("Agreement result cached", "result", r) return } if err := core.VerifyAgreementResult(r, a.cache); err != nil { - a.logger.Error("agreement result verification failed", + a.logger.Error("Agreement result verification failed", "result", r, "error", err) return @@ -144,12 +144,12 @@ loop: case a.pullChan <- r.BlockHash: break loop case <-a.ctx.Done(): - a.logger.Error("pull request is not sent", + a.logger.Error("Pull request is not sent", "position", &r.Position, "hash", r.BlockHash.String()[:6]) return case <-time.After(500 * time.Millisecond): - a.logger.Debug("pull request is unable to send", + a.logger.Debug("Pull request is unable to send", "position", &r.Position, "hash", r.BlockHash.String()[:6]) } @@ -171,12 +171,12 @@ func (a *agreement) processNewCRS(round uint64) { delete(a.pendings, r) for _, res := range pendingsForRound { if err := core.VerifyAgreementResult(res, a.cache); err != nil { - a.logger.Error("invalid agreement result", + a.logger.Error("Invalid agreement result", "result", res, "error", err) continue } - a.logger.Error("flush agreement result", "result", res) + a.logger.Error("Flush agreement result", "result", res) a.processAgreementResult(res) break } @@ -194,10 +194,10 @@ func (a *agreement) confirm(b *types.Block) { case a.outputChan <- b: break loop case <-a.ctx.Done(): - a.logger.Error("confirmed block is not sent", "block", b) + a.logger.Error("Confirmed block is not sent", "block", b) return case <-time.After(500 * time.Millisecond): - a.logger.Debug("agreement output channel is full", "block", b) + a.logger.Debug("Agreement output channel is full", "block", b) } } a.confirmedBlocks[b.Hash] = struct{}{} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 4fc24b407..24c781aac 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -66,8 +66,6 @@ type Consensus struct { randomnessResults map[common.Hash]*types.BlockRandomnessResult blocks types.BlocksByPosition agreementModule *agreement - configs []*types.Config - roundBeginHeights []uint64 agreementRoundCut uint64 heightEvt *common.Event roundEvt *utils.RoundEvent @@ -102,19 +100,15 @@ func NewConsensus( logger common.Logger) *Consensus { con := &Consensus{ - dMoment: dMoment, - app: app, - gov: gov, - db: db, - network: network, - nodeSetCache: utils.NewNodeSetCache(gov), - tsigVerifier: core.NewTSigVerifierCache(gov, 7), - prv: prv, - logger: logger, - configs: []*types.Config{ - utils.GetConfigWithPanic(gov, 0, logger), - }, - roundBeginHeights: []uint64{0}, + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + tsigVerifier: core.NewTSigVerifierCache(gov, 7), + prv: prv, + logger: logger, receiveChan: make(chan *types.Block, 1000), pullChan: make(chan common.Hash, 1000), randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult), @@ -153,13 +147,12 @@ func (con *Consensus) assureBuffering() { ) if height == 0 { con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger, - uint64(0), uint64(0), uint64(0), core.ConfigRoundShift) + 0, 0, core.ConfigRoundShift) } else { var b types.Block if b, err = con.db.GetBlock(blockHash); err == nil { - beginHeight := con.roundBeginHeights[b.Position.Round] con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, - con.logger, b.Position.Round, beginHeight, beginHeight, + con.logger, b.Position.Round, b.Finalization.Height, core.ConfigRoundShift) } } @@ -176,6 +169,7 @@ func (con *Consensus) assureBuffering() { continue } con.nodeSetCache.Purge(e.Round + 1) + con.tsigVerifier.Purge(e.Round + 1) } }) // Register a round event handler to notify CRS to agreementModule. @@ -197,7 +191,7 @@ func (con *Consensus) assureBuffering() { return false case <-time.After(500 * time.Millisecond): con.logger.Warn( - "agreement input channel is full when putting CRS", + "Agreement input channel is full when notifying new round", "round", e.Round, ) return true @@ -223,7 +217,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { con.lock.RLock() defer con.lock.RUnlock() defer func() { - con.logger.Debug("syncer synced status", + con.logger.Debug("Syncer synced status", "last-block", blocks[len(blocks)-1], "synced", synced, ) @@ -264,7 +258,6 @@ func (con *Consensus) ForceSync(skip bool) { panic(err) } con.logger.Info("Force Sync", "block", &block) - con.setupConfigsUntilRound(block.Position.Round + core.ConfigRoundShift - 1) con.syncedLastBlock = &block con.stopBuffering() // We might call stopBuffering without calling assureBuffering. @@ -310,20 +303,19 @@ func (con *Consensus) SyncBlocks( // tip in DB. _, tipHeight := con.db.GetCompactionChainTipInfo() if blocks[0].Finalization.Height != tipHeight+1 { - con.logger.Error("mismatched finalization height", + con.logger.Error("Mismatched finalization height", "now", blocks[0].Finalization.Height, "expected", tipHeight+1, ) err = ErrInvalidSyncingFinalizationHeight return } - con.logger.Trace("syncBlocks", + con.logger.Trace("SyncBlocks", "position", &blocks[0].Position, "final height", blocks[0].Finalization.Height, "len", len(blocks), "latest", latest, ) - con.setupConfigs(blocks) for _, b := range blocks { if err = con.db.PutBlock(*b); err != nil { // A block might be put into db when confirmed by BA, but not @@ -339,7 +331,7 @@ func (con *Consensus) SyncBlocks( b.Hash, b.Finalization.Height); err != nil { return } - go con.heightEvt.NotifyHeight(b.Finalization.Height) + con.heightEvt.NotifyHeight(b.Finalization.Height) } if latest { con.assureBuffering() @@ -377,7 +369,6 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { var err error con.syncedConsensus, err = core.NewConsensusFromSyncer( con.syncedLastBlock, - con.roundBeginHeights[con.syncedLastBlock.Position.Round], con.syncedSkipNext, con.dMoment, con.app, @@ -413,14 +404,14 @@ func (con *Consensus) stopBuffering() { return } con.duringBuffering = false - con.logger.Trace("syncer is about to stop") + con.logger.Trace("Syncer is about to stop") // Stop network and CRS routines, wait until they are all stoped. con.ctxCancel() return }() { return } - con.logger.Trace("stop syncer modules") + con.logger.Trace("Stop syncer modules") con.roundEvt.Stop() con.waitGroup.Done() // Wait for all routines depends on con.agreementModule stopped. @@ -433,9 +424,9 @@ func (con *Consensus) stopBuffering() { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) // Stop agreements. - con.logger.Trace("stop syncer agreement modules") + con.logger.Trace("Stop syncer agreement modules") con.stopAgreement() - con.logger.Trace("syncer stopped") + con.logger.Trace("Syncer stopped") return } @@ -447,48 +438,13 @@ func (con *Consensus) isEmptyBlock(b *types.Block) bool { // buildEmptyBlock builds an empty block in agreement. func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { - cfg := con.configs[b.Position.Round] + cfg := utils.GetConfigWithPanic(con.gov, b.Position.Round, con.logger) b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval) b.Witness.Height = parent.Witness.Height b.Witness.Data = make([]byte, len(parent.Witness.Data)) copy(b.Witness.Data, parent.Witness.Data) } -// setupConfigs is called by SyncBlocks with blocks from compaction chain. In -// the first time, setupConfigs setups from round 0. -func (con *Consensus) setupConfigs(blocks []*types.Block) { - // Find max round in blocks. - var maxRound uint64 - for _, b := range blocks { - if b.Position.Round > maxRound { - maxRound = b.Position.Round - } - } - // Get configs from governance. - // - // In fullnode, the notification of new round is yet another TX, which - // needs to be executed after corresponding block delivered. Thus, the - // configuration for 'maxRound + core.ConfigRoundShift' won't be ready when - // seeing this block. - con.setupConfigsUntilRound(maxRound + core.ConfigRoundShift - 1) -} - -func (con *Consensus) setupConfigsUntilRound(round uint64) { - con.lock.Lock() - defer con.lock.Unlock() - con.logger.Debug("syncer setupConfigs", - "until-round", round, - "length", len(con.configs), - ) - for r := uint64(len(con.configs)); r <= round; r++ { - cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) - con.configs = append(con.configs, cfg) - con.roundBeginHeights = append( - con.roundBeginHeights, - con.roundBeginHeights[r-1]+con.configs[r-1].RoundLength) - } -} - // startAgreement starts agreements for receiving votes and agreements. func (con *Consensus) startAgreement() { // Start a routine for listening receive channel and pull block channel. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go index 5742d113a..bd3170153 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go @@ -163,7 +163,7 @@ func VerifyAgreementResult( if err != nil { return err } - if len(res.Votes) < len(notarySet)/3*2+1 { + if len(res.Votes) < len(notarySet)*2/3+1 { return ErrNotEnoughVotes } voted := make(map[types.NodeID]struct{}, len(notarySet)) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go index ff1d91e3d..885c755f7 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go @@ -124,6 +124,9 @@ type governanceAccessor interface { // DKGResetCount returns the reset count for DKG of given round. DKGResetCount(round uint64) uint64 + + // Get the begin height of a round. + GetRoundHeight(round uint64) uint64 } // RoundEventRetryHandlerGenerator generates a handler to common.Event, which @@ -162,7 +165,7 @@ type RoundEvent struct { // NewRoundEvent creates an RoundEvent instance. func NewRoundEvent(parentCtx context.Context, gov governanceAccessor, logger common.Logger, initRound uint64, - initRoundBeginHeight, initBlockHeight uint64, + initBlockHeight uint64, roundShift uint64) (*RoundEvent, error) { // We need to generate valid ending block height of this round (taken // DKG reset count into consideration). @@ -176,12 +179,12 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor, e.ctx, e.ctxCancel = context.WithCancel(parentCtx) e.config = RoundBasedConfig{} e.config.SetupRoundBasedFields(initRound, initConfig) - e.config.SetRoundBeginHeight(initRoundBeginHeight) + e.config.SetRoundBeginHeight(gov.GetRoundHeight(initRound)) // Make sure the DKG reset count in current governance can cover the initial // block height. resetCount := gov.DKGResetCount(initRound + 1) remains := resetCount - for ; resetCount > 0 && !e.config.Contains(initBlockHeight); remains-- { + for ; remains > 0 && !e.config.Contains(initBlockHeight); remains-- { e.config.ExtendLength() } if !e.config.Contains(initBlockHeight) { @@ -272,7 +275,7 @@ func (e *RoundEvent) check(blockHeight, startRound uint64) ( } // A simple assertion to make sure we didn't pick the wrong round. if e.config.RoundID() != e.lastTriggeredRound { - panic(fmt.Errorf("triggered round not matched: %d, %d", + panic(fmt.Errorf("Triggered round not matched: %d, %d", e.config.RoundID(), e.lastTriggeredRound)) } param.Round = e.lastTriggeredRound @@ -280,7 +283,7 @@ func (e *RoundEvent) check(blockHeight, startRound uint64) ( param.BeginHeight = e.config.LastPeriodBeginHeight() param.CRS = GetCRSWithPanic(e.gov, e.lastTriggeredRound, e.logger) param.Config = GetConfigWithPanic(e.gov, e.lastTriggeredRound, e.logger) - e.logger.Info("new RoundEvent triggered", + e.logger.Info("New RoundEvent triggered", "round", e.lastTriggeredRound, "reset", e.lastTriggeredResetCount, "begin-height", e.config.LastPeriodBeginHeight(), @@ -322,7 +325,7 @@ func (e *RoundEvent) check(blockHeight, startRound uint64) ( e.gov.DKGComplaints(nextRound), GetDKGThreshold(nextCfg)); err != nil { e.logger.Debug( - "group public key setup failed, waiting for DKG reset", + "Group public key setup failed, waiting for DKG reset", "round", nextRound, "reset", e.lastTriggeredResetCount) e.dkgFailed = true -- cgit v1.2.3