// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package integration

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/suite"

	"gitlab.com/byzantine-lab/tangerine-consensus/common"
	"gitlab.com/byzantine-lab/tangerine-consensus/core"
	"gitlab.com/byzantine-lab/tangerine-consensus/core/crypto"
	"gitlab.com/byzantine-lab/tangerine-consensus/core/db"
	"gitlab.com/byzantine-lab/tangerine-consensus/core/syncer"
	"gitlab.com/byzantine-lab/tangerine-consensus/core/test"
	"gitlab.com/byzantine-lab/tangerine-consensus/core/types"
	"gitlab.com/byzantine-lab/tangerine-consensus/core/utils"
)

// There is no scheduler in these tests, so we need to wait a long period to
// make sure these tests are OK.
type ConsensusTestSuite struct {
	suite.Suite
}

// A round event handler to purge utils.NodeSetCache in test.Network.
func purgeHandlerGen(n *test.Network) func([]utils.RoundEventParam) {
	return func(evts []utils.RoundEventParam) {
		for _, e := range evts {
			if e.Reset == 0 {
				continue
			}
			n.PurgeNodeSetCache(e.Round + 1)
		}
	}
}

func govHandlerGen(
	round, reset uint64,
	g *test.Governance,
	doer func(*test.Governance)) func([]utils.RoundEventParam) {
	return func(evts []utils.RoundEventParam) {
		for _, e := range evts {
			if e.Round == round && e.Reset == reset {
				doer(g)
			}
		}
	}
}

type node struct {
	ID      types.NodeID
	con     *core.Consensus
	app     *test.App
	gov     *test.Governance
	rEvt    *utils.RoundEvent
	db      db.Database
	network *test.Network
	logger  common.Logger
}

func prohibitDKG(gov *test.Governance) {
	gov.Prohibit(test.StateAddDKGMasterPublicKey)
	gov.Prohibit(test.StateAddDKGFinal)
	gov.Prohibit(test.StateAddDKGComplaint)
}

func prohibitDKGExceptFinalize(gov *test.Governance) {
	gov.Prohibit(test.StateAddDKGMasterPublicKey)
	gov.Prohibit(test.StateAddDKGComplaint)
}

func unprohibitDKG(gov *test.Governance) {
	gov.Unprohibit(test.StateAddDKGMasterPublicKey)
	gov.Unprohibit(test.StateAddDKGFinal)
	gov.Unprohibit(test.StateAddDKGComplaint)
}

func (s *ConsensusTestSuite) setupNodes(
	dMoment time.Time,
	prvKeys []crypto.PrivateKey,
	seedGov *test.Governance) map[types.NodeID]*node {
	var (
		wg        sync.WaitGroup
		initRound uint64
	)
	// Setup peer server at transport layer.
	server := test.NewFakeTransportServer()
	serverChannel, err := server.Host()
	s.Require().NoError(err)
	// Setup nodes.
	nodes := make(map[types.NodeID]*node)
	wg.Add(len(prvKeys))
	for i, k := range prvKeys {
		dbInst, err := db.NewMemBackedDB()
		s.Require().NoError(err)
		// Prepare essential modules: app, gov, db.
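		// Each node gets its own fake network endpoint, a clone of the seed
		// governance switched to remote mode, a per-node log file, and a
		// RoundEvent instance; the consensus module itself is constructed in
		// a second pass below, once the transport layer is ready.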
		networkModule := test.NewNetwork(k.PublicKey(), test.NetworkConfig{
			Type:          test.NetworkTypeFake,
			DirectLatency: &test.FixedLatencyModel{},
			GossipLatency: &test.FixedLatencyModel{},
			Marshaller:    test.NewDefaultMarshaller(nil)},
		)
		gov := seedGov.Clone()
		gov.SwitchToRemoteMode(networkModule)
		gov.NotifyRound(initRound, types.GenesisHeight)
		networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov))
		f, err := os.Create(fmt.Sprintf("log.%d.log", i))
		if err != nil {
			panic(err)
		}
		logger := common.NewCustomLogger(log.New(f, "", log.LstdFlags|log.Lmicroseconds))
		rEvt, err := utils.NewRoundEvent(context.Background(), gov, logger,
			types.Position{Height: types.GenesisHeight}, core.ConfigRoundShift)
		s.Require().NoError(err)
		nID := types.NewNodeID(k.PublicKey())
		nodes[nID] = &node{
			ID:      nID,
			app:     test.NewApp(initRound+1, gov, rEvt),
			gov:     gov,
			db:      dbInst,
			logger:  logger,
			rEvt:    rEvt,
			network: networkModule,
		}
		go func() {
			defer wg.Done()
			s.Require().NoError(networkModule.Setup(serverChannel))
			go networkModule.Run()
		}()
	}
	// Make sure transport layer is ready.
	s.Require().NoError(server.WaitForPeers(uint32(len(prvKeys))))
	wg.Wait()
	for _, k := range prvKeys {
		node := nodes[types.NewNodeID(k.PublicKey())]
		// Now construct the consensus module.
		node.con = core.NewConsensus(
			dMoment,
			node.app,
			node.gov,
			node.db,
			node.network,
			k,
			node.logger,
		)
	}
	return nodes
}

func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) {
	for ID, node := range nodes {
		s.Require().NoError(test.VerifyDB(node.db))
		s.Require().NoError(node.app.Verify())
		for otherID, otherNode := range nodes {
			if ID == otherID {
				continue
			}
			s.Require().NoError(node.app.Compare(otherNode.app))
		}
	}
}

func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
	sourceNode, syncNode *node,
	syncerObj *syncer.Consensus,
	nextSyncHeight uint64) (
	syncedCon *core.Consensus, syncerHeight uint64, err error) {
	syncerHeight = nextSyncHeight
	// Setup revealer.
	DBAll, err := sourceNode.db.GetAllBlocks()
	if err != nil {
		return
	}
	r, err := test.NewBlockRevealerByPosition(DBAll, nextSyncHeight)
	if err != nil {
		return
	}
	// Load all blocks from revealer and dump them into syncer.
	var compactionChainBlocks []*types.Block
	syncBlocks := func() (done bool) {
		// Apply txs in blocks to make sure our governance instance is ready.
		// This action should be performed by the full node in production mode.
		for _, b := range compactionChainBlocks {
			if err = syncNode.gov.State().Apply(b.Payload); err != nil {
				if err != test.ErrDuplicatedChange {
					return
				}
				err = nil
			}
			// Sync app.
			syncNode.app.BlockConfirmed(*b)
			syncNode.app.BlockDelivered(b.Hash, b.Position, b.Randomness)
			// Sync gov.
			syncNode.gov.CatchUpWithRound(
				b.Position.Round + core.ConfigRoundShift)
		}
		var synced bool
		synced, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
		if err != nil {
			done = true
		}
		if synced {
			syncedCon, err = syncerObj.GetSyncedConsensus()
			done = true
		}
		compactionChainBlocks = nil
		return
	}
	for {
		var b types.Block
		b, err = r.NextBlock()
		if err != nil {
			if err == db.ErrIterationFinished {
				err = nil
				if syncBlocks() {
					break
				}
			}
			break
		}
		syncerHeight = b.Position.Height + 1
		compactionChainBlocks = append(compactionChainBlocks, &b)
		if len(compactionChainBlocks) >= 20 {
			if syncBlocks() {
				break
			}
		}
	}
	return
}

func (s *ConsensusTestSuite) TestSimple() {
	if testing.Short() {
		// All other tests will cover this basic case. To speed up the CI
		// process, ignore this test in short mode.
		return
	}
	// The simplest test case:
	// - Node set is equal to the DKG set and the notary set for each chain
	//   in each round.
	// - No configuration change.
	// - 4 rounds (rounds 0 and 1 are genesis rounds; round 2 would be ready
	//   when the first block is delivered). Testing until round 3 should be
	//   enough.
	var (
		req        = s.Require()
		peerCount  = 4
		dMoment    = time.Now().UTC()
		untilRound = uint64(5)
	)
	if testing.Short() {
		untilRound = 2
	}
	prvKeys, pubKeys, err := test.NewKeys(peerCount)
	req.NoError(err)
	// Setup seed governance instance. Give a short latency to make this test
	// run faster.
	seedGov, err := test.NewGovernance(
		test.NewState(core.DKGDelayRound,
			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
		core.ConfigRoundShift)
	req.NoError(err)
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100))) // A short round interval.
	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
	for _, n := range nodes {
		go n.con.Run(make(chan struct{}))
		defer n.con.Stop()
	}
Loop:
	for {
		<-time.After(5 * time.Second)
		for _, n := range nodes {
			latestPos := n.app.GetLatestDeliveredPosition()
			fmt.Println("latestPos", n.ID, &latestPos)
			if latestPos.Round < untilRound {
				continue Loop
			}
		}
		// Oh ya.
		break
	}
	s.verifyNodes(nodes)
}

func (s *ConsensusTestSuite) TestSetSizeChange() {
	var (
		req        = s.Require()
		peerCount  = 7
		dMoment    = time.Now().UTC()
		untilRound = uint64(5)
	)
	if testing.Short() {
		// The short test won't cover configuration changes packed as block
		// payloads and applied when delivered.
		untilRound = 5
	}
	prvKeys, pubKeys, err := test.NewKeys(peerCount)
	req.NoError(err)
	// Setup seed governance instance.
	seedGov, err := test.NewGovernance(
		test.NewState(core.DKGDelayRound,
			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
		core.ConfigRoundShift)
	req.NoError(err)
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeNotarySetSize, uint32(4)))
	seedGov.CatchUpWithRound(0)
	// Setup configuration for round 0 and round 1.
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeNotarySetSize, uint32(5)))
	seedGov.CatchUpWithRound(1)
	// Setup configuration for round 2.
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeNotarySetSize, uint32(6)))
	seedGov.CatchUpWithRound(2)
	// Setup configuration for round 3.
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeNotarySetSize, uint32(4)))
	seedGov.CatchUpWithRound(3)
	// Setup nodes.
	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
	// Pick a master node and register changes on it.
	var pickedNode *node
	for _, pickedNode = range nodes {
		break
	}
	// Register configuration changes for round 4.
	req.NoError(pickedNode.gov.RegisterConfigChange(
		4, test.StateChangeRoundLength, uint64(100)))
	req.NoError(pickedNode.gov.RegisterConfigChange(
		4, test.StateChangeNotarySetSize, uint32(5)))
	// Register configuration changes for round 5.
	req.NoError(pickedNode.gov.RegisterConfigChange(
		5, test.StateChangeRoundLength, uint64(60)))
	req.NoError(pickedNode.gov.RegisterConfigChange(
		5, test.StateChangeNotarySetSize, uint32(4)))
	// Run the test.
	for _, n := range nodes {
		go n.con.Run(make(chan struct{}))
		defer n.con.Stop()
	}
Loop:
	for {
		<-time.After(5 * time.Second)
		for _, n := range nodes {
			latestPos := n.app.GetLatestDeliveredPosition()
			fmt.Println("latestPos", n.ID, &latestPos)
			if latestPos.Round < untilRound {
				continue Loop
			}
		}
		// Oh ya.
		break
	}
	s.verifyNodes(nodes)
}

func (s *ConsensusTestSuite) TestSync() {
	// The sync test case:
	// - No configuration change.
	// - One node does not run when all the others start, until aliveRound is
	//   exceeded.
	// - One DKG reset happened before syncing.
	var (
		req        = s.Require()
		peerCount  = 4
		dMoment    = time.Now().UTC()
		untilRound = uint64(6)
		stopRound  = uint64(4)
		// aliveRound should be large enough to test round event handling in
		// syncer.
		aliveRound = uint64(2)
		errChan    = make(chan error, 100)
	)
	prvKeys, pubKeys, err := test.NewKeys(peerCount)
	req.NoError(err)
	// Setup seed governance instance. Give a short latency to make this test
	// run faster.
	seedGov, err := test.NewGovernance(
		test.NewState(core.DKGDelayRound,
			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
		core.ConfigRoundShift)
	req.NoError(err)
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	seedGov.CatchUpWithRound(0)
	seedGov.CatchUpWithRound(1)
	// A short round interval.
	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
	// Choose the first node as "syncNode", whose consensus Run() is called
	// later.
	syncNode := nodes[types.NewNodeID(pubKeys[0])]
	syncNode.con = nil
	// Pick a node to stop when synced.
	stoppedNode := nodes[types.NewNodeID(pubKeys[1])]
	for _, n := range nodes {
		n.rEvt.Register(purgeHandlerGen(n.network))
		// Round Height reference table:
		// - Round:1 Reset:0 -- 100
		// - Round:1 Reset:1 -- 200
		// - Round:2 Reset:0 -- 300
		n.rEvt.Register(govHandlerGen(1, 0, n.gov, prohibitDKG))
		n.rEvt.Register(govHandlerGen(1, 1, n.gov, unprohibitDKG))
		if n.ID != syncNode.ID {
			go n.con.Run(make(chan struct{}))
			if n.ID != stoppedNode.ID {
				defer n.con.Stop()
			}
		}
	}
	// Drain syncNode's network receive channel, or it might exceed the limit
	// and block other goroutines.
	dummyReceiverCtxCancel, dummyFinished := utils.LaunchDummyReceiver(
		context.Background(), syncNode.network.ReceiveChan(), nil)
ReachAlive:
	for {
		// Check if any error happened, or sleep for a period of time.
		select {
		case err := <-errChan:
			req.NoError(err)
		case <-time.After(5 * time.Second):
		}
		// If all nodes except syncNode have reached aliveRound, call
		// syncNode's Run() and send it all blocks in one of the normal
		// nodes' compaction chain.
		for id, n := range nodes {
			if id == syncNode.ID {
				continue
			}
			pos := n.app.GetLatestDeliveredPosition()
			if pos.Round < aliveRound {
				fmt.Println("latestPos", n.ID, &pos)
				continue ReachAlive
			}
		}
		dummyReceiverCtxCancel()
		<-dummyFinished
		break
	}
	// Initiate the syncer.
	runnerCtx, runnerCtxCancel := context.WithCancel(context.Background())
	defer runnerCtxCancel()
	f, err := os.Create("log.sync.log")
	if err != nil {
		panic(err)
	}
	logger := common.NewCustomLogger(log.New(f, "", log.LstdFlags|log.Lmicroseconds))
	syncerObj := syncer.NewConsensus(
		0,
		dMoment,
		syncNode.app,
		syncNode.gov,
		syncNode.db,
		syncNode.network,
		prvKeys[0],
		logger,
	)
	// Initialize the communication channel; it's not recommended to assert in
	// another goroutine.
	go func() {
		var (
			syncedHeight uint64 = 1
			err          error
			syncedCon    *core.Consensus
		)
	SyncLoop:
		for {
			syncedCon, syncedHeight, err = s.syncBlocksWithSomeNode(
				stoppedNode, syncNode, syncerObj, syncedHeight)
			if syncedCon != nil {
				syncNode.con = syncedCon
				go syncNode.con.Run(make(chan struct{}))
				go func() {
					<-runnerCtx.Done()
					syncNode.con.Stop()
				}()
				break SyncLoop
			}
			if err != nil {
				errChan <- err
				break SyncLoop
			}
			select {
			case <-runnerCtx.Done():
				break SyncLoop
			case <-time.After(4 * time.Second):
			}
		}
	}()
	// Wait until all nodes reach 'untilRound'.
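	// The goroutine below stops stoppedNode once it has delivered a block in
	// stopRound, records the round at which it stopped, keeps draining its
	// network channel with a dummy receiver, and finally cancels runnerCtx
	// after every other node has delivered a block in untilRound.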
	var stoppedRound uint64
	go func() {
		n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition()
	ReachFinished:
		for {
			fmt.Println("latestPos", n.ID, &pos)
			time.Sleep(5 * time.Second)
			if stoppedNode.con != nil {
				pos = n.app.GetLatestDeliveredPosition()
				if pos.Round >= stopRound {
					// Stop a node; we should still be able to proceed.
					stoppedNode.con.Stop()
					stoppedNode.con = nil
					stoppedRound = pos.Round
					fmt.Println("one node stopped", stoppedNode.ID)
					utils.LaunchDummyReceiver(
						runnerCtx, stoppedNode.network.ReceiveChan(), nil)
				}
			}
			for _, n = range nodes {
				if n.ID == stoppedNode.ID {
					continue
				}
				pos = n.app.GetLatestDeliveredPosition()
				if pos.Round < untilRound {
					continue ReachFinished
				}
			}
			break
		}
		runnerCtxCancel()
	}()
	// Block until any reasonable testing milestone is reached.
	select {
	case err := <-errChan:
		req.NoError(err)
	case <-runnerCtx.Done():
		// This test passed.
	}
	s.Require().Equal(stoppedRound, stopRound)
}

func (s *ConsensusTestSuite) TestForceSync() {
	// The force sync test case:
	// - No configuration change.
	// - One node does not run when all the others start, until aliveRound is
	//   exceeded.
	var (
		req        = s.Require()
		peerCount  = 4
		dMoment    = time.Now().UTC()
		untilRound = uint64(3)
		stopRound  = uint64(1)
		errChan    = make(chan error, 100)
	)
	prvKeys, pubKeys, err := test.NewKeys(peerCount)
	req.NoError(err)
	// Setup seed governance instance. Give a short latency to make this test
	// run faster.
	seedGov, err := test.NewGovernance(
		test.NewState(core.DKGDelayRound,
			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
		core.ConfigRoundShift)
	req.NoError(err)
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	seedGov.CatchUpWithRound(0)
	seedGov.CatchUpWithRound(1)
	// A short round interval.
	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
	for _, n := range nodes {
		go n.con.Run(make(chan struct{}))
	}
ReachStop:
	for {
		// Check if any error happened, or sleep for a period of time.
		select {
		case err := <-errChan:
			req.NoError(err)
		case <-time.After(5 * time.Second):
		}
		// If one of the nodes has reached stopRound, stop all nodes to
		// simulate a crash.
		for _, n := range nodes {
			pos := n.app.GetLatestDeliveredPosition()
			if pos.Round >= stopRound {
				break ReachStop
			} else {
				fmt.Println("latestPos", n.ID, &pos)
			}
		}
	}
	var latestHeight uint64
	var latestNodeID types.NodeID
	for _, n := range nodes {
		n.con.Stop()
		time.Sleep(1 * time.Second)
	}
	for nID, n := range nodes {
		_, height := n.db.GetCompactionChainTipInfo()
		if height > latestHeight {
			fmt.Println("Newer height", nID, height)
			latestNodeID = nID
			latestHeight = height
		}
	}
	fmt.Println("Latest node", latestNodeID, latestHeight)
	for nID, node := range nodes {
		if nID == latestNodeID {
			continue
		}
		fmt.Printf("[%p] Clearing %s %s\n",
			node.app, nID, node.app.GetLatestDeliveredPosition())
		node.app.ClearUndeliveredBlocks()
	}
	syncerCon := make(map[types.NodeID]*syncer.Consensus, len(nodes))
	for i, prvKey := range prvKeys {
		f, err := os.Create(fmt.Sprintf("log.sync.%d.log", i))
		if err != nil {
			panic(err)
		}
		logger := common.NewCustomLogger(log.New(f, "", log.LstdFlags|log.Lmicroseconds))
		nID := types.NewNodeID(prvKey.PublicKey())
		node := nodes[nID]
		syncerCon[nID] = syncer.NewConsensus(
			latestHeight,
			dMoment,
			node.app,
			node.gov,
			node.db,
			node.network,
			prvKey,
			logger,
		)
	}
	targetNode := nodes[latestNodeID]
	for nID, node := range nodes {
		if nID == latestNodeID {
			continue
		}
		syncedHeight := node.app.GetLatestDeliveredPosition().Height
		syncedHeight++
		var err error
		for {
			fmt.Println("Syncing", nID, syncedHeight)
			if syncedHeight >= latestHeight {
				break
			}
			_, syncedHeight, err = s.syncBlocksWithSomeNode(
				targetNode, node, syncerCon[nID], syncedHeight)
			if err != nil {
				panic(err)
			}
			fmt.Println("Syncing after", nID, syncedHeight)
		}
		fmt.Println("Synced", nID, syncedHeight)
	}
	// Make sure all nodes are synced in db and app.
	_, latestHeight = targetNode.db.GetCompactionChainTipInfo()
	latestPos := targetNode.app.GetLatestDeliveredPosition()
	for _, node := range nodes {
		_, height := node.db.GetCompactionChainTipInfo()
		s.Require().Equal(height, latestHeight)
		pos := node.app.GetLatestDeliveredPosition()
		s.Require().Equal(latestPos, pos)
	}
	for _, con := range syncerCon {
		con.ForceSync(latestPos, true)
	}
	for nID := range nodes {
		con, err := syncerCon[nID].GetSyncedConsensus()
		s.Require().NoError(err)
		nodes[nID].con = con
	}
	for _, node := range nodes {
		go node.con.Run(make(chan struct{}))
		defer node.con.Stop()
	}
Loop:
	for {
		<-time.After(5 * time.Second)
		for _, n := range nodes {
			latestPos := n.app.GetLatestDeliveredPosition()
			fmt.Println("latestPos", n.ID, &latestPos)
			if latestPos.Round < untilRound {
				continue Loop
			}
		}
		// Oh ya.
		break
	}
	s.verifyNodes(nodes)
}

func (s *ConsensusTestSuite) TestResetDKG() {
	var (
		req        = s.Require()
		peerCount  = 5
		dMoment    = time.Now().UTC()
		untilRound = uint64(3)
	)
	prvKeys, pubKeys, err := test.NewKeys(peerCount)
	req.NoError(err)
	// Setup seed governance instance. Give a short latency to make this test
	// run faster.
	seedGov, err := test.NewGovernance(
		test.NewState(core.DKGDelayRound,
			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
		core.ConfigRoundShift)
	req.NoError(err)
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeRoundLength, uint64(100)))
	req.NoError(seedGov.State().RequestChange(
		test.StateChangeNotarySetSize, uint32(4)))
	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
	for _, n := range nodes {
		n.rEvt.Register(purgeHandlerGen(n.network))
		// Round Height reference table:
		// - Round:1 Reset:0 -- 100
		// - Round:1 Reset:1 -- 200
		// - Round:1 Reset:2 -- 300
		// - Round:2 Reset:0 -- 400
		// - Round:2 Reset:1 -- 500
		// - Round:3 Reset:0 -- 600
		n.rEvt.Register(govHandlerGen(1, 0, n.gov, prohibitDKG))
		n.rEvt.Register(govHandlerGen(1, 2, n.gov, unprohibitDKG))
		n.rEvt.Register(govHandlerGen(2, 0, n.gov, prohibitDKGExceptFinalize))
		n.rEvt.Register(govHandlerGen(2, 1, n.gov, unprohibitDKG))
		go n.con.Run(make(chan struct{}))
	}
Loop:
	for {
		<-time.After(5 * time.Second)
		for _, n := range nodes {
			latestPos := n.app.GetLatestDeliveredPosition()
			fmt.Println("latestPos", n.ID, &latestPos)
			if latestPos.Round < untilRound {
				continue Loop
			}
		}
		// Oh ya.
		break
	}
	s.verifyNodes(nodes)
	for _, n := range nodes {
		n.con.Stop()
		req.Equal(n.gov.DKGResetCount(2), uint64(2))
		req.Equal(n.gov.DKGResetCount(3), uint64(1))
	}
}

func TestConsensus(t *testing.T) {
	suite.Run(t, new(ConsensusTestSuite))
}
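
// Note: these integration tests are long-running since there is no scheduler
// driving them. A typical invocation looks like the following (the timeout
// value is illustrative, not something the suite mandates):
//
//	go test -v -timeout 30m -run TestConsensus
//
// Passing -short skips or shrinks the cases guarded by testing.Short() above.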