aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/consensus.go18
-rw-r--r--core/consensus_test.go36
-rw-r--r--core/shard.go197
-rw-r--r--core/shard_test.go192
-rw-r--r--core/test/governance.go9
-rw-r--r--integration_test/network.go47
-rw-r--r--integration_test/node.go111
-rw-r--r--integration_test/utils.go1
8 files changed, 504 insertions, 107 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 33a3d7f..7700296 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -104,7 +104,7 @@ func (recv *consensusReceiver) ConfirmBlock(hash common.Hash) {
log.Println(ErrUnknownBlockConfirmed, hash)
return
}
- if err := recv.consensus.ProcessBlock(block); err != nil {
+ if err := recv.consensus.processBlock(block); err != nil {
log.Println(err)
return
}
@@ -273,7 +273,7 @@ BALoop:
// RunLegacy starts running Legacy DEXON Consensus.
func (con *Consensus) RunLegacy() {
- go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock)
+ go con.processMsg(con.network.ReceiveChan(), con.processBlock)
go con.processWitnessData()
chainID := uint32(0)
@@ -299,7 +299,7 @@ func (con *Consensus) RunLegacy() {
if err := con.PrepareGenesisBlock(genesisBlock, time.Now().UTC()); err != nil {
log.Println(err)
}
- if err := con.ProcessBlock(genesisBlock); err != nil {
+ if err := con.processBlock(genesisBlock); err != nil {
log.Println(err)
}
con.network.BroadcastBlock(genesisBlock)
@@ -317,10 +317,10 @@ ProposingBlockLoop:
ChainID: chainID,
},
}
- if err := con.PrepareBlock(block, time.Now().UTC()); err != nil {
+ if err := con.prepareBlock(block, time.Now().UTC()); err != nil {
log.Println(err)
}
- if err := con.ProcessBlock(block); err != nil {
+ if err := con.processBlock(block); err != nil {
log.Println(err)
}
con.network.BroadcastBlock(block)
@@ -368,7 +368,7 @@ func (con *Consensus) proposeBlock(chainID uint32) *types.Block {
Height: con.rbModule.nextHeight(chainID),
},
}
- if err := con.PrepareBlock(block, time.Now().UTC()); err != nil {
+ if err := con.prepareBlock(block, time.Now().UTC()); err != nil {
log.Println(err)
return nil
}
@@ -464,8 +464,8 @@ func (con *Consensus) PreProcessBlock(b *types.Block) (err error) {
return
}
-// ProcessBlock is the entry point to submit one block to a Consensus instance.
-func (con *Consensus) ProcessBlock(block *types.Block) (err error) {
+// processBlock is the entry point to submit one block to a Consensus instance.
+func (con *Consensus) processBlock(block *types.Block) (err error) {
if err := con.sanityCheck(block); err != nil {
return err
}
@@ -538,7 +538,7 @@ func (con *Consensus) checkPrepareBlock(
}
// PrepareBlock would setup header fields of block based on its ProposerID.
-func (con *Consensus) PrepareBlock(b *types.Block,
+func (con *Consensus) prepareBlock(b *types.Block,
proposeTime time.Time) (err error) {
if err = con.checkPrepareBlock(b, proposeTime); err != nil {
return
diff --git a/core/consensus_test.go b/core/consensus_test.go
index c3f33fa..701ee00 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -127,7 +127,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
// to all core.Consensus objects.
broadcast := func(b *types.Block) {
for _, obj := range objs {
- req.Nil(obj.con.ProcessBlock(b))
+ req.Nil(obj.con.processBlock(b))
}
}
// Genesis blocks
@@ -152,7 +152,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
}
b11.Hash, err = hashBlock(b11)
s.Require().Nil(err)
- req.Nil(objs[nodes[1]].con.PrepareBlock(b11, time.Now().UTC()))
+ req.Nil(objs[nodes[1]].con.prepareBlock(b11, time.Now().UTC()))
req.Len(b11.Acks, 4)
req.Contains(b11.Acks, b00.Hash)
req.Contains(b11.Acks, b10.Hash)
@@ -168,7 +168,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[0]].con.PrepareBlock(b01, time.Now().UTC()))
+ req.Nil(objs[nodes[0]].con.prepareBlock(b01, time.Now().UTC()))
req.Len(b01.Acks, 4)
req.Contains(b01.Acks, b11.Hash)
// Setup b21.
@@ -180,7 +180,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[2]].con.PrepareBlock(b21, time.Now().UTC()))
+ req.Nil(objs[nodes[2]].con.prepareBlock(b21, time.Now().UTC()))
req.Len(b21.Acks, 4)
req.Contains(b21.Acks, b11.Hash)
// Setup b31.
@@ -192,7 +192,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[3]].con.PrepareBlock(b31, time.Now().UTC()))
+ req.Nil(objs[nodes[3]].con.prepareBlock(b31, time.Now().UTC()))
req.Len(b31.Acks, 4)
req.Contains(b31.Acks, b11.Hash)
// Broadcast other height=1 blocks.
@@ -209,7 +209,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[0]].con.PrepareBlock(b02, time.Now().UTC()))
+ req.Nil(objs[nodes[0]].con.prepareBlock(b02, time.Now().UTC()))
req.Len(b02.Acks, 3)
req.Contains(b02.Acks, b01.Hash)
req.Contains(b02.Acks, b21.Hash)
@@ -223,7 +223,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[1]].con.PrepareBlock(b12, time.Now().UTC()))
+ req.Nil(objs[nodes[1]].con.prepareBlock(b12, time.Now().UTC()))
req.Len(b12.Acks, 4)
req.Contains(b12.Acks, b01.Hash)
req.Contains(b12.Acks, b11.Hash)
@@ -238,7 +238,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[2]].con.PrepareBlock(b22, time.Now().UTC()))
+ req.Nil(objs[nodes[2]].con.prepareBlock(b22, time.Now().UTC()))
req.Len(b22.Acks, 3)
req.Contains(b22.Acks, b01.Hash)
req.Contains(b22.Acks, b21.Hash)
@@ -252,7 +252,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
},
Hash: common.NewRandomHash(),
}
- req.Nil(objs[nodes[3]].con.PrepareBlock(b32, time.Now().UTC()))
+ req.Nil(objs[nodes[3]].con.prepareBlock(b32, time.Now().UTC()))
req.Len(b32.Acks, 3)
req.Contains(b32.Acks, b01.Hash)
req.Contains(b32.Acks, b21.Hash)
@@ -318,10 +318,10 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
func (s *ConsensusTestSuite) TestPrepareBlock() {
// This test case would test these steps:
// - Add all genesis blocks into lattice.
- // - Make sure Consensus.PrepareBlock would attempt to ack
+ // - Make sure Consensus.prepareBlock would attempt to ack
// all genesis blocks.
// - Add the prepared block into lattice.
- // - Make sure Consensus.PrepareBlock would only attempt to
+ // - Make sure Consensus.prepareBlock would only attempt to
// ack the prepared block.
var (
gov, err = test.NewGovernance(4, time.Second)
@@ -350,17 +350,17 @@ func (s *ConsensusTestSuite) TestPrepareBlock() {
b30 := s.prepareGenesisBlock(nodes[3], 3, objs[nodes[3]].con)
for _, obj := range objs {
con := obj.con
- req.Nil(con.ProcessBlock(b00))
- req.Nil(con.ProcessBlock(b10))
- req.Nil(con.ProcessBlock(b20))
- req.Nil(con.ProcessBlock(b30))
+ req.Nil(con.processBlock(b00))
+ req.Nil(con.processBlock(b10))
+ req.Nil(con.processBlock(b20))
+ req.Nil(con.processBlock(b30))
}
b11 := &types.Block{
ProposerID: nodes[1],
}
// Sleep to make sure 'now' is slower than b10's timestamp.
time.Sleep(100 * time.Millisecond)
- req.Nil(objs[nodes[1]].con.PrepareBlock(b11, time.Now().UTC()))
+ req.Nil(objs[nodes[1]].con.prepareBlock(b11, time.Now().UTC()))
// Make sure we would assign 'now' to the timestamp belongs to
// the proposer.
req.True(
@@ -368,12 +368,12 @@ func (s *ConsensusTestSuite) TestPrepareBlock() {
b10.Timestamp) > 100*time.Millisecond)
for _, obj := range objs {
con := obj.con
- req.Nil(con.ProcessBlock(b11))
+ req.Nil(con.processBlock(b11))
}
b12 := &types.Block{
ProposerID: nodes[1],
}
- req.Nil(objs[nodes[1]].con.PrepareBlock(b12, time.Now().UTC()))
+ req.Nil(objs[nodes[1]].con.prepareBlock(b12, time.Now().UTC()))
req.Len(b12.Acks, 1)
req.Contains(b12.Acks, b11.Hash)
}
diff --git a/core/shard.go b/core/shard.go
new file mode 100644
index 0000000..b6a17e1
--- /dev/null
+++ b/core/shard.go
@@ -0,0 +1,197 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core/blockdb"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/dexon-foundation/dexon-consensus-core/crypto"
+)
+
+// Shard represents a unit to produce a global ordering from multiple chains.
+type Shard struct {
+ lock sync.RWMutex
+ ID uint32
+ nodeID types.NodeID
+ prvKey crypto.PrivateKey
+ sigToPub SigToPubFn
+ chainNum uint32
+ app Application
+ db blockdb.BlockDatabase
+ pool blockPool
+ lattice *blockLattice
+ toModule *totalOrdering
+ ctModule *consensusTimestamp
+}
+
+// NewShard constructs an Shard instance.
+func NewShard(
+ ID uint32,
+ cfg *types.Config,
+ prvKey crypto.PrivateKey,
+ sigToPub SigToPubFn,
+ app Application,
+ db blockdb.BlockDatabase) (s *Shard) {
+
+ s = &Shard{
+ ID: ID,
+ nodeID: types.NewNodeID(prvKey.PublicKey()),
+ prvKey: prvKey,
+ sigToPub: sigToPub,
+ chainNum: cfg.NumChains,
+ app: app,
+ db: db,
+ pool: newBlockPool(cfg.NumChains),
+ lattice: newBlockLattice(ID, cfg.NumChains),
+ toModule: newTotalOrdering(
+ uint64(cfg.K),
+ uint64(float32(cfg.NumChains-1)*cfg.PhiRatio+1),
+ cfg.NumChains),
+ ctModule: newConsensusTimestamp(),
+ }
+ return
+}
+
+// PrepareBlock setup block's field based on current lattice status.
+func (s *Shard) PrepareBlock(
+ b *types.Block, proposeTime time.Time) (err error) {
+
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ s.lattice.prepareBlock(b)
+ // TODO(mission): the proposeTime might be earlier than tip block of
+ // that chain. We should let blockLattice suggest the time.
+ b.ProposerID = s.nodeID
+ b.Timestamp = proposeTime
+ b.Payload = s.app.PreparePayload(b.Position)
+ if b.Hash, err = hashBlock(b); err != nil {
+ return
+ }
+ if b.Signature, err = s.prvKey.Sign(b.Hash); err != nil {
+ return
+ }
+ return
+}
+
+// SanityCheck check if a block is valid based on current lattice status.
+//
+// If some acking blocks don't exists, Shard would help to cache this block
+// and retry when lattice updated in Shard.ProcessBlock.
+func (s *Shard) SanityCheck(b *types.Block) (err error) {
+ // Check block.Position.
+ if b.Position.ShardID != s.ID {
+ err = ErrIncorrectBlockPosition
+ return
+ }
+ // Check the hash of block.
+ hash, err := hashBlock(b)
+ if err != nil || hash != b.Hash {
+ err = ErrIncorrectHash
+ return
+ }
+ // Check the signer.
+ pubKey, err := s.sigToPub(b.Hash, b.Signature)
+ if err != nil {
+ return
+ }
+ if !b.ProposerID.Equal(crypto.Keccak256Hash(pubKey.Bytes())) {
+ err = ErrIncorrectSignature
+ return
+ }
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+ if err = s.lattice.sanityCheck(b); err != nil {
+ // Add to block pool, once the lattice updated,
+ // would be checked again.
+ if err == ErrAckingBlockNotExists {
+ s.pool.addBlock(b)
+ }
+ return
+ }
+ return
+}
+
+// ProcessBlock adds a block into lattice, and deliver ordered blocks.
+// If any block pass sanity check after this block add into lattice, they
+// would be returned, too.
+//
+// NOTE: assume the block passed sanity check.
+func (s *Shard) ProcessBlock(
+ input *types.Block) (verified, delivered []*types.Block, err error) {
+
+ var (
+ tip, b *types.Block
+ toDelivered []*types.Block
+ inLattice []*types.Block
+ earlyDelivered bool
+ )
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if inLattice, err = s.lattice.addBlock(input); err != nil {
+ return
+ }
+ if err = s.db.Put(*input); err != nil {
+ return
+ }
+ // TODO(mission): remove this hack, BA related stuffs should not
+ // be done here.
+ s.app.StronglyAcked(input.Hash)
+ s.app.BlockConfirmed(input)
+ // Purge blocks in pool with the same chainID and lower height.
+ s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height)
+ // Replay tips in pool to check their validity.
+ for i := uint32(0); i < s.chainNum; i++ {
+ if tip = s.pool.tip(i); tip == nil {
+ continue
+ }
+ err = s.lattice.sanityCheck(tip)
+ if err == nil {
+ verified = append(verified, tip)
+ }
+ if err == ErrAckingBlockNotExists {
+ continue
+ }
+ s.pool.removeTip(i)
+ }
+ // Perform total ordering for each block added to lattice.
+ for _, b = range inLattice {
+ toDelivered, earlyDelivered, err = s.toModule.processBlock(b)
+ if err != nil {
+ return
+ }
+ if len(toDelivered) == 0 {
+ continue
+ }
+ hashes := make(common.Hashes, len(toDelivered))
+ for idx := range toDelivered {
+ hashes[idx] = toDelivered[idx].Hash
+ }
+ s.app.TotalOrderingDeliver(hashes, earlyDelivered)
+ // Perform timestamp generation.
+ if err = s.ctModule.processBlocks(toDelivered); err != nil {
+ return
+ }
+ delivered = append(delivered, toDelivered...)
+ }
+ return
+}
diff --git a/core/shard_test.go b/core/shard_test.go
new file mode 100644
index 0000000..db341a3
--- /dev/null
+++ b/core/shard_test.go
@@ -0,0 +1,192 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/blockdb"
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/dexon-foundation/dexon-consensus-core/crypto/eth"
+ "github.com/stretchr/testify/suite"
+)
+
+// testShardMgr wraps compaction chain and shard.
+type testShardMgr struct {
+ shard *Shard
+ ccModule *compactionChain
+ app *test.App
+ db blockdb.BlockDatabase
+}
+
+func (mgr *testShardMgr) prepareBlock(
+ chainID uint32) (b *types.Block, err error) {
+
+ b = &types.Block{
+ Position: types.Position{
+ ChainID: chainID,
+ }}
+ err = mgr.shard.PrepareBlock(b, time.Now().UTC())
+ return
+}
+
+// Process describes the usage of Shard.ProcessBlock.
+func (mgr *testShardMgr) processBlock(b *types.Block) (err error) {
+ var (
+ delivered []*types.Block
+ verified []*types.Block
+ pendings = []*types.Block{b}
+ )
+ if err = mgr.shard.SanityCheck(b); err != nil {
+ if err == ErrAckingBlockNotExists {
+ err = nil
+ }
+ return
+ }
+ for {
+ if len(pendings) == 0 {
+ break
+ }
+ b, pendings = pendings[0], pendings[1:]
+ if verified, delivered, err = mgr.shard.ProcessBlock(b); err != nil {
+ return
+ }
+ // Deliver blocks.
+ for _, b = range delivered {
+ if err = mgr.ccModule.processBlock(b); err != nil {
+ return
+ }
+ if err = mgr.db.Update(*b); err != nil {
+ return
+ }
+ mgr.app.DeliverBlock(b.Hash, b.Witness.Timestamp)
+ }
+ // Update pending blocks for verified block (pass sanity check).
+ pendings = append(pendings, verified...)
+ }
+ return
+}
+
+type ShardTestSuite struct {
+ suite.Suite
+}
+
+func (s *ShardTestSuite) newTestShardMgr(cfg *types.Config) *testShardMgr {
+ var req = s.Require()
+ // Setup private key.
+ prvKey, err := eth.NewPrivateKey()
+ req.Nil(err)
+ // Setup blockdb.
+ db, err := blockdb.NewMemBackedBlockDB()
+ req.Nil(err)
+ // Setup application.
+ app := test.NewApp()
+ // Setup shard.
+ return &testShardMgr{
+ ccModule: newCompactionChain(db, eth.SigToPub),
+ app: app,
+ db: db,
+ shard: NewShard(
+ uint32(0),
+ cfg,
+ prvKey,
+ eth.SigToPub,
+ app,
+ db)}
+}
+
+func (s *ShardTestSuite) TestBasicUsage() {
+ // One shard prepare blocks on chains randomly selected each time
+ // and process it. Those generated blocks and kept into a buffer, and
+ // process by other shard instances with random order.
+ var (
+ blockNum = 100
+ chainNum = uint32(19)
+ otherShardNum = 20
+ req = s.Require()
+ err error
+ cfg = types.Config{
+ NumChains: chainNum,
+ PhiRatio: float32(2) / float32(3),
+ K: 0,
+ }
+ master = s.newTestShardMgr(&cfg)
+ apps = []*test.App{master.app}
+ revealSeq = map[string]struct{}{}
+ )
+ // Master-shard generates blocks.
+ for i := uint32(0); i < chainNum; i++ {
+ // Produce genesis blocks should be delivered before all other blocks,
+ // or the consensus time would be wrong.
+ b, err := master.prepareBlock(i)
+ req.NotNil(b)
+ req.Nil(err)
+ // We've ignored the error for "acking blocks don't exist".
+ req.Nil(master.processBlock(b))
+ }
+ for i := 0; i < (blockNum - int(chainNum)); i++ {
+ b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum))))
+ req.NotNil(b)
+ req.Nil(err)
+ // We've ignored the error for "acking blocks don't exist".
+ req.Nil(master.processBlock(b))
+ }
+ // Now we have some blocks, replay them on different shards.
+ iter, err := master.db.GetAll()
+ req.Nil(err)
+ revealer, err := test.NewRandomRevealer(iter)
+ req.Nil(err)
+ for i := 0; i < otherShardNum; i++ {
+ revealer.Reset()
+ revealed := ""
+ other := s.newTestShardMgr(&cfg)
+ for {
+ b, err := revealer.Next()
+ if err != nil {
+ if err == blockdb.ErrIterationFinished {
+ err = nil
+ break
+ }
+ }
+ req.Nil(err)
+ req.Nil(other.processBlock(&b))
+ revealed += b.Hash.String() + ","
+ revealSeq[revealed] = struct{}{}
+ }
+ apps = append(apps, other.app)
+ }
+ // Make sure not only one revealing sequence.
+ req.True(len(revealSeq) > 1)
+ // Make sure nothing goes wrong.
+ for i, app := range apps {
+ req.Nil(app.Verify())
+ for j, otherApp := range apps {
+ if i >= j {
+ continue
+ }
+ req.Nil(app.Compare(otherApp))
+ }
+ }
+}
+
+func TestShard(t *testing.T) {
+ suite.Run(t, new(ShardTestSuite))
+}
diff --git a/core/test/governance.go b/core/test/governance.go
index a2e6f69..63462d0 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -65,8 +65,13 @@ func NewGovernance(nodeCount int, lambda time.Duration) (
// GetNotarySet implements Governance interface to return current
// notary set.
-func (g *Governance) GetNotarySet() map[types.NodeID]struct{} {
- return g.notarySet
+func (g *Governance) GetNotarySet() (ret map[types.NodeID]struct{}) {
+ // Return a cloned map.
+ ret = make(map[types.NodeID]struct{})
+ for k := range g.notarySet {
+ ret[k] = struct{}{}
+ }
+ return
}
// GetConfiguration returns the configuration at a given block height.
diff --git a/integration_test/network.go b/integration_test/network.go
deleted file mode 100644
index 1c7265d..0000000
--- a/integration_test/network.go
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2018 The dexon-consensus-core Authors
-// This file is part of the dexon-consensus-core library.
-//
-// The dexon-consensus-core library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus-core library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus-core library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package integration
-
-import (
- "github.com/dexon-foundation/dexon-consensus-core/core/types"
-)
-
-// Network implements core.Network.
-type Network struct {
-}
-
-// BroadcastVote broadcasts vote to all nodes in DEXON network.
-func (n *Network) BroadcastVote(vote *types.Vote) {}
-
-// BroadcastBlock broadcasts block to all nodes in DEXON network.
-func (n *Network) BroadcastBlock(block *types.Block) {
-}
-
-// BroadcastWitnessAck broadcasts witnessAck to all nodes in DEXON network.
-func (n *Network) BroadcastWitnessAck(witnessAck *types.WitnessAck) {
-}
-
-// SendDKGPrivateShare sends PrivateShare to a DKG participant.
-func (n *Network) SendDKGPrivateShare(
- recv types.NodeID, prvShare *types.DKGPrivateShare) {
-}
-
-// ReceiveChan returns a channel to receive messages from DEXON network.
-func (n *Network) ReceiveChan() <-chan interface{} {
- return make(chan interface{})
-}
diff --git a/integration_test/node.go b/integration_test/node.go
index c0e226b..3193d99 100644
--- a/integration_test/node.go
+++ b/integration_test/node.go
@@ -19,6 +19,7 @@ package integration
import (
"fmt"
+ "math"
"sort"
"time"
@@ -65,44 +66,59 @@ func NewReceiveBlockEvent(
// Node is designed to work with test.Scheduler.
type Node struct {
ID types.NodeID
+ chainNum uint32
chainID uint32
- cons *core.Consensus
- gov core.Governance
+ shard *core.Shard
+ app *test.App
+ db blockdb.BlockDatabase
+ broadcastTargets map[types.NodeID]struct{}
networkLatency test.LatencyModel
proposingLatency test.LatencyModel
}
// NewNode constructs an instance of Node.
func NewNode(
- app core.Application,
+ app *test.App,
gov core.Governance,
db blockdb.BlockDatabase,
privateKey crypto.PrivateKey,
- nID types.NodeID,
networkLatency test.LatencyModel,
proposingLatency test.LatencyModel) *Node {
- hashes := make(common.Hashes, 0)
- for nID := range gov.GetNotarySet() {
+ var (
+ shardID = uint32(0)
+ chainID = uint32(math.MaxUint32)
+ governanceConfig = gov.GetConfiguration(0)
+ broadcastTargets = gov.GetNotarySet()
+ nodeID = types.NewNodeID(privateKey.PublicKey())
+ )
+ hashes := common.Hashes{}
+ for nID := range broadcastTargets {
hashes = append(hashes, nID.Hash)
}
sort.Sort(hashes)
- chainID := uint32(0)
- for i, hash := range hashes {
- if hash == nID.Hash {
+ for i, h := range hashes {
+ if h == nodeID.Hash {
chainID = uint32(i)
- break
}
}
-
+ delete(broadcastTargets, nodeID)
return &Node{
- ID: nID,
+ ID: nodeID,
chainID: chainID,
- gov: gov,
+ chainNum: governanceConfig.NumChains,
+ broadcastTargets: broadcastTargets,
networkLatency: networkLatency,
proposingLatency: proposingLatency,
- cons: core.NewConsensus(
- app, gov, db, &Network{}, privateKey, eth.SigToPub),
+ app: app,
+ db: db,
+ shard: core.NewShard(
+ shardID,
+ governanceConfig,
+ privateKey,
+ eth.SigToPub,
+ app,
+ db),
}
}
@@ -120,27 +136,18 @@ func (n *Node) Handle(e *test.Event) (events []*test.Event) {
return
}
-func (n *Node) handleProposeBlock(when time.Time, piggyback interface{}) (
+func (n *Node) handleProposeBlock(when time.Time, _ interface{}) (
events []*test.Event, err error) {
- b := &types.Block{
- ProposerID: n.ID,
- Position: types.Position{
- ChainID: n.chainID,
- },
- }
- defer types.RecycleBlock(b)
- if err = n.cons.PrepareBlock(b, when); err != nil {
+ b, err := n.prepareBlock(when)
+ if err != nil {
return
}
- if err = n.cons.ProcessBlock(b); err != nil {
+ if err = n.processBlock(b); err != nil {
return
}
// Create 'block received' event for each other nodes.
- for nID := range n.gov.GetNotarySet() {
- if nID == n.ID {
- continue
- }
+ for nID := range n.broadcastTargets {
events = append(events, NewReceiveBlockEvent(
nID, when.Add(n.networkLatency.Delay()), b.Clone()))
}
@@ -153,9 +160,53 @@ func (n *Node) handleProposeBlock(when time.Time, piggyback interface{}) (
func (n *Node) handleReceiveBlock(piggyback interface{}) (
events []*test.Event, err error) {
- err = n.cons.ProcessBlock(piggyback.(*types.Block))
+ err = n.processBlock(piggyback.(*types.Block))
if err != nil {
panic(err)
}
return
}
+
+func (n *Node) prepareBlock(when time.Time) (b *types.Block, err error) {
+ b = &types.Block{
+ Position: types.Position{
+ ChainID: n.chainID,
+ }}
+ err = n.shard.PrepareBlock(b, when)
+ return
+}
+
+func (n *Node) processBlock(b *types.Block) (err error) {
+ // TODO(mission): this segment of code is identical to testShardMgr in
+ // core/shard_test.go, except the compaction-chain part.
+ var (
+ delivered []*types.Block
+ verified []*types.Block
+ pendings = []*types.Block{b}
+ )
+ if err = n.shard.SanityCheck(b); err != nil {
+ if err == core.ErrAckingBlockNotExists {
+ err = nil
+ }
+ return
+ }
+ for {
+ if len(pendings) == 0 {
+ break
+ }
+ b, pendings = pendings[0], pendings[1:]
+ if verified, delivered, err = n.shard.ProcessBlock(b); err != nil {
+ return
+ }
+ // Deliver blocks.
+ for _, b = range delivered {
+ if err = n.db.Update(*b); err != nil {
+ return
+ }
+ n.app.DeliverBlock(b.Hash, b.Witness.Timestamp)
+ }
+ // Update pending blocks for verified block (pass sanity check).
+ pendings = append(pendings, verified...)
+ }
+ return
+}
diff --git a/integration_test/utils.go b/integration_test/utils.go
index f95d771..7259ced 100644
--- a/integration_test/utils.go
+++ b/integration_test/utils.go
@@ -48,7 +48,6 @@ func PrepareNodes(
gov,
dbs[nID],
key,
- nID,
networkLatency,
proposingLatency)
}