From 1b352d9e52839c8b6c316c2601d08c91c995d8f0 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Fri, 7 Dec 2018 18:02:59 +0800 Subject: core: fix bugs found when node-set is not equal to notary-set (#362) --- core/agreement-mgr.go | 27 ++++++++----- core/configuration-chain.go | 25 ++++++------ core/consensus.go | 83 ++++++++++++++++++++++++--------------- core/consensus_test.go | 1 + core/test/state-change-request.go | 8 +++- core/test/state.go | 5 +++ core/utils/nodeset-cache.go | 19 ++++++--- 7 files changed, 106 insertions(+), 62 deletions(-) diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index d3a0af2..c95f913 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -298,7 +298,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Check if this routine needs to awake in this round and prepare essential // variables when yes. - checkRound := func() (awake bool) { + checkRound := func() (isNotary, isDisabled bool) { defer func() { currentRound = nextRound nextRound++ @@ -318,7 +318,8 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { roundEndTime = config.beginTime.Add(config.roundInterval) // Check if this chain handled by this routine included in this round. if chainID >= config.numChains { - return false + isDisabled = true + return } // Check if this node in notary set of this chain in this round. nodeSet, err := mgr.cache.GetNodeSet(nextRound) @@ -329,7 +330,18 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { setting.notarySet = nodeSet.GetSubSet( int(config.notarySetSize), types.NewNotarySetTarget(config.crs, chainID)) - _, awake = setting.notarySet[mgr.ID] + _, isNotary = setting.notarySet[mgr.ID] + if isNotary { + mgr.logger.Info("selected as notary set", + "ID", mgr.ID, + "round", nextRound, + "chainID", chainID) + } else { + mgr.logger.Info("not selected as notary set", + "ID", mgr.ID, + "round", nextRound, + "chainID", chainID) + } // Setup ticker if tickDuration != config.lambdaBA { if setting.ticker != nil { @@ -348,12 +360,9 @@ Loop: default: } now := time.Now().UTC() - if !checkRound() { - if now.After(roundEndTime) { - // That round is passed. - continue Loop - } - // Sleep until next checkpoint. + var isDisabled bool + setting.recv.isNotary, isDisabled = checkRound() + if isDisabled { select { case <-mgr.ctx.Done(): break Loop diff --git a/core/configuration-chain.go b/core/configuration-chain.go index fdfcd13..364f2c7 100644 --- a/core/configuration-chain.go +++ b/core/configuration-chain.go @@ -216,6 +216,18 @@ func (cc *configurationChain) runDKG(round uint64) error { return nil } +func (cc *configurationChain) isDKGReady(round uint64) bool { + if !cc.gov.IsDKGFinal(round) { + return false + } + return func() bool { + cc.dkgResult.RLock() + defer cc.dkgResult.RUnlock() + _, exist := cc.gpk[round] + return exist + }() +} + func (cc *configurationChain) preparePartialSignature( round uint64, hash common.Hash) (*typesDKG.PartialSignature, error) { signer, exist := func() (*dkgShareSecret, bool) { @@ -305,19 +317,6 @@ func (cc *configurationChain) runTSig( return signature, nil } -func (cc *configurationChain) runBlockTSig( - round uint64, hash common.Hash) (crypto.Signature, error) { - sig, err := cc.runTSig(round, hash) - if err != nil { - return crypto.Signature{}, err - } - cc.logger.Info("Block TSIG", - "nodeID", cc.ID, - "round", round, - "signature", sig) - return sig, nil -} - func (cc *configurationChain) runCRSTSig( round uint64, crs common.Hash) ([]byte, error) { sig, err := cc.runTSig(round, crs) diff --git a/core/consensus.go b/core/consensus.go index 6ca54e0..11df5d4 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -70,17 +70,23 @@ type consensusBAReceiver struct { chainID uint32 changeNotaryTime time.Time round uint64 + isNotary bool restartNotary chan bool } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { + if !recv.isNotary { + return + } if err := recv.agreementModule.prepareVote(vote); err != nil { recv.consensus.logger.Error("Failed to prepare vote", "error", err) return } go func() { if err := recv.agreementModule.processVote(vote); err != nil { - recv.consensus.logger.Error("Failed to process vote", "error", err) + recv.consensus.logger.Error("Failed to process self vote", + "error", err, + "vote", vote) return } recv.consensus.logger.Debug("Calling Network.BroadcastVote", @@ -90,6 +96,9 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { } func (recv *consensusBAReceiver) ProposeBlock() common.Hash { + if !recv.isNotary { + return common.Hash{} + } block := recv.consensus.proposeBlock(recv.chainID, recv.round) if block == nil { recv.consensus.logger.Error("unable to propose block") @@ -123,7 +132,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block, exist = recv.agreementModule.findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", - "hash", hash, + "hash", hash.String()[:6], "chainID", recv.chainID) ch := make(chan *types.Block) func() { @@ -135,7 +144,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( go func() { block = <-ch recv.consensus.logger.Info("Receive unknown block", - "hash", hash, + "hash", hash.String()[:6], "chainID", recv.chainID) recv.agreementModule.addCandidateBlock(block) recv.agreementModule.lock.Lock() @@ -179,11 +188,13 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.logger.Info("Receive parent block", - "hash", block.ParentHash, + "hash", block.ParentHash.String()[:6], "chainID", recv.chainID) recv.consensus.ccModule.registerBlock(block) if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) + recv.consensus.logger.Error("Failed to process block", + "block", block, + "error", err) return } parentHash = block.ParentHash @@ -194,24 +205,28 @@ func (recv *consensusBAReceiver) ConfirmBlock( } }(block.ParentHash) } - voteList := make([]types.Vote, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue + if recv.isNotary { + voteList := make([]types.Vote, 0, len(votes)) + for _, vote := range votes { + if vote.BlockHash != hash { + continue + } + voteList = append(voteList, *vote) } - voteList = append(voteList, *vote) - } - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - IsEmptyBlock: isEmptyBlockConfirmed, + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + IsEmptyBlock: isEmptyBlockConfirmed, + } + recv.consensus.logger.Debug("Propose AgreementResult", + "result", result) + recv.consensus.network.BroadcastAgreementResult(result) } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.network.BroadcastAgreementResult(result) if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) + recv.consensus.logger.Error("Failed to process block", + "block", block, + "error", err) return } // Clean the restartNotary channel so BA will not stuck by deadlock. @@ -232,6 +247,9 @@ CleanChannelLoop: } func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { + if !recv.isNotary { + return + } recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes) recv.consensus.network.PullBlocks(hashes) } @@ -312,7 +330,7 @@ func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint( // ProposeDKGFinalize propose a DKGFinalize message. func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) { if err := recv.authModule.SignDKGFinalize(final); err != nil { - recv.logger.Error("Faield to sign DKG finalize", "error", err) + recv.logger.Error("Failed to sign DKG finalize", "error", err) return } recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final) @@ -618,13 +636,11 @@ func (con *Consensus) runCRS(round uint64) { } con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", "round", round) - for !con.gov.IsDKGFinal(round) { + for !con.cfgModule.isDKGReady(round) { con.logger.Debug("DKG is not ready for running CRS. Retry later...", "round", round) time.Sleep(500 * time.Millisecond) } - // Wait some time for DKG to recover private share. - time.Sleep(100 * time.Millisecond) // Start running next round CRS. con.logger.Debug("Calling Governance.CRS", "round", round) psig, err := con.cfgModule.preparePartialSignature(round, con.gov.CRS(round)) @@ -810,7 +826,8 @@ MessageLoop: case *types.Vote: if err := con.ProcessVote(val); err != nil { con.logger.Error("Failed to process vote", - "error", err) + "error", err, + "vote", val) } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { @@ -889,9 +906,7 @@ func (con *Consensus) ProcessAgreementResult( if rand.Position.Round == 0 { return nil } - if !con.ccModule.blockRegistered(rand.BlockHash) { - return nil - } + // TODO(mission): find a way to avoid spamming by older agreement results. // Sanity check done. if !con.cfgModule.touchTSigHash(rand.BlockHash) { return nil @@ -919,13 +934,16 @@ func (con *Consensus) ProcessAgreementResult( con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", "proposer", psig.ProposerID, "round", psig.Round, - "hash", psig.Hash) + "hash", psig.Hash.String()[:6]) con.network.BroadcastDKGPartialSignature(psig) go func() { tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) if err != nil { if err != ErrTSigAlreadyRunning { - con.logger.Error("Faield to run TSIG", "error", err) + con.logger.Error("Failed to run TSIG", + "position", &rand.Position, + "hash", rand.BlockHash.String()[:6], + "error", err) } return } @@ -952,11 +970,12 @@ func (con *Consensus) ProcessBlockRandomnessResult( if err := con.ccModule.processBlockRandomnessResult(rand); err != nil { if err == ErrBlockNotRegistered { err = nil + } else { + return err } - return err } con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash, + "hash", rand.BlockHash.String()[:6], "position", &rand.Position, "randomness", hex.EncodeToString(rand.Randomness)) con.network.BroadcastRandomnessResult(rand) diff --git a/core/consensus_test.go b/core/consensus_test.go index 5819cc2..9234512 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -196,6 +196,7 @@ func (s *ConsensusTestSuite) prepareAgreementMgrWithoutRunning( con.baMgr.appendConfig(0, &types.Config{ NumChains: numChains, RoundInterval: time.Hour, + LambdaBA: 50 * time.Millisecond, }, common.NewRandomHash()) } diff --git a/core/test/state-change-request.go b/core/test/state-change-request.go index 84d4d3f..1515fd2 100644 --- a/core/test/state-change-request.go +++ b/core/test/state-change-request.go @@ -185,7 +185,11 @@ func (req *StateChangeRequest) String() (ret string) { ret += fmt.Sprintf( "{Type:AddNode %s", types.NewNodeID(req.Payload.(crypto.PublicKey)).String()[:6]) + default: + panic(fmt.Errorf( + "attempting to dump unknown type of state change request: %v", + req.Type)) } - panic(fmt.Errorf( - "attempting to dump unknown type of state change request: %v", req.Type)) + ret += "}" + return } diff --git a/core/test/state.go b/core/test/state.go index b360fa2..30ed8af 100644 --- a/core/test/state.go +++ b/core/test/state.go @@ -177,6 +177,11 @@ func (s *State) Snapshot() (*types.Config, []crypto.PublicKey) { }, nodes } +// AttachLogger allows to attach custom logger. +func (s *State) AttachLogger(logger common.Logger) { + s.logger = logger +} + func (s *State) unpackPayload( raw *rawStateChangeRequest) (v interface{}, err error) { switch raw.Type { diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go index a8f8fe5..6d4f7b0 100644 --- a/core/utils/nodeset-cache.go +++ b/core/utils/nodeset-cache.go @@ -27,8 +27,12 @@ import ( ) var ( - // ErrRoundNotReady means we got nil config. - ErrRoundNotReady = errors.New("round is not ready") + // ErrNodeSetNotReady means we got nil empty node set. + ErrNodeSetNotReady = errors.New("node set is not ready") + // ErrCRSNotReady means we got empty CRS. + ErrCRSNotReady = errors.New("crs is not ready") + // ErrConfigurationNotReady means we go nil configuration. + ErrConfigurationNotReady = errors.New("configuration is not ready") // ErrInvalidChainID means the chain ID is unexpected. ErrInvalidChainID = errors.New("invalid chain id") ) @@ -172,16 +176,15 @@ func (cache *NodeSetCache) update( cache.lock.Lock() defer cache.lock.Unlock() - // Get the requested round. + // Get information for the requested round. keySet := cache.nsIntf.NodeSet(round) if keySet == nil { - // That round is not ready yet. - err = ErrRoundNotReady + err = ErrNodeSetNotReady return } crs := cache.nsIntf.CRS(round) if (crs == common.Hash{}) { - err = ErrRoundNotReady + err = ErrCRSNotReady return } // Cache new round. @@ -199,6 +202,10 @@ func (cache *NodeSetCache) update( } } cfg := cache.nsIntf.Configuration(round) + if cfg == nil { + err = ErrConfigurationNotReady + return + } nIDs = &sets{ nodeSet: nodeSet, notarySet: make([]map[types.NodeID]struct{}, cfg.NumChains), -- cgit v1.2.3