aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-12-07 18:02:59 +0800
committerGitHub <noreply@github.com>2018-12-07 18:02:59 +0800
commit1b352d9e52839c8b6c316c2601d08c91c995d8f0 (patch)
tree32aff3397d61b0e6f5e2d6f407ec5ab3a69311d8 /core
parenta63e0d313300fc85eba8254963800a312fb14e9b (diff)
downloadtangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.tar
tangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.tar.gz
tangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.tar.bz2
tangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.tar.lz
tangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.tar.xz
tangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.tar.zst
tangerine-consensus-1b352d9e52839c8b6c316c2601d08c91c995d8f0.zip
core: fix bugs found when node-set is not equal to notary-set (#362)
Diffstat (limited to 'core')
-rw-r--r--core/agreement-mgr.go27
-rw-r--r--core/configuration-chain.go25
-rw-r--r--core/consensus.go83
-rw-r--r--core/consensus_test.go1
-rw-r--r--core/test/state-change-request.go8
-rw-r--r--core/test/state.go5
-rw-r--r--core/utils/nodeset-cache.go19
7 files changed, 106 insertions, 62 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index d3a0af2..c95f913 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -298,7 +298,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
// Check if this routine needs to awake in this round and prepare essential
// variables when yes.
- checkRound := func() (awake bool) {
+ checkRound := func() (isNotary, isDisabled bool) {
defer func() {
currentRound = nextRound
nextRound++
@@ -318,7 +318,8 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
roundEndTime = config.beginTime.Add(config.roundInterval)
// Check if this chain handled by this routine included in this round.
if chainID >= config.numChains {
- return false
+ isDisabled = true
+ return
}
// Check if this node in notary set of this chain in this round.
nodeSet, err := mgr.cache.GetNodeSet(nextRound)
@@ -329,7 +330,18 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
setting.notarySet = nodeSet.GetSubSet(
int(config.notarySetSize),
types.NewNotarySetTarget(config.crs, chainID))
- _, awake = setting.notarySet[mgr.ID]
+ _, isNotary = setting.notarySet[mgr.ID]
+ if isNotary {
+ mgr.logger.Info("selected as notary set",
+ "ID", mgr.ID,
+ "round", nextRound,
+ "chainID", chainID)
+ } else {
+ mgr.logger.Info("not selected as notary set",
+ "ID", mgr.ID,
+ "round", nextRound,
+ "chainID", chainID)
+ }
// Setup ticker
if tickDuration != config.lambdaBA {
if setting.ticker != nil {
@@ -348,12 +360,9 @@ Loop:
default:
}
now := time.Now().UTC()
- if !checkRound() {
- if now.After(roundEndTime) {
- // That round is passed.
- continue Loop
- }
- // Sleep until next checkpoint.
+ var isDisabled bool
+ setting.recv.isNotary, isDisabled = checkRound()
+ if isDisabled {
select {
case <-mgr.ctx.Done():
break Loop
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index fdfcd13..364f2c7 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -216,6 +216,18 @@ func (cc *configurationChain) runDKG(round uint64) error {
return nil
}
+func (cc *configurationChain) isDKGReady(round uint64) bool {
+ if !cc.gov.IsDKGFinal(round) {
+ return false
+ }
+ return func() bool {
+ cc.dkgResult.RLock()
+ defer cc.dkgResult.RUnlock()
+ _, exist := cc.gpk[round]
+ return exist
+ }()
+}
+
func (cc *configurationChain) preparePartialSignature(
round uint64, hash common.Hash) (*typesDKG.PartialSignature, error) {
signer, exist := func() (*dkgShareSecret, bool) {
@@ -305,19 +317,6 @@ func (cc *configurationChain) runTSig(
return signature, nil
}
-func (cc *configurationChain) runBlockTSig(
- round uint64, hash common.Hash) (crypto.Signature, error) {
- sig, err := cc.runTSig(round, hash)
- if err != nil {
- return crypto.Signature{}, err
- }
- cc.logger.Info("Block TSIG",
- "nodeID", cc.ID,
- "round", round,
- "signature", sig)
- return sig, nil
-}
-
func (cc *configurationChain) runCRSTSig(
round uint64, crs common.Hash) ([]byte, error) {
sig, err := cc.runTSig(round, crs)
diff --git a/core/consensus.go b/core/consensus.go
index 6ca54e0..11df5d4 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -70,17 +70,23 @@ type consensusBAReceiver struct {
chainID uint32
changeNotaryTime time.Time
round uint64
+ isNotary bool
restartNotary chan bool
}
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
+ if !recv.isNotary {
+ return
+ }
if err := recv.agreementModule.prepareVote(vote); err != nil {
recv.consensus.logger.Error("Failed to prepare vote", "error", err)
return
}
go func() {
if err := recv.agreementModule.processVote(vote); err != nil {
- recv.consensus.logger.Error("Failed to process vote", "error", err)
+ recv.consensus.logger.Error("Failed to process self vote",
+ "error", err,
+ "vote", vote)
return
}
recv.consensus.logger.Debug("Calling Network.BroadcastVote",
@@ -90,6 +96,9 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
}
func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
+ if !recv.isNotary {
+ return common.Hash{}
+ }
block := recv.consensus.proposeBlock(recv.chainID, recv.round)
if block == nil {
recv.consensus.logger.Error("unable to propose block")
@@ -123,7 +132,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block, exist = recv.agreementModule.findCandidateBlockNoLock(hash)
if !exist {
recv.consensus.logger.Error("Unknown block confirmed",
- "hash", hash,
+ "hash", hash.String()[:6],
"chainID", recv.chainID)
ch := make(chan *types.Block)
func() {
@@ -135,7 +144,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
go func() {
block = <-ch
recv.consensus.logger.Info("Receive unknown block",
- "hash", hash,
+ "hash", hash.String()[:6],
"chainID", recv.chainID)
recv.agreementModule.addCandidateBlock(block)
recv.agreementModule.lock.Lock()
@@ -179,11 +188,13 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
recv.consensus.logger.Info("Receive parent block",
- "hash", block.ParentHash,
+ "hash", block.ParentHash.String()[:6],
"chainID", recv.chainID)
recv.consensus.ccModule.registerBlock(block)
if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block", "error", err)
+ recv.consensus.logger.Error("Failed to process block",
+ "block", block,
+ "error", err)
return
}
parentHash = block.ParentHash
@@ -194,24 +205,28 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}(block.ParentHash)
}
- voteList := make([]types.Vote, 0, len(votes))
- for _, vote := range votes {
- if vote.BlockHash != hash {
- continue
+ if recv.isNotary {
+ voteList := make([]types.Vote, 0, len(votes))
+ for _, vote := range votes {
+ if vote.BlockHash != hash {
+ continue
+ }
+ voteList = append(voteList, *vote)
}
- voteList = append(voteList, *vote)
- }
- result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
- IsEmptyBlock: isEmptyBlockConfirmed,
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ }
+ recv.consensus.logger.Debug("Propose AgreementResult",
+ "result", result)
+ recv.consensus.network.BroadcastAgreementResult(result)
}
- recv.consensus.logger.Debug("Propose AgreementResult",
- "result", result)
- recv.consensus.network.BroadcastAgreementResult(result)
if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block", "error", err)
+ recv.consensus.logger.Error("Failed to process block",
+ "block", block,
+ "error", err)
return
}
// Clean the restartNotary channel so BA will not stuck by deadlock.
@@ -232,6 +247,9 @@ CleanChannelLoop:
}
func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
+ if !recv.isNotary {
+ return
+ }
recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes)
recv.consensus.network.PullBlocks(hashes)
}
@@ -312,7 +330,7 @@ func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint(
// ProposeDKGFinalize propose a DKGFinalize message.
func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) {
if err := recv.authModule.SignDKGFinalize(final); err != nil {
- recv.logger.Error("Faield to sign DKG finalize", "error", err)
+ recv.logger.Error("Failed to sign DKG finalize", "error", err)
return
}
recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final)
@@ -618,13 +636,11 @@ func (con *Consensus) runCRS(round uint64) {
}
con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
"round", round)
- for !con.gov.IsDKGFinal(round) {
+ for !con.cfgModule.isDKGReady(round) {
con.logger.Debug("DKG is not ready for running CRS. Retry later...",
"round", round)
time.Sleep(500 * time.Millisecond)
}
- // Wait some time for DKG to recover private share.
- time.Sleep(100 * time.Millisecond)
// Start running next round CRS.
con.logger.Debug("Calling Governance.CRS", "round", round)
psig, err := con.cfgModule.preparePartialSignature(round, con.gov.CRS(round))
@@ -810,7 +826,8 @@ MessageLoop:
case *types.Vote:
if err := con.ProcessVote(val); err != nil {
con.logger.Error("Failed to process vote",
- "error", err)
+ "error", err,
+ "vote", val)
}
case *types.AgreementResult:
if err := con.ProcessAgreementResult(val); err != nil {
@@ -889,9 +906,7 @@ func (con *Consensus) ProcessAgreementResult(
if rand.Position.Round == 0 {
return nil
}
- if !con.ccModule.blockRegistered(rand.BlockHash) {
- return nil
- }
+ // TODO(mission): find a way to avoid spamming by older agreement results.
// Sanity check done.
if !con.cfgModule.touchTSigHash(rand.BlockHash) {
return nil
@@ -919,13 +934,16 @@ func (con *Consensus) ProcessAgreementResult(
con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
"proposer", psig.ProposerID,
"round", psig.Round,
- "hash", psig.Hash)
+ "hash", psig.Hash.String()[:6])
con.network.BroadcastDKGPartialSignature(psig)
go func() {
tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
if err != nil {
if err != ErrTSigAlreadyRunning {
- con.logger.Error("Faield to run TSIG", "error", err)
+ con.logger.Error("Failed to run TSIG",
+ "position", &rand.Position,
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
}
return
}
@@ -952,11 +970,12 @@ func (con *Consensus) ProcessBlockRandomnessResult(
if err := con.ccModule.processBlockRandomnessResult(rand); err != nil {
if err == ErrBlockNotRegistered {
err = nil
+ } else {
+ return err
}
- return err
}
con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "hash", rand.BlockHash,
+ "hash", rand.BlockHash.String()[:6],
"position", &rand.Position,
"randomness", hex.EncodeToString(rand.Randomness))
con.network.BroadcastRandomnessResult(rand)
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 5819cc2..9234512 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -196,6 +196,7 @@ func (s *ConsensusTestSuite) prepareAgreementMgrWithoutRunning(
con.baMgr.appendConfig(0, &types.Config{
NumChains: numChains,
RoundInterval: time.Hour,
+ LambdaBA: 50 * time.Millisecond,
}, common.NewRandomHash())
}
diff --git a/core/test/state-change-request.go b/core/test/state-change-request.go
index 84d4d3f..1515fd2 100644
--- a/core/test/state-change-request.go
+++ b/core/test/state-change-request.go
@@ -185,7 +185,11 @@ func (req *StateChangeRequest) String() (ret string) {
ret += fmt.Sprintf(
"{Type:AddNode %s",
types.NewNodeID(req.Payload.(crypto.PublicKey)).String()[:6])
+ default:
+ panic(fmt.Errorf(
+ "attempting to dump unknown type of state change request: %v",
+ req.Type))
}
- panic(fmt.Errorf(
- "attempting to dump unknown type of state change request: %v", req.Type))
+ ret += "}"
+ return
}
diff --git a/core/test/state.go b/core/test/state.go
index b360fa2..30ed8af 100644
--- a/core/test/state.go
+++ b/core/test/state.go
@@ -177,6 +177,11 @@ func (s *State) Snapshot() (*types.Config, []crypto.PublicKey) {
}, nodes
}
+// AttachLogger allows to attach custom logger.
+func (s *State) AttachLogger(logger common.Logger) {
+ s.logger = logger
+}
+
func (s *State) unpackPayload(
raw *rawStateChangeRequest) (v interface{}, err error) {
switch raw.Type {
diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go
index a8f8fe5..6d4f7b0 100644
--- a/core/utils/nodeset-cache.go
+++ b/core/utils/nodeset-cache.go
@@ -27,8 +27,12 @@ import (
)
var (
- // ErrRoundNotReady means we got nil config.
- ErrRoundNotReady = errors.New("round is not ready")
+ // ErrNodeSetNotReady means we got nil empty node set.
+ ErrNodeSetNotReady = errors.New("node set is not ready")
+ // ErrCRSNotReady means we got empty CRS.
+ ErrCRSNotReady = errors.New("crs is not ready")
+ // ErrConfigurationNotReady means we go nil configuration.
+ ErrConfigurationNotReady = errors.New("configuration is not ready")
// ErrInvalidChainID means the chain ID is unexpected.
ErrInvalidChainID = errors.New("invalid chain id")
)
@@ -172,16 +176,15 @@ func (cache *NodeSetCache) update(
cache.lock.Lock()
defer cache.lock.Unlock()
- // Get the requested round.
+ // Get information for the requested round.
keySet := cache.nsIntf.NodeSet(round)
if keySet == nil {
- // That round is not ready yet.
- err = ErrRoundNotReady
+ err = ErrNodeSetNotReady
return
}
crs := cache.nsIntf.CRS(round)
if (crs == common.Hash{}) {
- err = ErrRoundNotReady
+ err = ErrCRSNotReady
return
}
// Cache new round.
@@ -199,6 +202,10 @@ func (cache *NodeSetCache) update(
}
}
cfg := cache.nsIntf.Configuration(round)
+ if cfg == nil {
+ err = ErrConfigurationNotReady
+ return
+ }
nIDs = &sets{
nodeSet: nodeSet,
notarySet: make([]map[types.NodeID]struct{}, cfg.NumChains),