author    Mission Liao <mission.liao@dexon.org>  2019-03-20 17:29:22 +0800
committer GitHub <noreply@github.com>            2019-03-20 17:29:22 +0800
commit    c852eda00f781abafaab2b41d2c1a85fe9d3177f (patch)
tree      931680bf76590c4bdb74c247582f213c17db9274
parent    448935829700500ecf512b9e0a6437cbb63504b3 (diff)
core: reset DKG (#502)
* Allow utils.NodeSetCache to purge by round.
* Purge utils.NodeSetCache when DKG is reset.
* Add a utils.RoundEvent handler to abort all previously running DKG protocols.
* Fix test.App hanging in BlockDelivered when utils.RoundEvent is attached:
  ValidateNextRound is a blocking call and would block test.App.BlockDelivered.
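
Below is a minimal, self-contained sketch (toy types only, not the real dexon-consensus API) of the ordering property the new handlers depend on: utils.RoundEvent invokes handlers in registration order, so the node-set purge must be registered before any handler that reads the node set or aborts DKG.

package main

import "fmt"

// roundEventParam carries only the fields the toy handlers below care
// about; the real type is utils.RoundEventParam.
type roundEventParam struct {
	Round uint64
	Reset uint64
}

// roundEvent is a stand-in for utils.RoundEvent: handlers run in the
// order they were registered.
type roundEvent struct {
	handlers []func([]roundEventParam)
}

func (e *roundEvent) Register(h func([]roundEventParam)) {
	e.handlers = append(e.handlers, h)
}

func (e *roundEvent) trigger(evts []roundEventParam) {
	for _, h := range e.handlers {
		h(evts)
	}
}

func main() {
	evt := &roundEvent{}
	// Registered first: purge stale node sets so later handlers see
	// up-to-date data.
	evt.Register(func(evts []roundEventParam) {
		for _, e := range evts {
			if e.Reset == 0 {
				continue
			}
			fmt.Println("purge node set cache for round", e.Round+1)
		}
	})
	// Registered second: abort any DKG instance made obsolete by the reset.
	evt.Register(func(evts []roundEventParam) {
		last := evts[len(evts)-1]
		fmt.Println("abort DKG for round", last.Round+1, "reset", last.Reset)
	})
	evt.trigger([]roundEventParam{{Round: 1, Reset: 1}})
}

The real handlers added to core/consensus.go and core/syncer/consensus.go in this diff follow the same registration order.
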
-rw-r--r--  GNUmakefile                           4
-rw-r--r--  core/consensus.go                   120
-rw-r--r--  core/consensus_test.go                2
-rw-r--r--  core/syncer/consensus.go             10
-rw-r--r--  core/test/app.go                      2
-rw-r--r--  core/test/app_test.go                16
-rw-r--r--  core/test/governance.go              29
-rw-r--r--  core/test/governance_test.go          7
-rw-r--r--  core/test/network.go                  9
-rw-r--r--  core/test/network_test.go             2
-rw-r--r--  core/test/state.go                   11
-rw-r--r--  core/test/state_test.go               8
-rw-r--r--  core/utils/crypto.go                  9
-rw-r--r--  core/utils/nodeset-cache.go          20
-rw-r--r--  core/utils/nodeset-cache_test.go     18
-rw-r--r--  core/utils/round-based-config.go      2
-rw-r--r--  core/utils/round-event.go             9
-rw-r--r--  integration_test/byzantine_test.go   15
-rw-r--r--  integration_test/consensus_test.go  181
19 files changed, 395 insertions(+), 79 deletions(-)
diff --git a/GNUmakefile b/GNUmakefile
index 425a569..bbf14bd 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -44,11 +44,11 @@ else
endif
endef
-GO_TEST_TIMEOUT := 20m
+GO_TEST_TIMEOUT := 33m
TEST_TARGET := go list ./... | grep -v 'vendor'
ifeq ($(NO_INTEGRATION_TEST), true)
- GO_TEST_TIMEOUT := 15m
+ GO_TEST_TIMEOUT := 25m
TEST_TARGET := $(TEST_TARGET) | grep -v 'integration_test'
else ifeq ($(ONLY_INTEGRATION_TEST), true)
TEST_TARGET := $(TEST_TARGET) | grep 'integration_test'
diff --git a/core/consensus.go b/core/consensus.go
index c75f542..8f8002b 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -630,8 +630,40 @@ func (con *Consensus) prepare(
panic("not implemented yet")
}
}
+ // Measure the time elapsed in each round event handler.
+ elapse := func(what string, lastE utils.RoundEventParam) func() {
+ start := time.Now()
+ con.logger.Info("handle round event",
+ "what", what,
+ "event", lastE)
+ return func() {
+ con.logger.Info("finish round event",
+ "what", what,
+ "event", lastE,
+ "elapse", time.Since(start))
+ }
+ }
+ // Register a round event handler to purge cached node sets. To make sure
+ // every module sees an up-to-date node set, this handler must be
+ // registered first.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ defer elapse("purge node set", evts[len(evts)-1])()
+ for _, e := range evts {
+ if e.Reset == 0 {
+ continue
+ }
+ con.nodeSetCache.Purge(e.Round + 1)
+ }
+ })
+ // Register a round event handler to abort any previously running DKG.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ defer elapse("abort DKG", e)()
+ con.cfgModule.abortDKG(e.Round+1, e.Reset)
+ })
// Register round event handler to update BA and BC modules.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ defer elapse("append config", evts[len(evts)-1])()
// Always updates newer configs to the later modules first in the flow.
if err := con.bcModule.notifyRoundEvents(evts); err != nil {
panic(err)
@@ -643,11 +675,62 @@ func (con *Consensus) prepare(
}
}
})
+ // Register a round event handler to reset DKG if the DKG set for the next
+ // round failed to set up.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ defer elapse("reset DKG", e)()
+ nextRound := e.Round + 1
+ if nextRound < DKGDelayRound {
+ return
+ }
+ curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round)
+ if err != nil {
+ con.logger.Error("Error getting DKG set when proposing CRS",
+ "round", e.Round,
+ "error", err)
+ return
+ }
+ if _, exist := curDKGSet[con.ID]; !exist {
+ return
+ }
+ isDKGValid := func() bool {
+ nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+ con.logger)
+ if !con.gov.IsDKGFinal(nextRound) {
+ con.logger.Error("Next DKG is not final, reset it",
+ "round", e.Round,
+ "reset", e.Reset)
+ return false
+ }
+ if _, err := typesDKG.NewGroupPublicKey(
+ nextRound,
+ con.gov.DKGMasterPublicKeys(nextRound),
+ con.gov.DKGComplaints(nextRound),
+ utils.GetDKGThreshold(nextConfig)); err != nil {
+ con.logger.Error("Next DKG failed to prepare, reset it",
+ "round", e.Round,
+ "reset", e.Reset,
+ "error", err)
+ return false
+ }
+ return true
+ }
+ con.event.RegisterHeight(e.NextDKGResetHeight(), func(uint64) {
+ if isDKGValid() {
+ return
+ }
+ // Abort all previously running DKG protocol instances, if any.
+ con.cfgModule.abortDKG(nextRound, e.Reset)
+ con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+ })
+ })
// Register round event handler to propose new CRS.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
// We don't have to propose new CRS during DKG reset, the reset of DKG
// would be done by the DKG set in previous round.
e := evts[len(evts)-1]
+ defer elapse("propose CRS", e)()
if e.Reset != 0 || e.Round < DKGDelayRound {
return
}
@@ -667,13 +750,14 @@ func (con *Consensus) prepare(
con.logger.Debug("CRS already proposed", "round", e.Round+1)
return
}
- con.runCRS(e.Round, e.CRS)
+ con.runCRS(e.Round, e.CRS, false)
})
}
})
// Touch nodeSetCache for next round.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
+ defer elapse("touch node set cache", e)()
if e.Reset != 0 {
return
}
@@ -702,6 +786,7 @@ func (con *Consensus) prepare(
// Trigger round validation method for next period.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
+ defer elapse("next round", e)()
// Register a routine to trigger round events.
con.event.RegisterHeight(e.NextRoundValidationHeight(), func(
blockHeight uint64) {
@@ -711,7 +796,9 @@ func (con *Consensus) prepare(
con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
nextRound := e.Round + 1
if nextRound < DKGDelayRound {
- con.logger.Info("Skip runDKG for round", "round", nextRound)
+ con.logger.Info("Skip runDKG for round",
+ "round", nextRound,
+ "reset", e.Reset)
return
}
// Normally, gov.CRS would return non-nil. Use this for in case of
@@ -719,21 +806,27 @@ func (con *Consensus) prepare(
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
con.logger.Debug("unable to prepare CRS for DKG set",
- "round", nextRound)
+ "round", nextRound,
+ "reset", e.Reset)
return
}
nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
if err != nil {
con.logger.Error("Error getting DKG set for next round",
"round", nextRound,
+ "reset", e.Reset,
"error", err)
return
}
if _, exist := nextDkgSet[con.ID]; !exist {
- con.logger.Info("Not selected as DKG set", "round", nextRound)
+ con.logger.Info("Not selected as DKG set",
+ "round", nextRound,
+ "reset", e.Reset)
return
}
- con.logger.Info("Selected as DKG set", "round", nextRound)
+ con.logger.Info("Selected as DKG set",
+ "round", nextRound,
+ "reset", e.Reset)
nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
con.logger)
con.cfgModule.registerDKG(nextRound, e.Reset, utils.GetDKGThreshold(
@@ -821,7 +914,7 @@ func (con *Consensus) runDKG(round, reset uint64, config *types.Config) {
}()
}
-func (con *Consensus) runCRS(round uint64, hash common.Hash) {
+func (con *Consensus) runCRS(round uint64, hash common.Hash, reset bool) {
// Start running next round CRS.
psig, err := con.cfgModule.preparePartialSignature(round, hash)
if err != nil {
@@ -841,10 +934,17 @@ func (con *Consensus) runCRS(round uint64, hash common.Hash) {
if err != nil {
con.logger.Error("Failed to run CRS Tsig", "error", err)
} else {
- con.logger.Debug("Calling Governance.ProposeCRS",
- "round", round+1,
- "crs", hex.EncodeToString(crs))
- con.gov.ProposeCRS(round+1, crs)
+ if reset {
+ con.logger.Debug("Calling Governance.ResetDKG",
+ "round", round+1,
+ "crs", hex.EncodeToString(crs))
+ con.gov.ResetDKG(crs)
+ } else {
+ con.logger.Debug("Calling Governance.ProposeCRS",
+ "round", round+1,
+ "crs", hex.EncodeToString(crs))
+ con.gov.ProposeCRS(round+1, crs)
+ }
}
}
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 541f157..e60a173 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -285,7 +285,7 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
crsFinish := make(chan struct{})
for _, con := range cons {
go func(con *Consensus) {
- con.runCRS(0, gov.CRS(0))
+ con.runCRS(0, gov.CRS(0), false)
crsFinish <- struct{}{}
}(con)
}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 8d89d07..2eeee9d 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -168,6 +168,16 @@ func (con *Consensus) assureBuffering() {
}
// Make sure con.roundEvt stopped before stopping con.agreementModule.
con.waitGroup.Add(1)
+ // Register a round event handler to purge the node set cache; this handler
+ // should have the highest priority.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ for _, e := range evts {
+ if e.Reset == 0 {
+ continue
+ }
+ con.nodeSetCache.Purge(e.Round + 1)
+ }
+ })
// Register a round event handler to notify CRS to agreementModule.
con.roundEvt.Register(func(evts []utils.RoundEventParam) {
con.waitGroup.Add(1)
diff --git a/core/test/app.go b/core/test/app.go
index df58135..12b2047 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -265,7 +265,7 @@ func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position,
}
}
}()
- app.hEvt.NotifyHeight(result.Height)
+ go app.hEvt.NotifyHeight(result.Height)
}
// GetLatestDeliveredPosition would return the latest position of delivered
diff --git a/core/test/app_test.go b/core/test/app_test.go
index c83aaf6..138f803 100644
--- a/core/test/app_test.go
+++ b/core/test/app_test.go
@@ -309,15 +309,15 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
1900, 2019, core.ConfigRoundShift)
s.Require().NoError(err)
// Register a handler to collects triggered events.
- var evts []evtParamToCheck
+ evts := make(chan evtParamToCheck, 2)
rEvt.Register(func(params []utils.RoundEventParam) {
for _, p := range params {
- evts = append(evts, evtParamToCheck{
+ evts <- evtParamToCheck{
round: p.Round,
reset: p.Reset,
height: p.BeginHeight,
crs: p.CRS,
- })
+ }
}
})
// Setup App instance.
@@ -336,18 +336,16 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
// Deliver blocks from height=2020 to height=2081.
deliver(0, 0, 2019)
deliver(19, 2020, 2091)
- s.Require().Len(evts, 2)
- s.Require().Equal(evts[0], evtParamToCheck{19, 2, 2100, gov.CRS(19)})
- s.Require().Equal(evts[1], evtParamToCheck{20, 0, 2200, gov.CRS(20)})
+ s.Require().Equal(<-evts, evtParamToCheck{19, 2, 2100, gov.CRS(19)})
+ s.Require().Equal(<-evts, evtParamToCheck{20, 0, 2200, gov.CRS(20)})
// Deliver blocks from height=2082 to height=2281.
deliver(19, 2092, 2199)
deliver(20, 2200, 2291)
- s.Require().Len(evts, 3)
- s.Require().Equal(evts[2], evtParamToCheck{21, 0, 2300, gov.CRS(21)})
+ s.Require().Equal(<-evts, evtParamToCheck{21, 0, 2300, gov.CRS(21)})
// Deliver blocks from height=2282 to height=2381.
deliver(20, 2292, 2299)
deliver(21, 2300, 2391)
- s.Require().Equal(evts[3], evtParamToCheck{22, 0, 2400, gov.CRS(22)})
+ s.Require().Equal(<-evts, evtParamToCheck{22, 0, 2400, gov.CRS(22)})
}
func TestApp(t *testing.T) {
diff --git a/core/test/governance.go b/core/test/governance.go
index 62bee70..9fe525b 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -154,7 +154,9 @@ func (g *Governance) AddDKGComplaint(complaint *typesDKG.Complaint) {
}
if err := g.stateModule.RequestChange(
StateAddDKGComplaint, complaint); err != nil {
- panic(err)
+ if err != ErrChangeWontApply {
+ panic(err)
+ }
}
g.broadcastPendingStateChanges()
}
@@ -174,7 +176,9 @@ func (g *Governance) AddDKGMasterPublicKey(masterPublicKey *typesDKG.MasterPubli
}
if err := g.stateModule.RequestChange(
StateAddDKGMasterPublicKey, masterPublicKey); err != nil {
- panic(err)
+ if err != ErrChangeWontApply {
+ panic(err)
+ }
}
g.broadcastPendingStateChanges()
}
@@ -187,8 +191,11 @@ func (g *Governance) DKGMasterPublicKeys(
// AddDKGMPKReady adds a DKG ready message.
func (g *Governance) AddDKGMPKReady(ready *typesDKG.MPKReady) {
- if err := g.stateModule.RequestChange(StateAddDKGMPKReady, ready); err != nil {
- panic(err)
+ if err := g.stateModule.RequestChange(
+ StateAddDKGMPKReady, ready); err != nil {
+ if err != ErrChangeWontApply {
+ panic(err)
+ }
}
g.broadcastPendingStateChanges()
}
@@ -214,7 +221,9 @@ func (g *Governance) AddDKGFinalize(final *typesDKG.Finalize) {
return
}
if err := g.stateModule.RequestChange(StateAddDKGFinal, final); err != nil {
- panic(err)
+ if err != ErrChangeWontApply {
+ panic(err)
+ }
}
g.broadcastPendingStateChanges()
}
@@ -341,6 +350,11 @@ func (g *Governance) Clone() *Governance {
}
copiedNodeSets = append(copiedNodeSets, copiedNodeSet)
}
+ // Clone prohibited types.
+ copiedProhibitedTypes := make(map[StateChangeType]struct{})
+ for t := range g.prohibitedTypes {
+ copiedProhibitedTypes[t] = struct{}{}
+ }
// Clone pending changes.
return &Governance{
roundShift: g.roundShift,
@@ -348,6 +362,7 @@ func (g *Governance) Clone() *Governance {
stateModule: copiedState,
nodeSets: copiedNodeSets,
pendingConfigChanges: copiedPendingChanges,
+ prohibitedTypes: copiedProhibitedTypes,
}
}
@@ -369,6 +384,10 @@ func (g *Governance) Equal(other *Governance, checkState bool) bool {
if !reflect.DeepEqual(g.pendingConfigChanges, other.pendingConfigChanges) {
return false
}
+ // Check prohibited types.
+ if !reflect.DeepEqual(g.prohibitedTypes, other.prohibitedTypes) {
+ return false
+ }
getSortedKeys := func(keys []crypto.PublicKey) (encoded []string) {
for _, key := range keys {
encoded = append(encoded, hex.EncodeToString(key.Bytes()))
diff --git a/core/test/governance_test.go b/core/test/governance_test.go
index 36819a0..a2d3a47 100644
--- a/core/test/governance_test.go
+++ b/core/test/governance_test.go
@@ -70,6 +70,13 @@ func (s *GovernanceTestSuite) TestEqual() {
// Change its roundShift
g5.roundShift = 3
req.False(g1.Equal(g5, true))
+ // Prohibit some change.
+ g1.Prohibit(StateAddDKGFinal)
+ // Make a clone and should be equal.
+ g6 := g1.Clone()
+ req.True(g1.Equal(g6, true))
+ g6.Unprohibit(StateAddDKGFinal)
+ req.False(g1.Equal(g6, true))
}
func (s *GovernanceTestSuite) TestRegisterChange() {
diff --git a/core/test/network.go b/core/test/network.go
index 0bbb12e..443a26c 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -498,14 +498,19 @@ func (n *Network) addStateModule(s *State) {
n.stateModule = s
}
-// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
+// AttachNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
// The behavior of Broadcast-X methods would be switched to broadcast to correct
// set of peers, instead of all peers.
-func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) {
+func (n *Network) AttachNodeSetCache(cache *utils.NodeSetCache) {
// This variable should be attached before run, no lock to protect it.
n.cache = cache
}
+// PurgeNodeSetCache purges the cache of a specific round in the attached
+// utils.NodeSetCache.
+func (n *Network) PurgeNodeSetCache(round uint64) {
+ n.cache.Purge(round)
+}
+
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Setup notification channels for each block hash.
notYetReceived := make(map[common.Hash]struct{})
diff --git a/core/test/network_test.go b/core/test/network_test.go
index 863fee2..f9a6db9 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -289,7 +289,7 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
req.NotNil(nerd)
req.NotNil(dkgNode)
req.NotNil(notaryNode)
- nerd.AddNodeSetCache(cache)
+ nerd.AttachNodeSetCache(cache)
// Try broadcasting with datum from round 0, and make sure only node belongs
// to that set receiving the message.
nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{
diff --git a/core/test/state.go b/core/test/state.go
index 89d2e90..ce906ae 100644
--- a/core/test/state.go
+++ b/core/test/state.go
@@ -77,9 +77,6 @@ var (
// ErrChangeWontApply means the state change won't be applied for some
// reason.
ErrChangeWontApply = errors.New("change won't apply")
- // ErrUnmatchedResetCount means an DKG message attempt to apply is not
- // the latest reset count in State module.
- ErrUnmatchedResetCount = errors.New("unmatched reset count of DKG message")
// ErrNotInRemoteMode means callers attempts to call functions for remote
// mode when the State instance is still in local mode.
ErrNotInRemoteMode = errors.New(
@@ -641,17 +638,17 @@ func (s *State) isValidRequest(req *StateChangeRequest) error {
case StateAddDKGMPKReady:
ready := req.Payload.(*typesDKG.MPKReady)
if ready.Reset != s.dkgResetCount[ready.Round] {
- return ErrUnmatchedResetCount
+ return ErrChangeWontApply
}
case StateAddDKGFinal:
final := req.Payload.(*typesDKG.Finalize)
if final.Reset != s.dkgResetCount[final.Round] {
- return ErrUnmatchedResetCount
+ return ErrChangeWontApply
}
case StateAddDKGMasterPublicKey:
mpk := req.Payload.(*typesDKG.MasterPublicKey)
if mpk.Reset != s.dkgResetCount[mpk.Round] {
- return ErrUnmatchedResetCount
+ return ErrChangeWontApply
}
// If we've received identical MPK, ignore it.
mpkForRound, exists := s.dkgMasterPublicKeys[mpk.Round]
@@ -671,7 +668,7 @@ func (s *State) isValidRequest(req *StateChangeRequest) error {
case StateAddDKGComplaint:
comp := req.Payload.(*typesDKG.Complaint)
if comp.Reset != s.dkgResetCount[comp.Round] {
- return ErrUnmatchedResetCount
+ return ErrChangeWontApply
}
// If we've received DKG final from that proposer, we would ignore
// its complaint.
diff --git a/core/test/state_test.go b/core/test/state_test.go
index 2adfc95..0ec90a4 100644
--- a/core/test/state_test.go
+++ b/core/test/state_test.go
@@ -472,13 +472,13 @@ func (s *StateTestSuite) TestUnmatchedResetCount() {
s.Require().NoError(st.RequestChange(StateResetDKG, common.NewRandomHash()))
s.Require().NoError(st.RequestChange(StateResetDKG, common.NewRandomHash()))
s.Require().Equal(st.dkgResetCount[1], uint64(2))
- s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+ s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
StateAddDKGMasterPublicKey, mpk).Error())
- s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+ s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
StateAddDKGMPKReady, ready).Error())
- s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+ s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
StateAddDKGComplaint, comp).Error())
- s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+ s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
StateAddDKGFinal, final).Error())
mpk = s.newDKGMasterPublicKey(1, 2)
ready = s.newDKGMPKReady(1, 2)
diff --git a/core/utils/crypto.go b/core/utils/crypto.go
index 7532d29..8be503f 100644
--- a/core/utils/crypto.go
+++ b/core/utils/crypto.go
@@ -323,3 +323,12 @@ func VerifyDKGFinalizeSignature(
}
return true, nil
}
+
+// Rehash hashes the hash again and again and again...
+func Rehash(hash common.Hash, count uint) common.Hash {
+ result := hash
+ for i := uint(0); i < count; i++ {
+ result = crypto.Keccak256Hash(result[:])
+ }
+ return result
+}
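
As a usage note, the reset handler added to core/consensus.go above derives each reset attempt's CRS as utils.Rehash(e.CRS, uint(e.Reset+1)), so every attempt gets a distinct but deterministic value. A small stand-alone sketch of the same iteration pattern follows; it uses SHA-256 from the standard library purely as a stand-in, whereas the real Rehash iterates crypto.Keccak256Hash.

package main

import (
	"crypto/sha256"
	"fmt"
)

// rehash applies the hash function count times, mirroring the loop in
// the Rehash helper above (which uses Keccak256 instead of SHA-256).
func rehash(h [32]byte, count uint) [32]byte {
	result := h
	for i := uint(0); i < count; i++ {
		result = sha256.Sum256(result[:])
	}
	return result
}

func main() {
	var crs [32]byte // previous CRS; zero value just for illustration
	// The first reset uses count=1, the second count=2, and so on,
	// matching utils.Rehash(e.CRS, uint(e.Reset+1)).
	fmt.Printf("reset #1 candidate CRS: %x\n", rehash(crs, 1))
	fmt.Printf("reset #2 candidate CRS: %x\n", rehash(crs, 2))
}
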
diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go
index e09120d..0090123 100644
--- a/core/utils/nodeset-cache.go
+++ b/core/utils/nodeset-cache.go
@@ -59,6 +59,9 @@ type NodeSetCacheInterface interface {
}
// NodeSetCache caches node set information.
+//
+// NOTE: this module doesn't handle DKG resetting and can only be used along
+// with utils.RoundEvent.
type NodeSetCache struct {
lock sync.RWMutex
nsIntf NodeSetCacheInterface
@@ -165,6 +168,23 @@ func (cache *NodeSetCache) GetLeaderNode(pos types.Position) (
return IDs.leaderNode[pos.Height], nil
}
+// Purge a specific round.
+func (cache *NodeSetCache) Purge(rID uint64) {
+ cache.lock.Lock()
+ defer cache.lock.Unlock()
+ nIDs, exist := cache.rounds[rID]
+ if !exist {
+ return
+ }
+ for nID := range nIDs.nodeSet.IDs {
+ rec := cache.keyPool[nID]
+ if rec.refCnt--; rec.refCnt == 0 {
+ delete(cache.keyPool, nID)
+ }
+ }
+ delete(cache.rounds, rID)
+}
+
// Touch updates the internal cache of round.
func (cache *NodeSetCache) Touch(round uint64) (err error) {
_, err = cache.update(round)
diff --git a/core/utils/nodeset-cache_test.go b/core/utils/nodeset-cache_test.go
index fe905cf..45d30a7 100644
--- a/core/utils/nodeset-cache_test.go
+++ b/core/utils/nodeset-cache_test.go
@@ -138,6 +138,24 @@ func (s *NodeSetCacheTestSuite) TestTouch() {
req.True(exists)
}
+func (s *NodeSetCacheTestSuite) TestPurge() {
+ var (
+ nsIntf = &nsIntf{
+ s: s,
+ crs: common.NewRandomHash(),
+ }
+ cache = NewNodeSetCache(nsIntf)
+ req = s.Require()
+ )
+ err := cache.Touch(1)
+ req.NoError(err)
+ _, exist := cache.get(1)
+ req.True(exist)
+ cache.Purge(1)
+ _, exist = cache.get(1)
+ req.False(exist)
+}
+
func TestNodeSetCache(t *testing.T) {
suite.Run(t, new(NodeSetCacheTestSuite))
}
diff --git a/core/utils/round-based-config.go b/core/utils/round-based-config.go
index 3219a13..4c83d04 100644
--- a/core/utils/round-based-config.go
+++ b/core/utils/round-based-config.go
@@ -90,7 +90,7 @@ func (c *RoundBasedConfig) RoundEndHeight() uint64 {
return c.roundEndHeight
}
-// AppendTo a config in previous round.
+// AppendTo a config from previous round.
func (c *RoundBasedConfig) AppendTo(other RoundBasedConfig) {
if c.roundID != other.roundID+1 {
panic(fmt.Errorf("round IDs of configs not continuous: %d %d",
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index fe735e5..3536a27 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -93,6 +93,13 @@ func (e RoundEventParam) NextDKGRegisterHeight() uint64 {
return e.BeginHeight + e.Config.RoundLength/2
}
+func (e RoundEventParam) String() string {
+ return fmt.Sprintf("roundEvtParam{Round:%d Reset:%d Height:%d}",
+ e.Round,
+ e.Reset,
+ e.BeginHeight)
+}
+
// roundEventFn defines the fingerprint of handlers of round events.
type roundEventFn func([]RoundEventParam)
@@ -177,6 +184,8 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor,
// Register a handler to be called when new round is confirmed or new DKG reset
// is detected.
+//
+// Handlers registered earlier have higher priority.
func (e *RoundEvent) Register(h roundEventFn) {
e.lock.Lock()
defer e.lock.Unlock()
diff --git a/integration_test/byzantine_test.go b/integration_test/byzantine_test.go
index a709870..e95e58d 100644
--- a/integration_test/byzantine_test.go
+++ b/integration_test/byzantine_test.go
@@ -80,9 +80,16 @@ func (s *ByzantineTestSuite) setupNodes(
gov := seedGov.Clone()
gov.SwitchToRemoteMode(networkModule)
gov.NotifyRound(0)
- networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
+ networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov))
app := test.NewApp(1, gov, nil)
- nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule}
+ nodes[nID] = &node{
+ ID: nID,
+ app: app,
+ gov: gov,
+ db: dbInst,
+ network: networkModule,
+ logger: &common.NullLogger{},
+ }
go func() {
defer wg.Done()
s.Require().NoError(networkModule.Setup(serverChannel))
@@ -102,7 +109,7 @@ func (s *ByzantineTestSuite) setupNodes(
node.db,
node.network,
k,
- &common.NullLogger{},
+ node.logger,
)
}
return nodes
@@ -144,7 +151,7 @@ func (s *ByzantineTestSuite) TestOneSlowNodeOneDeadNode() {
core.ConfigRoundShift)
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(60)))
+ test.StateChangeRoundLength, uint64(100)))
slowNodeID := types.NewNodeID(pubKeys[0])
deadNodeID := types.NewNodeID(pubKeys[1])
s.directLatencyModel[slowNodeID] = &test.FixedLatencyModel{
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index dd64113..afea6d8 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -46,8 +46,22 @@ type node struct {
con *core.Consensus
app *test.App
gov *test.Governance
+ rEvt *utils.RoundEvent
db db.Database
network *test.Network
+ logger common.Logger
+}
+
+func prohibitDKG(gov *test.Governance) {
+ gov.Prohibit(test.StateAddDKGMasterPublicKey)
+ gov.Prohibit(test.StateAddDKGFinal)
+ gov.Prohibit(test.StateAddDKGComplaint)
+}
+
+func unprohibitDKG(gov *test.Governance) {
+ gov.Unprohibit(test.StateAddDKGMasterPublicKey)
+ gov.Unprohibit(test.StateAddDKGFinal)
+ gov.Unprohibit(test.StateAddDKGComplaint)
}
func (s *ConsensusTestSuite) setupNodes(
@@ -55,7 +69,8 @@ func (s *ConsensusTestSuite) setupNodes(
prvKeys []crypto.PrivateKey,
seedGov *test.Governance) map[types.NodeID]*node {
var (
- wg sync.WaitGroup
+ wg sync.WaitGroup
+ initRound uint64
)
// Setup peer server at transport layer.
server := test.NewFakeTransportServer()
@@ -76,11 +91,22 @@ func (s *ConsensusTestSuite) setupNodes(
)
gov := seedGov.Clone()
gov.SwitchToRemoteMode(networkModule)
- gov.NotifyRound(0)
- networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
- app := test.NewApp(1, gov, nil)
+ gov.NotifyRound(initRound)
+ networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov))
+ logger := &common.NullLogger{}
+ rEvt, err := utils.NewRoundEvent(context.Background(), gov, logger, 0,
+ 0, 0, core.ConfigRoundShift)
+ s.Require().NoError(err)
nID := types.NewNodeID(k.PublicKey())
- nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule}
+ nodes[nID] = &node{
+ ID: nID,
+ app: test.NewApp(initRound+1, gov, rEvt),
+ gov: gov,
+ db: dbInst,
+ logger: logger,
+ rEvt: rEvt,
+ network: networkModule,
+ }
go func() {
defer wg.Done()
s.Require().NoError(networkModule.Setup(serverChannel))
@@ -100,7 +126,7 @@ func (s *ConsensusTestSuite) setupNodes(
node.db,
node.network,
k,
- &common.NullLogger{},
+ node.logger,
)
}
return nodes
@@ -215,7 +241,7 @@ func (s *ConsensusTestSuite) TestSimple() {
core.ConfigRoundShift)
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(60)))
+ test.StateChangeRoundLength, uint64(100)))
// A short round interval.
nodes := s.setupNodes(dMoment, prvKeys, seedGov)
for _, n := range nodes {
@@ -244,7 +270,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
req = s.Require()
peerCount = 7
dMoment = time.Now().UTC()
- untilRound = uint64(6)
+ untilRound = uint64(5)
)
if testing.Short() {
// Short test won't test configuration change packed as payload of
@@ -259,7 +285,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
core.ConfigRoundShift)
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(60)))
+ test.StateChangeRoundLength, uint64(100)))
req.NoError(seedGov.State().RequestChange(
test.StateChangeNotarySetSize, uint32(4)))
req.NoError(seedGov.State().RequestChange(
@@ -267,7 +293,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
seedGov.CatchUpWithRound(0)
// Setup configuration for round 0 and round 1.
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(85)))
+ test.StateChangeRoundLength, uint64(100)))
req.NoError(seedGov.State().RequestChange(
test.StateChangeNotarySetSize, uint32(5)))
req.NoError(seedGov.State().RequestChange(
@@ -275,7 +301,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
seedGov.CatchUpWithRound(1)
// Setup configuration for round 2.
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(85)))
+ test.StateChangeRoundLength, uint64(100)))
req.NoError(seedGov.State().RequestChange(
test.StateChangeNotarySetSize, uint32(6)))
req.NoError(seedGov.State().RequestChange(
@@ -283,7 +309,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
seedGov.CatchUpWithRound(2)
// Setup configuration for round 3.
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(60)))
+ test.StateChangeRoundLength, uint64(100)))
req.NoError(seedGov.State().RequestChange(
test.StateChangeNotarySetSize, uint32(4)))
req.NoError(seedGov.State().RequestChange(
@@ -298,18 +324,11 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
}
// Register configuration changes for round 4.
req.NoError(pickedNode.gov.RegisterConfigChange(
- 4, test.StateChangeRoundLength, uint64(80)))
+ 4, test.StateChangeRoundLength, uint64(100)))
req.NoError(pickedNode.gov.RegisterConfigChange(
4, test.StateChangeNotarySetSize, uint32(5)))
req.NoError(pickedNode.gov.RegisterConfigChange(
4, test.StateChangeDKGSetSize, uint32(5)))
- // Register configuration changes for round 5.
- req.NoError(pickedNode.gov.RegisterConfigChange(
- 5, test.StateChangeRoundLength, uint64(60)))
- req.NoError(pickedNode.gov.RegisterConfigChange(
- 5, test.StateChangeNotarySetSize, uint32(4)))
- req.NoError(pickedNode.gov.RegisterConfigChange(
- 5, test.StateChangeDKGSetSize, uint32(4)))
// Run test.
for _, n := range nodes {
go n.con.Run()
@@ -340,9 +359,11 @@ func (s *ConsensusTestSuite) TestSync() {
req = s.Require()
peerCount = 4
dMoment = time.Now().UTC()
- untilRound = uint64(7)
- stopRound = uint64(5)
- aliveRound = uint64(4)
+ untilRound = uint64(6)
+ stopRound = uint64(4)
+ // aliveRound should be large enough to test round event handling in
+ // syncer.
+ aliveRound = uint64(3)
errChan = make(chan error, 100)
)
prvKeys, pubKeys, err := test.NewKeys(peerCount)
@@ -355,7 +376,7 @@ func (s *ConsensusTestSuite) TestSync() {
core.ConfigRoundShift)
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(60)))
+ test.StateChangeRoundLength, uint64(100)))
seedGov.CatchUpWithRound(0)
seedGov.CatchUpWithRound(1)
// A short round interval.
@@ -447,29 +468,30 @@ ReachAlive:
}
}()
// Wait until all nodes reach 'untilRound'.
+ var stoppedRound uint64
go func() {
n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition()
ReachFinished:
for {
fmt.Println("latestPos", n.ID, &pos)
time.Sleep(5 * time.Second)
- for _, n = range nodes {
+ if stoppedNode.con != nil {
pos = n.app.GetLatestDeliveredPosition()
- if n.ID == stoppedNode.ID {
- if n.con == nil {
- continue
- }
- if pos.Round < stopRound {
- continue ReachFinished
- }
+ if pos.Round >= stopRound {
// Stop a node, we should still be able to proceed.
stoppedNode.con.Stop()
stoppedNode.con = nil
+ stoppedRound = pos.Round
fmt.Println("one node stopped", stoppedNode.ID)
utils.LaunchDummyReceiver(
runnerCtx, stoppedNode.network.ReceiveChan(), nil)
+ }
+ }
+ for _, n = range nodes {
+ if n.ID == stoppedNode.ID {
continue
}
+ pos = n.app.GetLatestDeliveredPosition()
if pos.Round < untilRound {
continue ReachFinished
}
@@ -485,6 +507,7 @@ ReachAlive:
case <-runnerCtx.Done():
// This test passed.
}
+ s.Require().Equal(stoppedRound, stopRound)
}
func (s *ConsensusTestSuite) TestForceSync() {
@@ -632,6 +655,100 @@ Loop:
s.verifyNodes(nodes)
}
+func (s *ConsensusTestSuite) TestResetDKG() {
+ var (
+ req = s.Require()
+ peerCount = 5
+ dMoment = time.Now().UTC()
+ untilRound = uint64(3)
+ )
+ prvKeys, pubKeys, err := test.NewKeys(peerCount)
+ req.NoError(err)
+ // Setup seed governance instance. Give a short latency to make this test
+ // run faster.
+ seedGov, err := test.NewGovernance(
+ test.NewState(core.DKGDelayRound,
+ pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
+ core.ConfigRoundShift)
+ req.NoError(err)
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundLength, uint64(100)))
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeNotarySetSize, uint32(4)))
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeDKGSetSize, uint32(4)))
+ nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+ // A round event handler to purge utils.NodeSetCache in test.Network.
+ purgeHandlerGen := func(n *test.Network) func([]utils.RoundEventParam) {
+ return func(evts []utils.RoundEventParam) {
+ for _, e := range evts {
+ if e.Reset == 0 {
+ continue
+ }
+ n.PurgeNodeSetCache(e.Round + 1)
+ }
+ }
+ }
+ // Round Height reference table:
+ // - Round:1 Reset:0 -- 100
+ // - Round:1 Reset:1 -- 200
+ // - Round:1 Reset:2 -- 300
+ // - Round:2 Reset:0 -- 400
+ // - Round:2 Reset:1 -- 500
+ // - Round:3 Reset:0 -- 600
+ // Register a round event handler to prohibit/unprohibit DKG operations in
+ // governance.
+ roundHandlerGen := func(g *test.Governance) func([]utils.RoundEventParam) {
+ return func(evts []utils.RoundEventParam) {
+ trigger := func(e utils.RoundEventParam) {
+ // Force round 2 to reset until resetCount == 2.
+ if e.Round == 1 && e.Reset == 0 {
+ prohibitDKG(g)
+ }
+ if e.Round == 1 && e.Reset == 2 {
+ unprohibitDKG(g)
+ }
+ // Force round 3 to reset until resetCount == 1.
+ if e.Round == 2 && e.Reset == 0 {
+ // Allow DKG final this time.
+ g.Prohibit(test.StateAddDKGMasterPublicKey)
+ g.Prohibit(test.StateAddDKGComplaint)
+ }
+ if e.Round == 2 && e.Reset == 1 {
+ unprohibitDKG(g)
+ }
+ }
+ for _, e := range evts {
+ trigger(e)
+ }
+ }
+ }
+ for _, n := range nodes {
+ n.rEvt.Register(purgeHandlerGen(n.network))
+ n.rEvt.Register(roundHandlerGen(n.gov))
+ go n.con.Run()
+ }
+Loop:
+ for {
+ <-time.After(5 * time.Second)
+ for _, n := range nodes {
+ latestPos := n.app.GetLatestDeliveredPosition()
+ fmt.Println("latestPos", n.ID, &latestPos)
+ if latestPos.Round < untilRound {
+ continue Loop
+ }
+ }
+ // Oh ya.
+ break
+ }
+ s.verifyNodes(nodes)
+ for _, n := range nodes {
+ n.con.Stop()
+ req.Equal(n.gov.DKGResetCount(2), uint64(2))
+ req.Equal(n.gov.DKGResetCount(3), uint64(1))
+ }
+}
+
func TestConsensus(t *testing.T) {
suite.Run(t, new(ConsensusTestSuite))
}