From 9d99c27b7261f8228cc0a5a496be6ac50e03abf2 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Fri, 21 Sep 2018 17:28:25 +0800 Subject: core: add shard (#127) A shard is basically DEXON v1 components, except the strongly acked part, including: - maintaining lattice structure - total ordering - generate consensus timestamp --- core/consensus.go | 18 ++-- core/consensus_test.go | 36 ++++---- core/shard.go | 197 ++++++++++++++++++++++++++++++++++++++++++++ core/shard_test.go | 192 ++++++++++++++++++++++++++++++++++++++++++ core/test/governance.go | 9 +- integration_test/network.go | 47 ----------- integration_test/node.go | 111 ++++++++++++++++++------- integration_test/utils.go | 1 - 8 files changed, 504 insertions(+), 107 deletions(-) create mode 100644 core/shard.go create mode 100644 core/shard_test.go delete mode 100644 integration_test/network.go diff --git a/core/consensus.go b/core/consensus.go index 33a3d7f..7700296 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -104,7 +104,7 @@ func (recv *consensusReceiver) ConfirmBlock(hash common.Hash) { log.Println(ErrUnknownBlockConfirmed, hash) return } - if err := recv.consensus.ProcessBlock(block); err != nil { + if err := recv.consensus.processBlock(block); err != nil { log.Println(err) return } @@ -273,7 +273,7 @@ BALoop: // RunLegacy starts running Legacy DEXON Consensus. func (con *Consensus) RunLegacy() { - go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock) + go con.processMsg(con.network.ReceiveChan(), con.processBlock) go con.processWitnessData() chainID := uint32(0) @@ -299,7 +299,7 @@ func (con *Consensus) RunLegacy() { if err := con.PrepareGenesisBlock(genesisBlock, time.Now().UTC()); err != nil { log.Println(err) } - if err := con.ProcessBlock(genesisBlock); err != nil { + if err := con.processBlock(genesisBlock); err != nil { log.Println(err) } con.network.BroadcastBlock(genesisBlock) @@ -317,10 +317,10 @@ ProposingBlockLoop: ChainID: chainID, }, } - if err := con.PrepareBlock(block, time.Now().UTC()); err != nil { + if err := con.prepareBlock(block, time.Now().UTC()); err != nil { log.Println(err) } - if err := con.ProcessBlock(block); err != nil { + if err := con.processBlock(block); err != nil { log.Println(err) } con.network.BroadcastBlock(block) @@ -368,7 +368,7 @@ func (con *Consensus) proposeBlock(chainID uint32) *types.Block { Height: con.rbModule.nextHeight(chainID), }, } - if err := con.PrepareBlock(block, time.Now().UTC()); err != nil { + if err := con.prepareBlock(block, time.Now().UTC()); err != nil { log.Println(err) return nil } @@ -464,8 +464,8 @@ func (con *Consensus) PreProcessBlock(b *types.Block) (err error) { return } -// ProcessBlock is the entry point to submit one block to a Consensus instance. -func (con *Consensus) ProcessBlock(block *types.Block) (err error) { +// processBlock is the entry point to submit one block to a Consensus instance. +func (con *Consensus) processBlock(block *types.Block) (err error) { if err := con.sanityCheck(block); err != nil { return err } @@ -538,7 +538,7 @@ func (con *Consensus) checkPrepareBlock( } // PrepareBlock would setup header fields of block based on its ProposerID. -func (con *Consensus) PrepareBlock(b *types.Block, +func (con *Consensus) prepareBlock(b *types.Block, proposeTime time.Time) (err error) { if err = con.checkPrepareBlock(b, proposeTime); err != nil { return diff --git a/core/consensus_test.go b/core/consensus_test.go index c3f33fa..701ee00 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -127,7 +127,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // to all core.Consensus objects. broadcast := func(b *types.Block) { for _, obj := range objs { - req.Nil(obj.con.ProcessBlock(b)) + req.Nil(obj.con.processBlock(b)) } } // Genesis blocks @@ -152,7 +152,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { } b11.Hash, err = hashBlock(b11) s.Require().Nil(err) - req.Nil(objs[nodes[1]].con.PrepareBlock(b11, time.Now().UTC())) + req.Nil(objs[nodes[1]].con.prepareBlock(b11, time.Now().UTC())) req.Len(b11.Acks, 4) req.Contains(b11.Acks, b00.Hash) req.Contains(b11.Acks, b10.Hash) @@ -168,7 +168,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[0]].con.PrepareBlock(b01, time.Now().UTC())) + req.Nil(objs[nodes[0]].con.prepareBlock(b01, time.Now().UTC())) req.Len(b01.Acks, 4) req.Contains(b01.Acks, b11.Hash) // Setup b21. @@ -180,7 +180,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[2]].con.PrepareBlock(b21, time.Now().UTC())) + req.Nil(objs[nodes[2]].con.prepareBlock(b21, time.Now().UTC())) req.Len(b21.Acks, 4) req.Contains(b21.Acks, b11.Hash) // Setup b31. @@ -192,7 +192,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[3]].con.PrepareBlock(b31, time.Now().UTC())) + req.Nil(objs[nodes[3]].con.prepareBlock(b31, time.Now().UTC())) req.Len(b31.Acks, 4) req.Contains(b31.Acks, b11.Hash) // Broadcast other height=1 blocks. @@ -209,7 +209,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[0]].con.PrepareBlock(b02, time.Now().UTC())) + req.Nil(objs[nodes[0]].con.prepareBlock(b02, time.Now().UTC())) req.Len(b02.Acks, 3) req.Contains(b02.Acks, b01.Hash) req.Contains(b02.Acks, b21.Hash) @@ -223,7 +223,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[1]].con.PrepareBlock(b12, time.Now().UTC())) + req.Nil(objs[nodes[1]].con.prepareBlock(b12, time.Now().UTC())) req.Len(b12.Acks, 4) req.Contains(b12.Acks, b01.Hash) req.Contains(b12.Acks, b11.Hash) @@ -238,7 +238,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[2]].con.PrepareBlock(b22, time.Now().UTC())) + req.Nil(objs[nodes[2]].con.prepareBlock(b22, time.Now().UTC())) req.Len(b22.Acks, 3) req.Contains(b22.Acks, b01.Hash) req.Contains(b22.Acks, b21.Hash) @@ -252,7 +252,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { }, Hash: common.NewRandomHash(), } - req.Nil(objs[nodes[3]].con.PrepareBlock(b32, time.Now().UTC())) + req.Nil(objs[nodes[3]].con.prepareBlock(b32, time.Now().UTC())) req.Len(b32.Acks, 3) req.Contains(b32.Acks, b01.Hash) req.Contains(b32.Acks, b21.Hash) @@ -318,10 +318,10 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { func (s *ConsensusTestSuite) TestPrepareBlock() { // This test case would test these steps: // - Add all genesis blocks into lattice. - // - Make sure Consensus.PrepareBlock would attempt to ack + // - Make sure Consensus.prepareBlock would attempt to ack // all genesis blocks. // - Add the prepared block into lattice. - // - Make sure Consensus.PrepareBlock would only attempt to + // - Make sure Consensus.prepareBlock would only attempt to // ack the prepared block. var ( gov, err = test.NewGovernance(4, time.Second) @@ -350,17 +350,17 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { b30 := s.prepareGenesisBlock(nodes[3], 3, objs[nodes[3]].con) for _, obj := range objs { con := obj.con - req.Nil(con.ProcessBlock(b00)) - req.Nil(con.ProcessBlock(b10)) - req.Nil(con.ProcessBlock(b20)) - req.Nil(con.ProcessBlock(b30)) + req.Nil(con.processBlock(b00)) + req.Nil(con.processBlock(b10)) + req.Nil(con.processBlock(b20)) + req.Nil(con.processBlock(b30)) } b11 := &types.Block{ ProposerID: nodes[1], } // Sleep to make sure 'now' is slower than b10's timestamp. time.Sleep(100 * time.Millisecond) - req.Nil(objs[nodes[1]].con.PrepareBlock(b11, time.Now().UTC())) + req.Nil(objs[nodes[1]].con.prepareBlock(b11, time.Now().UTC())) // Make sure we would assign 'now' to the timestamp belongs to // the proposer. req.True( @@ -368,12 +368,12 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { b10.Timestamp) > 100*time.Millisecond) for _, obj := range objs { con := obj.con - req.Nil(con.ProcessBlock(b11)) + req.Nil(con.processBlock(b11)) } b12 := &types.Block{ ProposerID: nodes[1], } - req.Nil(objs[nodes[1]].con.PrepareBlock(b12, time.Now().UTC())) + req.Nil(objs[nodes[1]].con.prepareBlock(b12, time.Now().UTC())) req.Len(b12.Acks, 1) req.Contains(b12.Acks, b11.Hash) } diff --git a/core/shard.go b/core/shard.go new file mode 100644 index 0000000..b6a17e1 --- /dev/null +++ b/core/shard.go @@ -0,0 +1,197 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/crypto" +) + +// Shard represents a unit to produce a global ordering from multiple chains. +type Shard struct { + lock sync.RWMutex + ID uint32 + nodeID types.NodeID + prvKey crypto.PrivateKey + sigToPub SigToPubFn + chainNum uint32 + app Application + db blockdb.BlockDatabase + pool blockPool + lattice *blockLattice + toModule *totalOrdering + ctModule *consensusTimestamp +} + +// NewShard constructs an Shard instance. +func NewShard( + ID uint32, + cfg *types.Config, + prvKey crypto.PrivateKey, + sigToPub SigToPubFn, + app Application, + db blockdb.BlockDatabase) (s *Shard) { + + s = &Shard{ + ID: ID, + nodeID: types.NewNodeID(prvKey.PublicKey()), + prvKey: prvKey, + sigToPub: sigToPub, + chainNum: cfg.NumChains, + app: app, + db: db, + pool: newBlockPool(cfg.NumChains), + lattice: newBlockLattice(ID, cfg.NumChains), + toModule: newTotalOrdering( + uint64(cfg.K), + uint64(float32(cfg.NumChains-1)*cfg.PhiRatio+1), + cfg.NumChains), + ctModule: newConsensusTimestamp(), + } + return +} + +// PrepareBlock setup block's field based on current lattice status. +func (s *Shard) PrepareBlock( + b *types.Block, proposeTime time.Time) (err error) { + + s.lock.RLock() + defer s.lock.RUnlock() + + s.lattice.prepareBlock(b) + // TODO(mission): the proposeTime might be earlier than tip block of + // that chain. We should let blockLattice suggest the time. + b.ProposerID = s.nodeID + b.Timestamp = proposeTime + b.Payload = s.app.PreparePayload(b.Position) + if b.Hash, err = hashBlock(b); err != nil { + return + } + if b.Signature, err = s.prvKey.Sign(b.Hash); err != nil { + return + } + return +} + +// SanityCheck check if a block is valid based on current lattice status. +// +// If some acking blocks don't exists, Shard would help to cache this block +// and retry when lattice updated in Shard.ProcessBlock. +func (s *Shard) SanityCheck(b *types.Block) (err error) { + // Check block.Position. + if b.Position.ShardID != s.ID { + err = ErrIncorrectBlockPosition + return + } + // Check the hash of block. + hash, err := hashBlock(b) + if err != nil || hash != b.Hash { + err = ErrIncorrectHash + return + } + // Check the signer. + pubKey, err := s.sigToPub(b.Hash, b.Signature) + if err != nil { + return + } + if !b.ProposerID.Equal(crypto.Keccak256Hash(pubKey.Bytes())) { + err = ErrIncorrectSignature + return + } + s.lock.RLock() + defer s.lock.RUnlock() + if err = s.lattice.sanityCheck(b); err != nil { + // Add to block pool, once the lattice updated, + // would be checked again. + if err == ErrAckingBlockNotExists { + s.pool.addBlock(b) + } + return + } + return +} + +// ProcessBlock adds a block into lattice, and deliver ordered blocks. +// If any block pass sanity check after this block add into lattice, they +// would be returned, too. +// +// NOTE: assume the block passed sanity check. +func (s *Shard) ProcessBlock( + input *types.Block) (verified, delivered []*types.Block, err error) { + + var ( + tip, b *types.Block + toDelivered []*types.Block + inLattice []*types.Block + earlyDelivered bool + ) + s.lock.Lock() + defer s.lock.Unlock() + if inLattice, err = s.lattice.addBlock(input); err != nil { + return + } + if err = s.db.Put(*input); err != nil { + return + } + // TODO(mission): remove this hack, BA related stuffs should not + // be done here. + s.app.StronglyAcked(input.Hash) + s.app.BlockConfirmed(input) + // Purge blocks in pool with the same chainID and lower height. + s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) + // Replay tips in pool to check their validity. + for i := uint32(0); i < s.chainNum; i++ { + if tip = s.pool.tip(i); tip == nil { + continue + } + err = s.lattice.sanityCheck(tip) + if err == nil { + verified = append(verified, tip) + } + if err == ErrAckingBlockNotExists { + continue + } + s.pool.removeTip(i) + } + // Perform total ordering for each block added to lattice. + for _, b = range inLattice { + toDelivered, earlyDelivered, err = s.toModule.processBlock(b) + if err != nil { + return + } + if len(toDelivered) == 0 { + continue + } + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } + s.app.TotalOrderingDeliver(hashes, earlyDelivered) + // Perform timestamp generation. + if err = s.ctModule.processBlocks(toDelivered); err != nil { + return + } + delivered = append(delivered, toDelivered...) + } + return +} diff --git a/core/shard_test.go b/core/shard_test.go new file mode 100644 index 0000000..db341a3 --- /dev/null +++ b/core/shard_test.go @@ -0,0 +1,192 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "math/rand" + "testing" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/test" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/crypto/eth" + "github.com/stretchr/testify/suite" +) + +// testShardMgr wraps compaction chain and shard. +type testShardMgr struct { + shard *Shard + ccModule *compactionChain + app *test.App + db blockdb.BlockDatabase +} + +func (mgr *testShardMgr) prepareBlock( + chainID uint32) (b *types.Block, err error) { + + b = &types.Block{ + Position: types.Position{ + ChainID: chainID, + }} + err = mgr.shard.PrepareBlock(b, time.Now().UTC()) + return +} + +// Process describes the usage of Shard.ProcessBlock. +func (mgr *testShardMgr) processBlock(b *types.Block) (err error) { + var ( + delivered []*types.Block + verified []*types.Block + pendings = []*types.Block{b} + ) + if err = mgr.shard.SanityCheck(b); err != nil { + if err == ErrAckingBlockNotExists { + err = nil + } + return + } + for { + if len(pendings) == 0 { + break + } + b, pendings = pendings[0], pendings[1:] + if verified, delivered, err = mgr.shard.ProcessBlock(b); err != nil { + return + } + // Deliver blocks. + for _, b = range delivered { + if err = mgr.ccModule.processBlock(b); err != nil { + return + } + if err = mgr.db.Update(*b); err != nil { + return + } + mgr.app.DeliverBlock(b.Hash, b.Witness.Timestamp) + } + // Update pending blocks for verified block (pass sanity check). + pendings = append(pendings, verified...) + } + return +} + +type ShardTestSuite struct { + suite.Suite +} + +func (s *ShardTestSuite) newTestShardMgr(cfg *types.Config) *testShardMgr { + var req = s.Require() + // Setup private key. + prvKey, err := eth.NewPrivateKey() + req.Nil(err) + // Setup blockdb. + db, err := blockdb.NewMemBackedBlockDB() + req.Nil(err) + // Setup application. + app := test.NewApp() + // Setup shard. + return &testShardMgr{ + ccModule: newCompactionChain(db, eth.SigToPub), + app: app, + db: db, + shard: NewShard( + uint32(0), + cfg, + prvKey, + eth.SigToPub, + app, + db)} +} + +func (s *ShardTestSuite) TestBasicUsage() { + // One shard prepare blocks on chains randomly selected each time + // and process it. Those generated blocks and kept into a buffer, and + // process by other shard instances with random order. + var ( + blockNum = 100 + chainNum = uint32(19) + otherShardNum = 20 + req = s.Require() + err error + cfg = types.Config{ + NumChains: chainNum, + PhiRatio: float32(2) / float32(3), + K: 0, + } + master = s.newTestShardMgr(&cfg) + apps = []*test.App{master.app} + revealSeq = map[string]struct{}{} + ) + // Master-shard generates blocks. + for i := uint32(0); i < chainNum; i++ { + // Produce genesis blocks should be delivered before all other blocks, + // or the consensus time would be wrong. + b, err := master.prepareBlock(i) + req.NotNil(b) + req.Nil(err) + // We've ignored the error for "acking blocks don't exist". + req.Nil(master.processBlock(b)) + } + for i := 0; i < (blockNum - int(chainNum)); i++ { + b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum)))) + req.NotNil(b) + req.Nil(err) + // We've ignored the error for "acking blocks don't exist". + req.Nil(master.processBlock(b)) + } + // Now we have some blocks, replay them on different shards. + iter, err := master.db.GetAll() + req.Nil(err) + revealer, err := test.NewRandomRevealer(iter) + req.Nil(err) + for i := 0; i < otherShardNum; i++ { + revealer.Reset() + revealed := "" + other := s.newTestShardMgr(&cfg) + for { + b, err := revealer.Next() + if err != nil { + if err == blockdb.ErrIterationFinished { + err = nil + break + } + } + req.Nil(err) + req.Nil(other.processBlock(&b)) + revealed += b.Hash.String() + "," + revealSeq[revealed] = struct{}{} + } + apps = append(apps, other.app) + } + // Make sure not only one revealing sequence. + req.True(len(revealSeq) > 1) + // Make sure nothing goes wrong. + for i, app := range apps { + req.Nil(app.Verify()) + for j, otherApp := range apps { + if i >= j { + continue + } + req.Nil(app.Compare(otherApp)) + } + } +} + +func TestShard(t *testing.T) { + suite.Run(t, new(ShardTestSuite)) +} diff --git a/core/test/governance.go b/core/test/governance.go index a2e6f69..63462d0 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -65,8 +65,13 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( // GetNotarySet implements Governance interface to return current // notary set. -func (g *Governance) GetNotarySet() map[types.NodeID]struct{} { - return g.notarySet +func (g *Governance) GetNotarySet() (ret map[types.NodeID]struct{}) { + // Return a cloned map. + ret = make(map[types.NodeID]struct{}) + for k := range g.notarySet { + ret[k] = struct{}{} + } + return } // GetConfiguration returns the configuration at a given block height. diff --git a/integration_test/network.go b/integration_test/network.go deleted file mode 100644 index 1c7265d..0000000 --- a/integration_test/network.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2018 The dexon-consensus-core Authors -// This file is part of the dexon-consensus-core library. -// -// The dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus-core library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see -// . - -package integration - -import ( - "github.com/dexon-foundation/dexon-consensus-core/core/types" -) - -// Network implements core.Network. -type Network struct { -} - -// BroadcastVote broadcasts vote to all nodes in DEXON network. -func (n *Network) BroadcastVote(vote *types.Vote) {} - -// BroadcastBlock broadcasts block to all nodes in DEXON network. -func (n *Network) BroadcastBlock(block *types.Block) { -} - -// BroadcastWitnessAck broadcasts witnessAck to all nodes in DEXON network. -func (n *Network) BroadcastWitnessAck(witnessAck *types.WitnessAck) { -} - -// SendDKGPrivateShare sends PrivateShare to a DKG participant. -func (n *Network) SendDKGPrivateShare( - recv types.NodeID, prvShare *types.DKGPrivateShare) { -} - -// ReceiveChan returns a channel to receive messages from DEXON network. -func (n *Network) ReceiveChan() <-chan interface{} { - return make(chan interface{}) -} diff --git a/integration_test/node.go b/integration_test/node.go index c0e226b..3193d99 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -19,6 +19,7 @@ package integration import ( "fmt" + "math" "sort" "time" @@ -65,44 +66,59 @@ func NewReceiveBlockEvent( // Node is designed to work with test.Scheduler. type Node struct { ID types.NodeID + chainNum uint32 chainID uint32 - cons *core.Consensus - gov core.Governance + shard *core.Shard + app *test.App + db blockdb.BlockDatabase + broadcastTargets map[types.NodeID]struct{} networkLatency test.LatencyModel proposingLatency test.LatencyModel } // NewNode constructs an instance of Node. func NewNode( - app core.Application, + app *test.App, gov core.Governance, db blockdb.BlockDatabase, privateKey crypto.PrivateKey, - nID types.NodeID, networkLatency test.LatencyModel, proposingLatency test.LatencyModel) *Node { - hashes := make(common.Hashes, 0) - for nID := range gov.GetNotarySet() { + var ( + shardID = uint32(0) + chainID = uint32(math.MaxUint32) + governanceConfig = gov.GetConfiguration(0) + broadcastTargets = gov.GetNotarySet() + nodeID = types.NewNodeID(privateKey.PublicKey()) + ) + hashes := common.Hashes{} + for nID := range broadcastTargets { hashes = append(hashes, nID.Hash) } sort.Sort(hashes) - chainID := uint32(0) - for i, hash := range hashes { - if hash == nID.Hash { + for i, h := range hashes { + if h == nodeID.Hash { chainID = uint32(i) - break } } - + delete(broadcastTargets, nodeID) return &Node{ - ID: nID, + ID: nodeID, chainID: chainID, - gov: gov, + chainNum: governanceConfig.NumChains, + broadcastTargets: broadcastTargets, networkLatency: networkLatency, proposingLatency: proposingLatency, - cons: core.NewConsensus( - app, gov, db, &Network{}, privateKey, eth.SigToPub), + app: app, + db: db, + shard: core.NewShard( + shardID, + governanceConfig, + privateKey, + eth.SigToPub, + app, + db), } } @@ -120,27 +136,18 @@ func (n *Node) Handle(e *test.Event) (events []*test.Event) { return } -func (n *Node) handleProposeBlock(when time.Time, piggyback interface{}) ( +func (n *Node) handleProposeBlock(when time.Time, _ interface{}) ( events []*test.Event, err error) { - b := &types.Block{ - ProposerID: n.ID, - Position: types.Position{ - ChainID: n.chainID, - }, - } - defer types.RecycleBlock(b) - if err = n.cons.PrepareBlock(b, when); err != nil { + b, err := n.prepareBlock(when) + if err != nil { return } - if err = n.cons.ProcessBlock(b); err != nil { + if err = n.processBlock(b); err != nil { return } // Create 'block received' event for each other nodes. - for nID := range n.gov.GetNotarySet() { - if nID == n.ID { - continue - } + for nID := range n.broadcastTargets { events = append(events, NewReceiveBlockEvent( nID, when.Add(n.networkLatency.Delay()), b.Clone())) } @@ -153,9 +160,53 @@ func (n *Node) handleProposeBlock(when time.Time, piggyback interface{}) ( func (n *Node) handleReceiveBlock(piggyback interface{}) ( events []*test.Event, err error) { - err = n.cons.ProcessBlock(piggyback.(*types.Block)) + err = n.processBlock(piggyback.(*types.Block)) if err != nil { panic(err) } return } + +func (n *Node) prepareBlock(when time.Time) (b *types.Block, err error) { + b = &types.Block{ + Position: types.Position{ + ChainID: n.chainID, + }} + err = n.shard.PrepareBlock(b, when) + return +} + +func (n *Node) processBlock(b *types.Block) (err error) { + // TODO(mission): this segment of code is identical to testShardMgr in + // core/shard_test.go, except the compaction-chain part. + var ( + delivered []*types.Block + verified []*types.Block + pendings = []*types.Block{b} + ) + if err = n.shard.SanityCheck(b); err != nil { + if err == core.ErrAckingBlockNotExists { + err = nil + } + return + } + for { + if len(pendings) == 0 { + break + } + b, pendings = pendings[0], pendings[1:] + if verified, delivered, err = n.shard.ProcessBlock(b); err != nil { + return + } + // Deliver blocks. + for _, b = range delivered { + if err = n.db.Update(*b); err != nil { + return + } + n.app.DeliverBlock(b.Hash, b.Witness.Timestamp) + } + // Update pending blocks for verified block (pass sanity check). + pendings = append(pendings, verified...) + } + return +} diff --git a/integration_test/utils.go b/integration_test/utils.go index f95d771..7259ced 100644 --- a/integration_test/utils.go +++ b/integration_test/utils.go @@ -48,7 +48,6 @@ func PrepareNodes( gov, dbs[nID], key, - nID, networkLatency, proposingLatency) } -- cgit v1.2.3