aboutsummaryrefslogtreecommitdiffstats
path: root/integration_test/consensus_test.go
diff options
context:
space:
mode:
authorhaoping-ku <haoping.ku@dexon.org>2018-11-29 14:30:02 +0800
committerGitHub <noreply@github.com>2018-11-29 14:30:02 +0800
commitdaf3bab93c323b173345811adc9a334dad4a7094 (patch)
tree8a3957ec1f77262ce92c56f0384b0dedc307628c /integration_test/consensus_test.go
parent8470ac070f097b261fddc42991a4d2e9ec998db6 (diff)
downloaddexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.gz
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.bz2
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.lz
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.xz
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.zst
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.zip
core: syncer: add syncer (#346)
Diffstat (limited to 'integration_test/consensus_test.go')
-rw-r--r--integration_test/consensus_test.go213
1 files changed, 205 insertions, 8 deletions
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 8fd3fa4..6432e9f 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -18,6 +18,7 @@
package integration
import (
+ "context"
"sync"
"testing"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/blockdb"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus/core/syncer"
"github.com/dexon-foundation/dexon-consensus/core/test"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
@@ -39,10 +41,12 @@ type ConsensusTestSuite struct {
}
type node struct {
- con *core.Consensus
- app *test.App
- gov *test.Governance
- db blockdb.BlockDatabase
+ ID types.NodeID
+ con *core.Consensus
+ app *test.App
+ gov *test.Governance
+ db blockdb.BlockDatabase
+ network *test.Network
}
func (s *ConsensusTestSuite) setupNodes(
@@ -81,8 +85,9 @@ func (s *ConsensusTestSuite) setupNodes(
db,
networkModule,
k,
- &common.NullLogger{})
- nodes[con.ID] = &node{con, app, gov, db}
+ &common.NullLogger{},
+ )
+ nodes[con.ID] = &node{con.ID, con, app, gov, db, networkModule}
go func() {
defer wg.Done()
s.Require().NoError(networkModule.Setup(serverChannel))
@@ -107,6 +112,53 @@ func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) {
}
}
+func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
+ sourceNode *node, syncerObj *syncer.Consensus, nextSyncHeight uint64) (
+ syncedCon *core.Consensus, syncerHeight uint64, err error) {
+
+ syncerHeight = nextSyncHeight
+ // Setup revealer.
+ DBAll, err := sourceNode.db.GetAll()
+ if err != nil {
+ return
+ }
+ r, err := test.NewCompactionChainRevealer(DBAll, nextSyncHeight)
+ if err != nil {
+ return
+ }
+ // Load all blocks from revealer and dump them into syncer.
+ var compactionChainBlocks []*types.Block
+ syncBlocks := func() (done bool) {
+ syncedCon, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
+ if syncedCon != nil || err != nil {
+ done = true
+ }
+ compactionChainBlocks = nil
+ return
+ }
+ for {
+ var b types.Block
+ b, err = r.Next()
+ if err != nil {
+ if err == blockdb.ErrIterationFinished {
+ err = nil
+ if syncBlocks() {
+ break
+ }
+ }
+ break
+ }
+ syncerHeight = b.Finalization.Height + 1
+ compactionChainBlocks = append(compactionChainBlocks, &b)
+ if len(compactionChainBlocks) >= 100 {
+ if syncBlocks() {
+ break
+ }
+ }
+ }
+ return
+}
+
func (s *ConsensusTestSuite) TestSimple() {
// The simplest test case:
// - Node set is equals to DKG set and notary set for each chain in each
@@ -143,7 +195,7 @@ Loop:
s.T().Log("check latest position delivered by each node")
for _, n := range nodes {
latestPos := n.app.GetLatestDeliveredPosition()
- s.T().Log("latestPos", n.con.ID, &latestPos)
+ s.T().Log("latestPos", n.ID, &latestPos)
if latestPos.Round < untilRound {
continue Loop
}
@@ -152,6 +204,7 @@ Loop:
break
}
s.verifyNodes(nodes)
+ // TODO(haoping) stop consensus.
}
func (s *ConsensusTestSuite) TestNumChainsChange() {
@@ -215,7 +268,7 @@ Loop:
s.T().Log("check latest position delivered by each node")
for _, n := range nodes {
latestPos := n.app.GetLatestDeliveredPosition()
- s.T().Log("latestPos", n.con.ID, &latestPos)
+ s.T().Log("latestPos", n.ID, &latestPos)
if latestPos.Round < untilRound {
continue Loop
}
@@ -226,6 +279,150 @@ Loop:
s.verifyNodes(nodes)
}
+func (s *ConsensusTestSuite) TestSync() {
+ // The sync test case:
+ // - No configuration change.
+ // - One node does not run when all others starts until aliveRound exceeded.
+ var (
+ req = s.Require()
+ peerCount = 4
+ dMoment = time.Now().UTC()
+ untilRound = uint64(5)
+ aliveRound = uint64(1)
+ errChan = make(chan error, 100)
+ )
+ prvKeys, pubKeys, err := test.NewKeys(peerCount)
+ req.NoError(err)
+ // Setup seed governance instance. Give a short latency to make this test
+ // run faster.
+ seedGov, err := test.NewGovernance(
+ pubKeys, 100*time.Millisecond, core.ConfigRoundShift)
+ req.NoError(err)
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundInterval, 30*time.Second))
+ // A short round interval.
+ nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+ // Choose the first node as "syncNode" that its consensus' Run() is called
+ // later.
+ syncNode := nodes[types.NewNodeID(pubKeys[0])]
+ syncNode.con = nil
+ // Use other node's governance instance. Normally, fullnode would make
+ // governance when syncing. In our test, it's the simplest way to achieve
+ // that.
+ syncNode.gov = nodes[types.NewNodeID(pubKeys[1])].gov
+ for _, n := range nodes {
+ if n.ID != syncNode.ID {
+ go n.con.Run(&types.Block{})
+ }
+ }
+ // Clean syncNode's network receive channel, or it might exceed the limit
+ // and block other go routines.
+ dummyReceiverCtx, dummyReceiverCtxCancel := context.WithCancel(
+ context.Background())
+ go func() {
+ Loop:
+ for {
+ select {
+ case <-syncNode.network.ReceiveChan():
+ case <-dummyReceiverCtx.Done():
+ break Loop
+ }
+ }
+ }()
+ReachAlive:
+ for {
+ // If all nodes excepts syncNode have reached aliveRound, call syncNode's
+ // Run() and send it all blocks in one of normal node's compaction chain.
+ for id, n := range nodes {
+ if id == syncNode.ID {
+ continue
+ }
+ if n.app.GetLatestDeliveredPosition().Round < aliveRound {
+ continue ReachAlive
+ }
+ // Check if any error happened or sleep for a period of time.
+ select {
+ case err := <-errChan:
+ req.NoError(err)
+ case <-time.After(5 * time.Second):
+ }
+ }
+ dummyReceiverCtxCancel()
+ break
+ }
+ // Initiate Syncer.
+ runnerCtx, runnerCtxCancel := context.WithCancel(context.Background())
+ defer runnerCtxCancel()
+ syncerObj := syncer.NewConsensus(
+ dMoment,
+ syncNode.app,
+ syncNode.gov,
+ syncNode.db,
+ syncNode.network,
+ prvKeys[0],
+ &common.NullLogger{},
+ )
+ // Initialize communication channel, it's not recommended to assertion in
+ // another go routine.
+ go func() {
+ var (
+ node *node
+ syncedHeight uint64
+ err error
+ syncedCon *core.Consensus
+ )
+ SyncLoop:
+ for {
+ for _, n := range nodes {
+ if n.ID == syncNode.ID {
+ continue
+ }
+ node = n
+ break
+ }
+ syncedCon, syncedHeight, err = s.syncBlocksWithSomeNode(
+ node, syncerObj, syncedHeight)
+ if syncedCon != nil {
+ // TODO(mission): run it and make sure it can follow up with
+ // other nodes.
+ runnerCtxCancel()
+ break SyncLoop
+ }
+ if err != nil {
+ errChan <- err
+ break SyncLoop
+ }
+ select {
+ case <-runnerCtx.Done():
+ break SyncLoop
+ case <-time.After(2 * time.Second):
+ }
+ }
+ }()
+ // Wait until all nodes reach 'untilRound'.
+ go func() {
+ ReachFinished:
+ for {
+ time.Sleep(5 * time.Second)
+ for _, n := range nodes {
+ if n.app.GetLatestDeliveredPosition().Round < untilRound {
+ continue ReachFinished
+ }
+ }
+ break
+ }
+ runnerCtxCancel()
+ }()
+ // Block until any reasonable testing milestone reached.
+ select {
+ case err := <-errChan:
+ req.NoError(err)
+ case <-runnerCtx.Done():
+ // This test passed.
+ // TODO(haoping) stop consensus.
+ }
+}
+
func TestConsensus(t *testing.T) {
suite.Run(t, new(ConsensusTestSuite))
}