aboutsummaryrefslogblamecommitdiffstats
path: root/integration_test/consensus_test.go
blob: fa62bda356ceee56ec00b958bfffc07463ced92e (plain) (tree)



















                                                                               
                 
             





                                                            
                                                                 
                                                             
                                                                 

                                                                
                                                                









                                                                              



                                
                           
                             
















                                                          
                                                  

                                                           





                                                                                   

                                                     

                                                                         
                                               

                                                                             








                                                                               












                                                             


                    

                                                                        
                                                           









                                                                            
                                                    


                                    



                                                                    
                                                  


                       
                                                                             





                                                                   








                                                                                     



                                                                                       


                                                                         






                                                                               






                                           
                                      
                               
                                                           

















                                                                         












                                                                                 


                              



                                                                                 
                                           


                                                                                   

                                                  
                                                               


                                                        
                              



                                             
                                                                           

                                                                       
                                                                  






                                                         
                            
                                        

















                                                                                  


                                                                                   
                        

                                                               





                                                               


                                                  
                                                      




                                                               
                                                      





















                                                                  
                              



                                             
                                                                           

                                                                       
                                                                  







                                                         

 








                                                                                    
                                      







                                                                                 


                                                                                   

                                                  
                                                               





                                                      





                                                                                  

                                                         

                                        
                                      



                                                                                 

                                                           

             





                                                                             





                                                                                         


                                                                    

                                                   



















                                                                                 





                                                    
                                                                                
                                                                               
                                             

                                                        














                                                           
                                                                                   

                      
                                                            
                                                   

                                                                        



                                                           

                                                                      



                                                                                           
                                                                                       
                                                                                                

                                                
                                                           
















                                                                


                                             
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package integration

import (
    "context"
    "fmt"
    "sync"
    "testing"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core"
    "github.com/dexon-foundation/dexon-consensus/core/crypto"
    "github.com/dexon-foundation/dexon-consensus/core/db"
    "github.com/dexon-foundation/dexon-consensus/core/syncer"
    "github.com/dexon-foundation/dexon-consensus/core/test"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    "github.com/dexon-foundation/dexon-consensus/core/utils"
    "github.com/stretchr/testify/suite"
)

// There is no scheduler in these tests, we need to wait a long period to make
// sure these tests are ok.
type ConsensusTestSuite struct {
    suite.Suite
}

type node struct {
    ID      types.NodeID
    con     *core.Consensus
    app     *test.App
    gov     *test.Governance
    db      db.Database
    network *test.Network
}

func (s *ConsensusTestSuite) setupNodes(
    dMoment time.Time,
    prvKeys []crypto.PrivateKey,
    seedGov *test.Governance) map[types.NodeID]*node {
    var (
        wg sync.WaitGroup
    )
    // Setup peer server at transport layer.
    server := test.NewFakeTransportServer()
    serverChannel, err := server.Host()
    s.Require().NoError(err)
    // setup nodes.
    nodes := make(map[types.NodeID]*node)
    wg.Add(len(prvKeys))
    for _, k := range prvKeys {
        dbInst, err := db.NewMemBackedDB()
        s.Require().NoError(err)
        // Prepare essential modules: app, gov, db.
        networkModule := test.NewNetwork(k.PublicKey(), test.NetworkConfig{
            Type:          test.NetworkTypeFake,
            DirectLatency: &test.FixedLatencyModel{},
            GossipLatency: &test.FixedLatencyModel{},
            Marshaller:    test.NewDefaultMarshaller(nil)},
        )
        gov := seedGov.Clone()
        gov.SwitchToRemoteMode(networkModule)
        gov.NotifyRoundHeight(0, 0)
        networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
        app := test.NewApp(gov.State())
        nID := types.NewNodeID(k.PublicKey())
        nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule}
        go func() {
            defer wg.Done()
            s.Require().NoError(networkModule.Setup(serverChannel))
            go networkModule.Run()
        }()
    }
    // Make sure transport layer is ready.
    s.Require().NoError(server.WaitForPeers(uint32(len(prvKeys))))
    wg.Wait()
    for _, k := range prvKeys {
        node := nodes[types.NewNodeID(k.PublicKey())]
        // Now is the consensus module.
        node.con = core.NewConsensus(
            dMoment,
            node.app,
            node.gov,
            node.db,
            node.network,
            k,
            &common.NullLogger{},
        )
    }
    return nodes
}

func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) {
    for ID, node := range nodes {
        s.Require().NoError(test.VerifyDB(node.db))
        s.Require().NoError(node.app.Verify())
        for otherID, otherNode := range nodes {
            if ID == otherID {
                continue
            }
            s.Require().NoError(node.app.Compare(otherNode.app))
        }
    }
}

func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
    sourceNode, syncNode *node,
    syncerObj *syncer.Consensus,
    nextSyncHeight uint64) (
    syncedCon *core.Consensus, syncerHeight uint64, err error) {

    syncerHeight = nextSyncHeight
    // Setup revealer.
    DBAll, err := sourceNode.db.GetAllBlocks()
    if err != nil {
        return
    }
    r, err := test.NewCompactionChainBlockRevealer(DBAll, nextSyncHeight)
    if err != nil {
        return
    }
    // Load all blocks from revealer and dump them into syncer.
    var compactionChainBlocks []*types.Block
    syncBlocks := func() (done bool) {
        // Apply txs in blocks to make sure our governance instance is ready.
        // This action should be performed by fullnode in production mode.
        for _, b := range compactionChainBlocks {
            if err = syncNode.gov.State().Apply(b.Payload); err != nil {
                if err != test.ErrDuplicatedChange {
                    return
                }
                err = nil
            }
            // Sync app.
            syncNode.app.BlockConfirmed(*b)
            syncNode.app.BlockDelivered(b.Hash, b.Position, b.Finalization)
            // Sync gov.
            syncNode.gov.CatchUpWithRound(
                b.Position.Round + core.ConfigRoundShift)
        }
        var synced bool
        synced, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
        if err != nil {
            done = true
        }
        if synced {
            syncedCon, err = syncerObj.GetSyncedConsensus()
            done = true
        }
        compactionChainBlocks = nil
        return
    }
    for {
        var b types.Block
        b, err = r.NextBlock()
        if err != nil {
            if err == db.ErrIterationFinished {
                err = nil
                if syncBlocks() {
                    break
                }
            }
            break
        }
        syncerHeight = b.Finalization.Height + 1
        compactionChainBlocks = append(compactionChainBlocks, &b)
        if len(compactionChainBlocks) >= 100 {
            if syncBlocks() {
                break
            }
        }
    }
    return
}

func (s *ConsensusTestSuite) TestSimple() {
    // The simplest test case:
    //  - Node set is equals to DKG set and notary set for each chain in each
    //    round.
    //  - No configuration change.
    //  - 4 rounds (0, 1 are genesis rounds, round 2 would be ready when the
    //    first block delivered. Test until round 3 should be enough.
    var (
        req        = s.Require()
        peerCount  = 4
        dMoment    = time.Now().UTC()
        untilRound = uint64(5)
    )
    if testing.Short() {
        untilRound = 2
    }
    prvKeys, pubKeys, err := test.NewKeys(peerCount)
    req.NoError(err)
    // Setup seed governance instance. Give a short latency to make this test
    // run faster.
    seedGov, err := test.NewGovernance(
        test.NewState(
            pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
        core.ConfigRoundShift)
    req.NoError(err)
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeRoundInterval, 50*time.Second))
    // A short round interval.
    nodes := s.setupNodes(dMoment, prvKeys, seedGov)
    for _, n := range nodes {
        go n.con.Run()
    }
Loop:
    for {
        <-time.After(5 * time.Second)
        fmt.Println("check latest position delivered by each node")
        for _, n := range nodes {
            latestPos := n.app.GetLatestDeliveredPosition()
            fmt.Println("latestPos", n.ID, &latestPos)
            if latestPos.Round < untilRound {
                continue Loop
            }
        }
        // Oh ya.
        break
    }
    s.verifyNodes(nodes)
    // TODO(haoping) stop consensus.
}

func (s *ConsensusTestSuite) TestNumChainsChange() {
    var (
        req        = s.Require()
        peerCount  = 4
        dMoment    = time.Now().UTC()
        untilRound = uint64(6)
    )
    if testing.Short() {
        // Short test won't test configuration change packed as payload of
        // blocks and applied when delivered.
        untilRound = 5
    }
    prvKeys, pubKeys, err := test.NewKeys(peerCount)
    req.NoError(err)
    // Setup seed governance instance.
    seedGov, err := test.NewGovernance(
        test.NewState(
            pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
        core.ConfigRoundShift)
    req.NoError(err)
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeRoundInterval, 45*time.Second))
    seedGov.CatchUpWithRound(0)
    // Setup configuration for round 0 and round 1.
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeNumChains, uint32(5)))
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeRoundInterval, 55*time.Second))
    seedGov.CatchUpWithRound(1)
    // Setup configuration for round 2.
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeNumChains, uint32(6)))
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeRoundInterval, 55*time.Second))
    seedGov.CatchUpWithRound(2)
    // Setup configuration for round 3.
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeNumChains, uint32(5)))
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeRoundInterval, 75*time.Second))
    seedGov.CatchUpWithRound(3)
    // Setup nodes.
    nodes := s.setupNodes(dMoment, prvKeys, seedGov)
    // Pick master node, and register changes on it.
    var pickedNode *node
    for _, pickedNode = range nodes {
        break
    }
    // Register configuration changes for round 4.
    req.NoError(pickedNode.gov.RegisterConfigChange(
        4, test.StateChangeNumChains, uint32(4)))
    req.NoError(pickedNode.gov.RegisterConfigChange(
        4, test.StateChangeRoundInterval, 45*time.Second))
    // Register configuration changes for round 5.
    req.NoError(pickedNode.gov.RegisterConfigChange(
        5, test.StateChangeNumChains, uint32(5)))
    req.NoError(pickedNode.gov.RegisterConfigChange(
        5, test.StateChangeRoundInterval, 55*time.Second))
    // Run test.
    for _, n := range nodes {
        go n.con.Run()
    }
Loop:
    for {
        <-time.After(5 * time.Second)
        fmt.Println("check latest position delivered by each node")
        for _, n := range nodes {
            latestPos := n.app.GetLatestDeliveredPosition()
            fmt.Println("latestPos", n.ID, &latestPos)
            if latestPos.Round < untilRound {
                continue Loop
            }
        }
        // Oh ya.
        break
    }
    s.verifyNodes(nodes)
}

func (s *ConsensusTestSuite) TestSync() {
    // The sync test case:
    // - No configuration change.
    // - One node does not run when all others starts until aliveRound exceeded.
    var (
        req        = s.Require()
        peerCount  = 4
        dMoment    = time.Now().UTC()
        untilRound = uint64(5)
        stopRound  = uint64(3)
        aliveRound = uint64(1)
        errChan    = make(chan error, 100)
    )
    prvKeys, pubKeys, err := test.NewKeys(peerCount)
    req.NoError(err)
    // Setup seed governance instance. Give a short latency to make this test
    // run faster.
    seedGov, err := test.NewGovernance(
        test.NewState(
            pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
        core.ConfigRoundShift)
    req.NoError(err)
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeRoundInterval, 55*time.Second))
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeNumChains, uint32(5)))
    seedGov.CatchUpWithRound(0)
    req.NoError(seedGov.State().RequestChange(
        test.StateChangeNumChains, uint32(4)))
    seedGov.CatchUpWithRound(1)
    // A short round interval.
    nodes := s.setupNodes(dMoment, prvKeys, seedGov)
    // Choose the first node as "syncNode" that its consensus' Run() is called
    // later.
    syncNode := nodes[types.NewNodeID(pubKeys[0])]
    syncNode.con = nil
    // Pick a node to stop when synced.
    stoppedNode := nodes[types.NewNodeID(pubKeys[1])]
    for _, n := range nodes {
        if n.ID != syncNode.ID {
            go n.con.Run()
        }
    }
    // Clean syncNode's network receive channel, or it might exceed the limit
    // and block other go routines.
    dummyReceiverCtxCancel := test.LaunchDummyReceiver(
        context.Background(), syncNode.network)
ReachAlive:
    for {
        // Check if any error happened or sleep for a period of time.
        select {
        case err := <-errChan:
            req.NoError(err)
        case <-time.After(5 * time.Second):
        }
        // If all nodes excepts syncNode have reached aliveRound, call syncNode's
        // Run() and send it all blocks in one of normal node's compaction chain.
        for id, n := range nodes {
            if id == syncNode.ID {
                continue
            }
            pos := n.app.GetLatestDeliveredPosition()
            if pos.Round < aliveRound {
                fmt.Println("latestPos", n.ID, &pos)
                continue ReachAlive
            }
        }
        dummyReceiverCtxCancel()
        break
    }
    // Initiate Syncer.
    runnerCtx, runnerCtxCancel := context.WithCancel(context.Background())
    defer runnerCtxCancel()
    syncerObj := syncer.NewConsensus(
        dMoment,
        syncNode.app,
        syncNode.gov,
        syncNode.db,
        syncNode.network,
        prvKeys[0],
        &common.NullLogger{},
    )
    // Initialize communication channel, it's not recommended to assertion in
    // another go routine.
    go func() {
        var (
            syncedHeight uint64
            err          error
            syncedCon    *core.Consensus
        )
    SyncLoop:
        for {
            syncedCon, syncedHeight, err = s.syncBlocksWithSomeNode(
                stoppedNode, syncNode, syncerObj, syncedHeight)
            if syncedCon != nil {
                syncNode.con = syncedCon
                go syncNode.con.Run()
                break SyncLoop
            }
            if err != nil {
                errChan <- err
                break SyncLoop
            }
            select {
            case <-runnerCtx.Done():
                break SyncLoop
            case <-time.After(2 * time.Second):
            }
        }
    }()
    // Wait until all nodes reach 'untilRound'.
    go func() {
        n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition()
    ReachFinished:
        for {
            fmt.Println("latestPos", n.ID, &pos)
            time.Sleep(5 * time.Second)
            for _, n = range nodes {
                pos = n.app.GetLatestDeliveredPosition()
                if n.ID == stoppedNode.ID {
                    if n.con == nil {
                        continue
                    }
                    if pos.Round < stopRound {
                        continue ReachFinished
                    }
                    // Stop a node, we should still be able to proceed.
                    stoppedNode.con.Stop()
                    stoppedNode.con = nil
                    fmt.Println("one node stopped", stoppedNode.ID)
                    test.LaunchDummyReceiver(runnerCtx, stoppedNode.network)
                    continue
                }
                if pos.Round < untilRound {
                    continue ReachFinished
                }
            }
            break
        }
        runnerCtxCancel()
    }()
    // Block until any reasonable testing milestone reached.
    select {
    case err := <-errChan:
        req.NoError(err)
    case <-runnerCtx.Done():
        // This test passed.
        // TODO(haoping) stop consensus.
    }
}

func TestConsensus(t *testing.T) {
    suite.Run(t, new(ConsensusTestSuite))
}