author | Mission Liao <mission.liao@dexon.org> | 2019-03-20 17:29:22 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-20 17:29:22 +0800 |
commit | c852eda00f781abafaab2b41d2c1a85fe9d3177f (patch) | |
tree | 931680bf76590c4bdb74c247582f213c17db9274 | |
parent | 448935829700500ecf512b9e0a6437cbb63504b3 (diff) | |
core: reset DKG (#502)
* Allow utils.NodeSetCache to be purged by round.
* Purge utils.NodeSetCache when DKG is reset.
* Add a utils.RoundEvent handler to abort any previously
  running DKG (see the sketch after this message).
* Fix test.App hanging in BlockDelivered when utils.RoundEvent
  is attached: ValidateNextRound is a blocking call and would
  block test.App.BlockDelivered.
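The cache-purge and DKG-abort handlers registered in core/consensus.go follow the pattern below (a minimal sketch lifted from the diff further down; `con` names the Consensus instance and the elapsed-time logging wrapper is omitted for brevity):

```go
// Purge cached node sets first, so every later handler sees an
// up-to-date node set. Handlers registered earlier run first.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
	for _, e := range evts {
		if e.Reset == 0 {
			continue
		}
		con.nodeSetCache.Purge(e.Round + 1)
	}
})
// Then abort any DKG protocol instance still running for the
// next round before newer configs are applied.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
	e := evts[len(evts)-1]
	con.cfgModule.abortDKG(e.Round+1, e.Reset)
})
```

The test.App fix is simply `go app.hEvt.NotifyHeight(result.Height)`: notifying height events from a separate goroutine keeps a blocking ValidateNextRound from stalling BlockDelivered.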
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | GNUmakefile | 4 |
| -rw-r--r-- | core/consensus.go | 120 |
| -rw-r--r-- | core/consensus_test.go | 2 |
| -rw-r--r-- | core/syncer/consensus.go | 10 |
| -rw-r--r-- | core/test/app.go | 2 |
| -rw-r--r-- | core/test/app_test.go | 16 |
| -rw-r--r-- | core/test/governance.go | 29 |
| -rw-r--r-- | core/test/governance_test.go | 7 |
| -rw-r--r-- | core/test/network.go | 9 |
| -rw-r--r-- | core/test/network_test.go | 2 |
| -rw-r--r-- | core/test/state.go | 11 |
| -rw-r--r-- | core/test/state_test.go | 8 |
| -rw-r--r-- | core/utils/crypto.go | 9 |
| -rw-r--r-- | core/utils/nodeset-cache.go | 20 |
| -rw-r--r-- | core/utils/nodeset-cache_test.go | 18 |
| -rw-r--r-- | core/utils/round-based-config.go | 2 |
| -rw-r--r-- | core/utils/round-event.go | 9 |
| -rw-r--r-- | integration_test/byzantine_test.go | 15 |
| -rw-r--r-- | integration_test/consensus_test.go | 181 |

19 files changed, 395 insertions(+), 79 deletions(-)
diff --git a/GNUmakefile b/GNUmakefile index 425a569..bbf14bd 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -44,11 +44,11 @@ else endif endef -GO_TEST_TIMEOUT := 20m +GO_TEST_TIMEOUT := 33m TEST_TARGET := go list ./... | grep -v 'vendor' ifeq ($(NO_INTEGRATION_TEST), true) - GO_TEST_TIMEOUT := 15m + GO_TEST_TIMEOUT := 25m TEST_TARGET := $(TEST_TARGET) | grep -v 'integration_test' else ifeq ($(ONLY_INTEGRATION_TEST), true) TEST_TARGET := $(TEST_TARGET) | grep 'integration_test' diff --git a/core/consensus.go b/core/consensus.go index c75f542..8f8002b 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -630,8 +630,40 @@ func (con *Consensus) prepare( panic("not implemented yet") } } + // Measure time elapse for each handler of round events. + elapse := func(what string, lastE utils.RoundEventParam) func() { + start := time.Now() + con.logger.Info("handle round event", + "what", what, + "event", lastE) + return func() { + con.logger.Info("finish round event", + "what", what, + "event", lastE, + "elapse", time.Since(start)) + } + } + // Register round event handler to purge cached node set. To make sure each + // modules see the up-to-date node set, we need to make sure this action + // should be taken as the first one. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + defer elapse("purge node set", evts[len(evts)-1])() + for _, e := range evts { + if e.Reset == 0 { + continue + } + con.nodeSetCache.Purge(e.Round + 1) + } + }) + // Register round event handler to abort previous running DKG if any. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + defer elapse("abort DKG", e)() + con.cfgModule.abortDKG(e.Round+1, e.Reset) + }) // Register round event handler to update BA and BC modules. con.roundEvent.Register(func(evts []utils.RoundEventParam) { + defer elapse("append config", evts[len(evts)-1])() // Always updates newer configs to the later modules first in the flow. if err := con.bcModule.notifyRoundEvents(evts); err != nil { panic(err) @@ -643,11 +675,62 @@ func (con *Consensus) prepare( } } }) + // Register round event handler to reset DKG if the DKG set for next round + // failed to setup. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + defer elapse("reset DKG", e)() + nextRound := e.Round + 1 + if nextRound < DKGDelayRound { + return + } + curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round) + if err != nil { + con.logger.Error("Error getting DKG set when proposing CRS", + "round", e.Round, + "error", err) + return + } + if _, exist := curDKGSet[con.ID]; !exist { + return + } + isDKGValid := func() bool { + nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, + con.logger) + if !con.gov.IsDKGFinal(nextRound) { + con.logger.Error("Next DKG is not final, reset it", + "round", e.Round, + "reset", e.Reset) + return false + } + if _, err := typesDKG.NewGroupPublicKey( + nextRound, + con.gov.DKGMasterPublicKeys(nextRound), + con.gov.DKGComplaints(nextRound), + utils.GetDKGThreshold(nextConfig)); err != nil { + con.logger.Error("Next DKG failed to prepare, reset it", + "round", e.Round, + "reset", e.Reset, + "error", err) + return false + } + return true + } + con.event.RegisterHeight(e.NextDKGResetHeight(), func(uint64) { + if isDKGValid() { + return + } + // Aborting all previous running DKG protocol instance if any. 
+ con.cfgModule.abortDKG(nextRound, e.Reset) + con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true) + }) + }) // Register round event handler to propose new CRS. con.roundEvent.Register(func(evts []utils.RoundEventParam) { // We don't have to propose new CRS during DKG reset, the reset of DKG // would be done by the DKG set in previous round. e := evts[len(evts)-1] + defer elapse("propose CRS", e)() if e.Reset != 0 || e.Round < DKGDelayRound { return } @@ -667,13 +750,14 @@ func (con *Consensus) prepare( con.logger.Debug("CRS already proposed", "round", e.Round+1) return } - con.runCRS(e.Round, e.CRS) + con.runCRS(e.Round, e.CRS, false) }) } }) // Touch nodeSetCache for next round. con.roundEvent.Register(func(evts []utils.RoundEventParam) { e := evts[len(evts)-1] + defer elapse("touch node set cache", e)() if e.Reset != 0 { return } @@ -702,6 +786,7 @@ func (con *Consensus) prepare( // Trigger round validation method for next period. con.roundEvent.Register(func(evts []utils.RoundEventParam) { e := evts[len(evts)-1] + defer elapse("next round", e)() // Register a routine to trigger round events. con.event.RegisterHeight(e.NextRoundValidationHeight(), func( blockHeight uint64) { @@ -711,7 +796,9 @@ func (con *Consensus) prepare( con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) { nextRound := e.Round + 1 if nextRound < DKGDelayRound { - con.logger.Info("Skip runDKG for round", "round", nextRound) + con.logger.Info("Skip runDKG for round", + "round", nextRound, + "reset", e.Reset) return } // Normally, gov.CRS would return non-nil. Use this for in case of @@ -719,21 +806,27 @@ func (con *Consensus) prepare( if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { con.logger.Debug("unable to prepare CRS for DKG set", - "round", nextRound) + "round", nextRound, + "reset", e.Reset) return } nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) if err != nil { con.logger.Error("Error getting DKG set for next round", "round", nextRound, + "reset", e.Reset, "error", err) return } if _, exist := nextDkgSet[con.ID]; !exist { - con.logger.Info("Not selected as DKG set", "round", nextRound) + con.logger.Info("Not selected as DKG set", + "round", nextRound, + "reset", e.Reset) return } - con.logger.Info("Selected as DKG set", "round", nextRound) + con.logger.Info("Selected as DKG set", + "round", nextRound, + "reset", e.Reset) nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger) con.cfgModule.registerDKG(nextRound, e.Reset, utils.GetDKGThreshold( @@ -821,7 +914,7 @@ func (con *Consensus) runDKG(round, reset uint64, config *types.Config) { }() } -func (con *Consensus) runCRS(round uint64, hash common.Hash) { +func (con *Consensus) runCRS(round uint64, hash common.Hash, reset bool) { // Start running next round CRS. 
psig, err := con.cfgModule.preparePartialSignature(round, hash) if err != nil { @@ -841,10 +934,17 @@ func (con *Consensus) runCRS(round uint64, hash common.Hash) { if err != nil { con.logger.Error("Failed to run CRS Tsig", "error", err) } else { - con.logger.Debug("Calling Governance.ProposeCRS", - "round", round+1, - "crs", hex.EncodeToString(crs)) - con.gov.ProposeCRS(round+1, crs) + if reset { + con.logger.Debug("Calling Governance.ResetDKG", + "round", round+1, + "crs", hex.EncodeToString(crs)) + con.gov.ResetDKG(crs) + } else { + con.logger.Debug("Calling Governance.ProposeCRS", + "round", round+1, + "crs", hex.EncodeToString(crs)) + con.gov.ProposeCRS(round+1, crs) + } } } } diff --git a/core/consensus_test.go b/core/consensus_test.go index 541f157..e60a173 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -285,7 +285,7 @@ func (s *ConsensusTestSuite) TestDKGCRS() { crsFinish := make(chan struct{}) for _, con := range cons { go func(con *Consensus) { - con.runCRS(0, gov.CRS(0)) + con.runCRS(0, gov.CRS(0), false) crsFinish <- struct{}{} }(con) } diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 8d89d07..2eeee9d 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -168,6 +168,16 @@ func (con *Consensus) assureBuffering() { } // Make sure con.roundEvt stopped before stopping con.agreementModule. con.waitGroup.Add(1) + // Register a round event handler to reset node set cache, this handler + // should be the highest priority. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + for _, e := range evts { + if e.Reset == 0 { + continue + } + con.nodeSetCache.Purge(e.Round + 1) + } + }) // Register a round event handler to notify CRS to agreementModule. con.roundEvt.Register(func(evts []utils.RoundEventParam) { con.waitGroup.Add(1) diff --git a/core/test/app.go b/core/test/app.go index df58135..12b2047 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -265,7 +265,7 @@ func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position, } } }() - app.hEvt.NotifyHeight(result.Height) + go app.hEvt.NotifyHeight(result.Height) } // GetLatestDeliveredPosition would return the latest position of delivered diff --git a/core/test/app_test.go b/core/test/app_test.go index c83aaf6..138f803 100644 --- a/core/test/app_test.go +++ b/core/test/app_test.go @@ -309,15 +309,15 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() { 1900, 2019, core.ConfigRoundShift) s.Require().NoError(err) // Register a handler to collects triggered events. - var evts []evtParamToCheck + evts := make(chan evtParamToCheck, 2) rEvt.Register(func(params []utils.RoundEventParam) { for _, p := range params { - evts = append(evts, evtParamToCheck{ + evts <- evtParamToCheck{ round: p.Round, reset: p.Reset, height: p.BeginHeight, crs: p.CRS, - }) + } } }) // Setup App instance. @@ -336,18 +336,16 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() { // Deliver blocks from height=2020 to height=2081. deliver(0, 0, 2019) deliver(19, 2020, 2091) - s.Require().Len(evts, 2) - s.Require().Equal(evts[0], evtParamToCheck{19, 2, 2100, gov.CRS(19)}) - s.Require().Equal(evts[1], evtParamToCheck{20, 0, 2200, gov.CRS(20)}) + s.Require().Equal(<-evts, evtParamToCheck{19, 2, 2100, gov.CRS(19)}) + s.Require().Equal(<-evts, evtParamToCheck{20, 0, 2200, gov.CRS(20)}) // Deliver blocks from height=2082 to height=2281. 
deliver(19, 2092, 2199) deliver(20, 2200, 2291) - s.Require().Len(evts, 3) - s.Require().Equal(evts[2], evtParamToCheck{21, 0, 2300, gov.CRS(21)}) + s.Require().Equal(<-evts, evtParamToCheck{21, 0, 2300, gov.CRS(21)}) // Deliver blocks from height=2282 to height=2381. deliver(20, 2292, 2299) deliver(21, 2300, 2391) - s.Require().Equal(evts[3], evtParamToCheck{22, 0, 2400, gov.CRS(22)}) + s.Require().Equal(<-evts, evtParamToCheck{22, 0, 2400, gov.CRS(22)}) } func TestApp(t *testing.T) { diff --git a/core/test/governance.go b/core/test/governance.go index 62bee70..9fe525b 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -154,7 +154,9 @@ func (g *Governance) AddDKGComplaint(complaint *typesDKG.Complaint) { } if err := g.stateModule.RequestChange( StateAddDKGComplaint, complaint); err != nil { - panic(err) + if err != ErrChangeWontApply { + panic(err) + } } g.broadcastPendingStateChanges() } @@ -174,7 +176,9 @@ func (g *Governance) AddDKGMasterPublicKey(masterPublicKey *typesDKG.MasterPubli } if err := g.stateModule.RequestChange( StateAddDKGMasterPublicKey, masterPublicKey); err != nil { - panic(err) + if err != ErrChangeWontApply { + panic(err) + } } g.broadcastPendingStateChanges() } @@ -187,8 +191,11 @@ func (g *Governance) DKGMasterPublicKeys( // AddDKGMPKReady adds a DKG ready message. func (g *Governance) AddDKGMPKReady(ready *typesDKG.MPKReady) { - if err := g.stateModule.RequestChange(StateAddDKGMPKReady, ready); err != nil { - panic(err) + if err := g.stateModule.RequestChange( + StateAddDKGMPKReady, ready); err != nil { + if err != ErrChangeWontApply { + panic(err) + } } g.broadcastPendingStateChanges() } @@ -214,7 +221,9 @@ func (g *Governance) AddDKGFinalize(final *typesDKG.Finalize) { return } if err := g.stateModule.RequestChange(StateAddDKGFinal, final); err != nil { - panic(err) + if err != ErrChangeWontApply { + panic(err) + } } g.broadcastPendingStateChanges() } @@ -341,6 +350,11 @@ func (g *Governance) Clone() *Governance { } copiedNodeSets = append(copiedNodeSets, copiedNodeSet) } + // Clone prohibited flag. + copiedProhibitedTypes := make(map[StateChangeType]struct{}) + for t := range g.prohibitedTypes { + copiedProhibitedTypes[t] = struct{}{} + } // Clone pending changes. return &Governance{ roundShift: g.roundShift, @@ -348,6 +362,7 @@ func (g *Governance) Clone() *Governance { stateModule: copiedState, nodeSets: copiedNodeSets, pendingConfigChanges: copiedPendingChanges, + prohibitedTypes: copiedProhibitedTypes, } } @@ -369,6 +384,10 @@ func (g *Governance) Equal(other *Governance, checkState bool) bool { if !reflect.DeepEqual(g.pendingConfigChanges, other.pendingConfigChanges) { return false } + // Check prohibited types. + if !reflect.DeepEqual(g.prohibitedTypes, other.prohibitedTypes) { + return false + } getSortedKeys := func(keys []crypto.PublicKey) (encoded []string) { for _, key := range keys { encoded = append(encoded, hex.EncodeToString(key.Bytes())) diff --git a/core/test/governance_test.go b/core/test/governance_test.go index 36819a0..a2d3a47 100644 --- a/core/test/governance_test.go +++ b/core/test/governance_test.go @@ -70,6 +70,13 @@ func (s *GovernanceTestSuite) TestEqual() { // Change its roundShift g5.roundShift = 3 req.False(g1.Equal(g5, true)) + // Prohibit some change. + g1.Prohibit(StateAddDKGFinal) + // Make a clone and should be equal. 
+ g6 := g1.Clone() + req.True(g1.Equal(g6, true)) + g6.Unprohibit(StateAddDKGFinal) + req.False(g1.Equal(g6, true)) } func (s *GovernanceTestSuite) TestRegisterChange() { diff --git a/core/test/network.go b/core/test/network.go index 0bbb12e..443a26c 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -498,14 +498,19 @@ func (n *Network) addStateModule(s *State) { n.stateModule = s } -// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached +// AttachNodeSetCache attaches an utils.NodeSetCache to this module. Once attached // The behavior of Broadcast-X methods would be switched to broadcast to correct // set of peers, instead of all peers. -func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) { +func (n *Network) AttachNodeSetCache(cache *utils.NodeSetCache) { // This variable should be attached before run, no lock to protect it. n.cache = cache } +// PurgeNodeSetCache purges cache of some round in attached utils.NodeSetCache. +func (n *Network) PurgeNodeSetCache(round uint64) { + n.cache.Purge(round) +} + func (n *Network) pullBlocksAsync(hashes common.Hashes) { // Setup notification channels for each block hash. notYetReceived := make(map[common.Hash]struct{}) diff --git a/core/test/network_test.go b/core/test/network_test.go index 863fee2..f9a6db9 100644 --- a/core/test/network_test.go +++ b/core/test/network_test.go @@ -289,7 +289,7 @@ func (s *NetworkTestSuite) TestBroadcastToSet() { req.NotNil(nerd) req.NotNil(dkgNode) req.NotNil(notaryNode) - nerd.AddNodeSetCache(cache) + nerd.AttachNodeSetCache(cache) // Try broadcasting with datum from round 0, and make sure only node belongs // to that set receiving the message. nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{ diff --git a/core/test/state.go b/core/test/state.go index 89d2e90..ce906ae 100644 --- a/core/test/state.go +++ b/core/test/state.go @@ -77,9 +77,6 @@ var ( // ErrChangeWontApply means the state change won't be applied for some // reason. ErrChangeWontApply = errors.New("change won't apply") - // ErrUnmatchedResetCount means an DKG message attempt to apply is not - // the latest reset count in State module. - ErrUnmatchedResetCount = errors.New("unmatched reset count of DKG message") // ErrNotInRemoteMode means callers attempts to call functions for remote // mode when the State instance is still in local mode. ErrNotInRemoteMode = errors.New( @@ -641,17 +638,17 @@ func (s *State) isValidRequest(req *StateChangeRequest) error { case StateAddDKGMPKReady: ready := req.Payload.(*typesDKG.MPKReady) if ready.Reset != s.dkgResetCount[ready.Round] { - return ErrUnmatchedResetCount + return ErrChangeWontApply } case StateAddDKGFinal: final := req.Payload.(*typesDKG.Finalize) if final.Reset != s.dkgResetCount[final.Round] { - return ErrUnmatchedResetCount + return ErrChangeWontApply } case StateAddDKGMasterPublicKey: mpk := req.Payload.(*typesDKG.MasterPublicKey) if mpk.Reset != s.dkgResetCount[mpk.Round] { - return ErrUnmatchedResetCount + return ErrChangeWontApply } // If we've received identical MPK, ignore it. mpkForRound, exists := s.dkgMasterPublicKeys[mpk.Round] @@ -671,7 +668,7 @@ func (s *State) isValidRequest(req *StateChangeRequest) error { case StateAddDKGComplaint: comp := req.Payload.(*typesDKG.Complaint) if comp.Reset != s.dkgResetCount[comp.Round] { - return ErrUnmatchedResetCount + return ErrChangeWontApply } // If we've received DKG final from that proposer, we would ignore // its complaint. 
diff --git a/core/test/state_test.go b/core/test/state_test.go index 2adfc95..0ec90a4 100644 --- a/core/test/state_test.go +++ b/core/test/state_test.go @@ -472,13 +472,13 @@ func (s *StateTestSuite) TestUnmatchedResetCount() { s.Require().NoError(st.RequestChange(StateResetDKG, common.NewRandomHash())) s.Require().NoError(st.RequestChange(StateResetDKG, common.NewRandomHash())) s.Require().Equal(st.dkgResetCount[1], uint64(2)) - s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange( + s.Require().EqualError(ErrChangeWontApply, st.RequestChange( StateAddDKGMasterPublicKey, mpk).Error()) - s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange( + s.Require().EqualError(ErrChangeWontApply, st.RequestChange( StateAddDKGMPKReady, ready).Error()) - s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange( + s.Require().EqualError(ErrChangeWontApply, st.RequestChange( StateAddDKGComplaint, comp).Error()) - s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange( + s.Require().EqualError(ErrChangeWontApply, st.RequestChange( StateAddDKGFinal, final).Error()) mpk = s.newDKGMasterPublicKey(1, 2) ready = s.newDKGMPKReady(1, 2) diff --git a/core/utils/crypto.go b/core/utils/crypto.go index 7532d29..8be503f 100644 --- a/core/utils/crypto.go +++ b/core/utils/crypto.go @@ -323,3 +323,12 @@ func VerifyDKGFinalizeSignature( } return true, nil } + +// Rehash hashes the hash again and again and again... +func Rehash(hash common.Hash, count uint) common.Hash { + result := hash + for i := uint(0); i < count; i++ { + result = crypto.Keccak256Hash(result[:]) + } + return result +} diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go index e09120d..0090123 100644 --- a/core/utils/nodeset-cache.go +++ b/core/utils/nodeset-cache.go @@ -59,6 +59,9 @@ type NodeSetCacheInterface interface { } // NodeSetCache caches node set information. +// +// NOTE: this module doesn't handle DKG resetting and can only be used along +// with utils.RoundEvent. type NodeSetCache struct { lock sync.RWMutex nsIntf NodeSetCacheInterface @@ -165,6 +168,23 @@ func (cache *NodeSetCache) GetLeaderNode(pos types.Position) ( return IDs.leaderNode[pos.Height], nil } +// Purge a specific round. +func (cache *NodeSetCache) Purge(rID uint64) { + cache.lock.Lock() + defer cache.lock.Unlock() + nIDs, exist := cache.rounds[rID] + if !exist { + return + } + for nID := range nIDs.nodeSet.IDs { + rec := cache.keyPool[nID] + if rec.refCnt--; rec.refCnt == 0 { + delete(cache.keyPool, nID) + } + } + delete(cache.rounds, rID) +} + // Touch updates the internal cache of round. 
func (cache *NodeSetCache) Touch(round uint64) (err error) { _, err = cache.update(round) diff --git a/core/utils/nodeset-cache_test.go b/core/utils/nodeset-cache_test.go index fe905cf..45d30a7 100644 --- a/core/utils/nodeset-cache_test.go +++ b/core/utils/nodeset-cache_test.go @@ -138,6 +138,24 @@ func (s *NodeSetCacheTestSuite) TestTouch() { req.True(exists) } +func (s *NodeSetCacheTestSuite) TestPurge() { + var ( + nsIntf = &nsIntf{ + s: s, + crs: common.NewRandomHash(), + } + cache = NewNodeSetCache(nsIntf) + req = s.Require() + ) + err := cache.Touch(1) + req.NoError(err) + _, exist := cache.get(1) + req.True(exist) + cache.Purge(1) + _, exist = cache.get(1) + req.False(exist) +} + func TestNodeSetCache(t *testing.T) { suite.Run(t, new(NodeSetCacheTestSuite)) } diff --git a/core/utils/round-based-config.go b/core/utils/round-based-config.go index 3219a13..4c83d04 100644 --- a/core/utils/round-based-config.go +++ b/core/utils/round-based-config.go @@ -90,7 +90,7 @@ func (c *RoundBasedConfig) RoundEndHeight() uint64 { return c.roundEndHeight } -// AppendTo a config in previous round. +// AppendTo a config from previous round. func (c *RoundBasedConfig) AppendTo(other RoundBasedConfig) { if c.roundID != other.roundID+1 { panic(fmt.Errorf("round IDs of configs not continuous: %d %d", diff --git a/core/utils/round-event.go b/core/utils/round-event.go index fe735e5..3536a27 100644 --- a/core/utils/round-event.go +++ b/core/utils/round-event.go @@ -93,6 +93,13 @@ func (e RoundEventParam) NextDKGRegisterHeight() uint64 { return e.BeginHeight + e.Config.RoundLength/2 } +func (e RoundEventParam) String() string { + return fmt.Sprintf("roundEvtParam{Round:%d Reset:%d Height:%d}", + e.Round, + e.Reset, + e.BeginHeight) +} + // roundEventFn defines the fingerprint of handlers of round events. type roundEventFn func([]RoundEventParam) @@ -177,6 +184,8 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor, // Register a handler to be called when new round is confirmed or new DKG reset // is detected. +// +// The earlier registered handler has higher priority. 
func (e *RoundEvent) Register(h roundEventFn) { e.lock.Lock() defer e.lock.Unlock() diff --git a/integration_test/byzantine_test.go b/integration_test/byzantine_test.go index a709870..e95e58d 100644 --- a/integration_test/byzantine_test.go +++ b/integration_test/byzantine_test.go @@ -80,9 +80,16 @@ func (s *ByzantineTestSuite) setupNodes( gov := seedGov.Clone() gov.SwitchToRemoteMode(networkModule) gov.NotifyRound(0) - networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov)) + networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov)) app := test.NewApp(1, gov, nil) - nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule} + nodes[nID] = &node{ + ID: nID, + app: app, + gov: gov, + db: dbInst, + network: networkModule, + logger: &common.NullLogger{}, + } go func() { defer wg.Done() s.Require().NoError(networkModule.Setup(serverChannel)) @@ -102,7 +109,7 @@ func (s *ByzantineTestSuite) setupNodes( node.db, node.network, k, - &common.NullLogger{}, + node.logger, ) } return nodes @@ -144,7 +151,7 @@ func (s *ByzantineTestSuite) TestOneSlowNodeOneDeadNode() { core.ConfigRoundShift) req.NoError(err) req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(60))) + test.StateChangeRoundLength, uint64(100))) slowNodeID := types.NewNodeID(pubKeys[0]) deadNodeID := types.NewNodeID(pubKeys[1]) s.directLatencyModel[slowNodeID] = &test.FixedLatencyModel{ diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index dd64113..afea6d8 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -46,8 +46,22 @@ type node struct { con *core.Consensus app *test.App gov *test.Governance + rEvt *utils.RoundEvent db db.Database network *test.Network + logger common.Logger +} + +func prohibitDKG(gov *test.Governance) { + gov.Prohibit(test.StateAddDKGMasterPublicKey) + gov.Prohibit(test.StateAddDKGFinal) + gov.Prohibit(test.StateAddDKGComplaint) +} + +func unprohibitDKG(gov *test.Governance) { + gov.Unprohibit(test.StateAddDKGMasterPublicKey) + gov.Unprohibit(test.StateAddDKGFinal) + gov.Unprohibit(test.StateAddDKGComplaint) } func (s *ConsensusTestSuite) setupNodes( @@ -55,7 +69,8 @@ func (s *ConsensusTestSuite) setupNodes( prvKeys []crypto.PrivateKey, seedGov *test.Governance) map[types.NodeID]*node { var ( - wg sync.WaitGroup + wg sync.WaitGroup + initRound uint64 ) // Setup peer server at transport layer. 
server := test.NewFakeTransportServer() @@ -76,11 +91,22 @@ func (s *ConsensusTestSuite) setupNodes( ) gov := seedGov.Clone() gov.SwitchToRemoteMode(networkModule) - gov.NotifyRound(0) - networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov)) - app := test.NewApp(1, gov, nil) + gov.NotifyRound(initRound) + networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov)) + logger := &common.NullLogger{} + rEvt, err := utils.NewRoundEvent(context.Background(), gov, logger, 0, + 0, 0, core.ConfigRoundShift) + s.Require().NoError(err) nID := types.NewNodeID(k.PublicKey()) - nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule} + nodes[nID] = &node{ + ID: nID, + app: test.NewApp(initRound+1, gov, rEvt), + gov: gov, + db: dbInst, + logger: logger, + rEvt: rEvt, + network: networkModule, + } go func() { defer wg.Done() s.Require().NoError(networkModule.Setup(serverChannel)) @@ -100,7 +126,7 @@ func (s *ConsensusTestSuite) setupNodes( node.db, node.network, k, - &common.NullLogger{}, + node.logger, ) } return nodes @@ -215,7 +241,7 @@ func (s *ConsensusTestSuite) TestSimple() { core.ConfigRoundShift) req.NoError(err) req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(60))) + test.StateChangeRoundLength, uint64(100))) // A short round interval. nodes := s.setupNodes(dMoment, prvKeys, seedGov) for _, n := range nodes { @@ -244,7 +270,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() { req = s.Require() peerCount = 7 dMoment = time.Now().UTC() - untilRound = uint64(6) + untilRound = uint64(5) ) if testing.Short() { // Short test won't test configuration change packed as payload of @@ -259,7 +285,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() { core.ConfigRoundShift) req.NoError(err) req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(60))) + test.StateChangeRoundLength, uint64(100))) req.NoError(seedGov.State().RequestChange( test.StateChangeNotarySetSize, uint32(4))) req.NoError(seedGov.State().RequestChange( @@ -267,7 +293,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() { seedGov.CatchUpWithRound(0) // Setup configuration for round 0 and round 1. req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(85))) + test.StateChangeRoundLength, uint64(100))) req.NoError(seedGov.State().RequestChange( test.StateChangeNotarySetSize, uint32(5))) req.NoError(seedGov.State().RequestChange( @@ -275,7 +301,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() { seedGov.CatchUpWithRound(1) // Setup configuration for round 2. req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(85))) + test.StateChangeRoundLength, uint64(100))) req.NoError(seedGov.State().RequestChange( test.StateChangeNotarySetSize, uint32(6))) req.NoError(seedGov.State().RequestChange( @@ -283,7 +309,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() { seedGov.CatchUpWithRound(2) // Setup configuration for round 3. req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(60))) + test.StateChangeRoundLength, uint64(100))) req.NoError(seedGov.State().RequestChange( test.StateChangeNotarySetSize, uint32(4))) req.NoError(seedGov.State().RequestChange( @@ -298,18 +324,11 @@ func (s *ConsensusTestSuite) TestSetSizeChange() { } // Register configuration changes for round 4. 
req.NoError(pickedNode.gov.RegisterConfigChange( - 4, test.StateChangeRoundLength, uint64(80))) + 4, test.StateChangeRoundLength, uint64(100))) req.NoError(pickedNode.gov.RegisterConfigChange( 4, test.StateChangeNotarySetSize, uint32(5))) req.NoError(pickedNode.gov.RegisterConfigChange( 4, test.StateChangeDKGSetSize, uint32(5))) - // Register configuration changes for round 5. - req.NoError(pickedNode.gov.RegisterConfigChange( - 5, test.StateChangeRoundLength, uint64(60))) - req.NoError(pickedNode.gov.RegisterConfigChange( - 5, test.StateChangeNotarySetSize, uint32(4))) - req.NoError(pickedNode.gov.RegisterConfigChange( - 5, test.StateChangeDKGSetSize, uint32(4))) // Run test. for _, n := range nodes { go n.con.Run() @@ -340,9 +359,11 @@ func (s *ConsensusTestSuite) TestSync() { req = s.Require() peerCount = 4 dMoment = time.Now().UTC() - untilRound = uint64(7) - stopRound = uint64(5) - aliveRound = uint64(4) + untilRound = uint64(6) + stopRound = uint64(4) + // aliveRound should be large enough to test round event handling in + // syncer. + aliveRound = uint64(3) errChan = make(chan error, 100) ) prvKeys, pubKeys, err := test.NewKeys(peerCount) @@ -355,7 +376,7 @@ func (s *ConsensusTestSuite) TestSync() { core.ConfigRoundShift) req.NoError(err) req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundLength, uint64(60))) + test.StateChangeRoundLength, uint64(100))) seedGov.CatchUpWithRound(0) seedGov.CatchUpWithRound(1) // A short round interval. @@ -447,29 +468,30 @@ ReachAlive: } }() // Wait until all nodes reach 'untilRound'. + var stoppedRound uint64 go func() { n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition() ReachFinished: for { fmt.Println("latestPos", n.ID, &pos) time.Sleep(5 * time.Second) - for _, n = range nodes { + if stoppedNode.con != nil { pos = n.app.GetLatestDeliveredPosition() - if n.ID == stoppedNode.ID { - if n.con == nil { - continue - } - if pos.Round < stopRound { - continue ReachFinished - } + if pos.Round >= stopRound { // Stop a node, we should still be able to proceed. stoppedNode.con.Stop() stoppedNode.con = nil + stoppedRound = pos.Round fmt.Println("one node stopped", stoppedNode.ID) utils.LaunchDummyReceiver( runnerCtx, stoppedNode.network.ReceiveChan(), nil) + } + } + for _, n = range nodes { + if n.ID == stoppedNode.ID { continue } + pos = n.app.GetLatestDeliveredPosition() if pos.Round < untilRound { continue ReachFinished } @@ -485,6 +507,7 @@ ReachAlive: case <-runnerCtx.Done(): // This test passed. } + s.Require().Equal(stoppedRound, stopRound) } func (s *ConsensusTestSuite) TestForceSync() { @@ -632,6 +655,100 @@ Loop: s.verifyNodes(nodes) } +func (s *ConsensusTestSuite) TestResetDKG() { + var ( + req = s.Require() + peerCount = 5 + dMoment = time.Now().UTC() + untilRound = uint64(3) + ) + prvKeys, pubKeys, err := test.NewKeys(peerCount) + req.NoError(err) + // Setup seed governance instance. Give a short latency to make this test + // run faster. + seedGov, err := test.NewGovernance( + test.NewState(core.DKGDelayRound, + pubKeys, 100*time.Millisecond, &common.NullLogger{}, true), + core.ConfigRoundShift) + req.NoError(err) + req.NoError(seedGov.State().RequestChange( + test.StateChangeRoundLength, uint64(100))) + req.NoError(seedGov.State().RequestChange( + test.StateChangeNotarySetSize, uint32(4))) + req.NoError(seedGov.State().RequestChange( + test.StateChangeDKGSetSize, uint32(4))) + nodes := s.setupNodes(dMoment, prvKeys, seedGov) + // A round event handler to purge utils.NodeSetCache in test.Network. 
+ purgeHandlerGen := func(n *test.Network) func([]utils.RoundEventParam) { + return func(evts []utils.RoundEventParam) { + for _, e := range evts { + if e.Reset == 0 { + continue + } + n.PurgeNodeSetCache(e.Round + 1) + } + } + } + // Round Height reference table: + // - Round:1 Reset:0 -- 100 + // - Round:1 Reset:1 -- 200 + // - Round:1 Reset:2 -- 300 + // - Round:2 Reset:0 -- 400 + // - Round:2 Reset:1 -- 500 + // - Round:3 Reset:0 -- 600 + // Register round event handler to prohibit/unprohibit DKG operation to + // governance. + roundHandlerGen := func(g *test.Governance) func([]utils.RoundEventParam) { + return func(evts []utils.RoundEventParam) { + trigger := func(e utils.RoundEventParam) { + // Make round 2 reseted until resetCount == 2. + if e.Round == 1 && e.Reset == 0 { + prohibitDKG(g) + } + if e.Round == 1 && e.Reset == 2 { + unprohibitDKG(g) + } + // Make round 3 reseted until resetCount == 1. + if e.Round == 2 && e.Reset == 0 { + // Allow DKG final this time. + g.Prohibit(test.StateAddDKGMasterPublicKey) + g.Prohibit(test.StateAddDKGComplaint) + } + if e.Round == 2 && e.Reset == 1 { + unprohibitDKG(g) + } + } + for _, e := range evts { + trigger(e) + } + } + } + for _, n := range nodes { + n.rEvt.Register(purgeHandlerGen(n.network)) + n.rEvt.Register(roundHandlerGen(n.gov)) + go n.con.Run() + } +Loop: + for { + <-time.After(5 * time.Second) + for _, n := range nodes { + latestPos := n.app.GetLatestDeliveredPosition() + fmt.Println("latestPos", n.ID, &latestPos) + if latestPos.Round < untilRound { + continue Loop + } + } + // Oh ya. + break + } + s.verifyNodes(nodes) + for _, n := range nodes { + n.con.Stop() + req.Equal(n.gov.DKGResetCount(2), uint64(2)) + req.Equal(n.gov.DKGResetCount(3), uint64(1)) + } +} + func TestConsensus(t *testing.T) { suite.Run(t, new(ConsensusTestSuite)) } |
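One detail worth calling out from the diff above: on a DKG reset, the CRS TSIG is re-run over a rehashed input, utils.Rehash(e.CRS, uint(e.Reset+1)), and the result is submitted through Governance.ResetDKG rather than ProposeCRS. The new Rehash helper in core/utils/crypto.go just applies Keccak256 count times. A minimal sketch of that derivation, assuming the upstream import paths:

```go
package main

import (
	"encoding/hex"
	"fmt"

	"github.com/dexon-foundation/dexon-consensus/common"
	"github.com/dexon-foundation/dexon-consensus/core/utils"
)

func main() {
	// Hypothetical CRS of the round being reset, for illustration only.
	crs := common.NewRandomHash()
	// Reset is 0 when the first reset is detected, so the retried DKG
	// signs Rehash(crs, 1); a second reset would sign Rehash(crs, 2).
	r1 := utils.Rehash(crs, 1)
	r2 := utils.Rehash(crs, 2)
	fmt.Println("reset #1 input:", hex.EncodeToString(r1[:]))
	fmt.Println("reset #2 input:", hex.EncodeToString(r2[:]))
}
```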