diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-10-25 16:59:30 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-25 16:59:30 +0800 |
commit | 04eeac10e6c690e62ae57ef0e2bdf4618b8782d1 (patch) | |
tree | e0b95167d1f42a9304fb9e924378464edbb517e9 | |
parent | 233f1e8de99bf2a0023f05d1c67e48cc770621df (diff) | |
download | dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.gz dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.bz2 dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.lz dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.xz dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.zst dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.zip |
core: lattice sync (#257)
-rw-r--r-- | core/consensus.go | 15 | ||||
-rw-r--r-- | core/lattice-data.go | 76 | ||||
-rw-r--r-- | core/lattice-data_test.go | 6 | ||||
-rw-r--r-- | core/lattice.go | 115 | ||||
-rw-r--r-- | core/lattice_test.go | 188 | ||||
-rw-r--r-- | core/test/app.go | 23 | ||||
-rw-r--r-- | core/test/app_test.go | 28 | ||||
-rw-r--r-- | core/test/revealer.go | 75 | ||||
-rw-r--r-- | core/test/revealer_test.go | 23 | ||||
-rw-r--r-- | core/types/position.go | 10 | ||||
-rw-r--r-- | core/types/position_test.go | 31 | ||||
-rw-r--r-- | integration_test/node.go | 36 |
12 files changed, 507 insertions, 119 deletions
diff --git a/core/consensus.go b/core/consensus.go index e20b4e7..ab59a67 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -815,6 +815,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { + // TODO(jimmy-dexon): add failed block to pool. if err = con.lattice.SanityCheck(b); err != nil { return } @@ -826,16 +827,12 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { - verifiedBlocks, deliveredBlocks, err := con.lattice.ProcessBlock(block) - if err != nil { + if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists { return } - // Pass verified blocks (pass sanity check) back to BA module. - for _, b := range verifiedBlocks { - if err := - con.baModules[b.Position.ChainID].processBlock(b); err != nil { - return err - } + deliveredBlocks, err := con.lattice.ProcessBlock(block) + if err != nil { + return } // Pass delivered blocks to compaction chain. for _, b := range deliveredBlocks { @@ -846,7 +843,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } deliveredBlocks = con.ccModule.extractBlocks() for _, b := range deliveredBlocks { - if err = con.db.Put(*b); err != nil { + if err = con.db.Update(*b); err != nil { panic(err) } // TODO(mission): clone types.FinalizationResult diff --git a/core/lattice-data.go b/core/lattice-data.go index b0fe9cf..2a3ec29 100644 --- a/core/lattice-data.go +++ b/core/lattice-data.go @@ -30,7 +30,6 @@ import ( // Errors for sanity check error. var ( - ErrAckingBlockNotExists = fmt.Errorf("acking block not exists") ErrDuplicatedAckOnOneChain = fmt.Errorf("duplicated ack on one chain") ErrInvalidChainID = fmt.Errorf("invalid chain id") ErrInvalidProposerID = fmt.Errorf("invalid proposer id") @@ -50,6 +49,15 @@ var ( ErrUnexpectedGenesisBlock = fmt.Errorf("unexpected genesis block") ) +// ErrAckingBlockNotExists is for sanity check error. +type ErrAckingBlockNotExists struct { + hash common.Hash +} + +func (e ErrAckingBlockNotExists) Error() string { + return fmt.Sprintf("acking block %s not exists", e.hash) +} + // Errors for method usage var ( ErrRoundNotIncreasing = errors.New("round not increasing") @@ -144,7 +152,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error { bAck, err := data.findBlock(hash) if err != nil { if err == blockdb.ErrBlockDoesNotExist { - return ErrAckingBlockNotExists + return &ErrAckingBlockNotExists{hash} } return err } @@ -185,7 +193,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { chainTip := chain.tip if chainTip == nil { if !b.ParentHash.Equal(common.Hash{}) { - return ErrAckingBlockNotExists + return &ErrAckingBlockNotExists{b.ParentHash} } if !b.IsGenesis() { return ErrNotGenesisBlock @@ -198,7 +206,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { // Check parent block if parent hash is specified. if !b.ParentHash.Equal(common.Hash{}) { if !b.ParentHash.Equal(chainTip.Hash) { - return ErrAckingBlockNotExists + return &ErrAckingBlockNotExists{b.ParentHash} } if !b.IsAcking(b.ParentHash) { return ErrNotAckParent @@ -268,18 +276,21 @@ func (data *latticeData) addBlock( bAck *types.Block updated bool ) - if err = data.chains[block.Position.ChainID].addBlock(block); err != nil { - return - } + data.chains[block.Position.ChainID].addBlock(block) data.blockByHash[block.Hash] = block // Update lastAckPos. for _, ack := range block.Acks { if bAck, err = data.findBlock(ack); err != nil { + if err == blockdb.ErrBlockDoesNotExist { + err = nil + continue + } return } data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] = bAck.Position.Clone() } + // Extract blocks that deliverable to total ordering. // A block is deliverable to total ordering iff: // - All its acking blocks are delivered to total ordering. @@ -293,19 +304,37 @@ func (data *latticeData) addBlock( allAckingBlockDelivered := true for _, ack := range tip.Acks { if bAck, err = data.findBlock(ack); err != nil { + if err == blockdb.ErrBlockDoesNotExist { + err = nil + allAckingBlockDelivered = false + break + } return } // Check if this block is outputed or not. idx := data.chains[bAck.Position.ChainID].findBlock( &bAck.Position) - if idx == -1 || - idx < data.chains[bAck.Position.ChainID].nextOutputIndex { + var ok bool + if idx == -1 { + // Either the block is delivered or not added to chain yet. + if out := + data.chains[bAck.Position.ChainID].lastOutputPosition; out != nil { + ok = !out.Older(&bAck.Position) + } else if ackTip := + data.chains[bAck.Position.ChainID].tip; ackTip != nil { + ok = !ackTip.Position.Older(&bAck.Position) + } + } else { + ok = idx < data.chains[bAck.Position.ChainID].nextOutputIndex + } + if ok { continue } // This acked block exists and not delivered yet. allAckingBlockDelivered = false } if allAckingBlockDelivered { + status.lastOutputPosition = &tip.Position status.nextOutputIndex++ deliverable = append(deliverable, tip) updated = true @@ -318,6 +347,30 @@ func (data *latticeData) addBlock( return } +// addFinalizedBlock processes block for syncing internal data. +func (data *latticeData) addFinalizedBlock( + block *types.Block) (err error) { + var bAck *types.Block + chain := data.chains[block.Position.ChainID] + if chain.tip != nil && chain.tip.Position.Height >= + block.Position.Height { + return + } + chain.nextOutputIndex = 0 + chain.blocks = []*types.Block{} + chain.tip = block + chain.lastOutputPosition = nil + // Update lastAckPost. + for _, ack := range block.Acks { + if bAck, err = data.findBlock(ack); err != nil { + return + } + data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] = + bAck.Position.Clone() + } + return +} + // prepareBlock helps to setup fields of block based on its ChainID and Round, // including: // - Acks @@ -524,6 +577,8 @@ type chainStatus struct { lastAckPos []*types.Position // the index to be output next time. nextOutputIndex int + // the position of output last time. + lastOutputPosition *types.Position } // findBlock finds index of block in current pending blocks on this chain. @@ -551,10 +606,9 @@ func (s *chainStatus) getBlock(idx int) (b *types.Block) { } // addBlock adds a block to pending blocks on this chain. -func (s *chainStatus) addBlock(b *types.Block) error { +func (s *chainStatus) addBlock(b *types.Block) { s.blocks = append(s.blocks, b) s.tip = b - return nil } // TODO(mission): change back to nextHeight. diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go index 64767e1..b92ed7a 100644 --- a/core/lattice-data_test.go +++ b/core/lattice-data_test.go @@ -196,7 +196,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() { s.hashBlock(b) err := data.sanityCheck(b) req.NotNil(err) - req.Equal(expectedErr.Error(), err.Error()) + req.IsType(expectedErr, err) } // Non-genesis block with no ack, should get error. check(ErrNotAckParent, &types.Block{ @@ -245,7 +245,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() { }, }) // Acking block doesn't exists. - check(ErrAckingBlockNotExists, &types.Block{ + check(&ErrAckingBlockNotExists{}, &types.Block{ ParentHash: blocks[1][0].Hash, Position: types.Position{ ChainID: 1, @@ -258,7 +258,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() { Timestamp: time.Now().UTC(), }) // Parent block on different chain. - check(ErrAckingBlockNotExists, &types.Block{ + check(&ErrAckingBlockNotExists{}, &types.Block{ ParentHash: blocks[1][0].Hash, Position: types.Position{ ChainID: 2, diff --git a/core/lattice.go b/core/lattice.go index 3259f35..69ad51c 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -34,6 +34,7 @@ type Lattice struct { app Application debug Debug pool blockPool + retryAdd bool data *latticeData toModule *totalOrdering ctModule *consensusTimestamp @@ -137,11 +138,6 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { s.lock.RLock() defer s.lock.RUnlock() if err = s.data.sanityCheck(b); err != nil { - // Add to block pool, once the lattice updated, - // would be checked again. - if err == ErrAckingBlockNotExists { - s.pool.addBlock(b) - } s.logger.Error("Sanity Check failed", "error", err) return } @@ -159,51 +155,85 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { return } +// addBlockToLattice adds a block into lattice, and deliver blocks with the acks +// already delivered. +// +// NOTE: assume the block passed sanity check. +func (s *Lattice) addBlockToLattice( + input *types.Block) (outputBlocks []*types.Block, err error) { + s.lock.Lock() + defer s.lock.Unlock() + if tip := s.data.chains[input.Position.ChainID].tip; tip != nil { + if !input.Position.Newer(&tip.Position) { + return + } + } + s.pool.addBlock(input) + // Replay tips in pool to check their validity. + for { + hasOutput := false + for i := uint32(0); i < s.chainNum; i++ { + var tip *types.Block + if tip = s.pool.tip(i); tip == nil { + continue + } + err = s.data.sanityCheck(tip) + if err == nil { + var output []*types.Block + if output, err = s.data.addBlock(tip); err != nil { + s.logger.Error("Sanity Check failed", "error", err) + continue + } + hasOutput = true + outputBlocks = append(outputBlocks, output...) + } + if _, ok := err.(*ErrAckingBlockNotExists); ok { + err = nil + continue + } + s.pool.removeTip(i) + } + if !hasOutput { + break + } + } + + for _, b := range outputBlocks { + // TODO(jimmy-dexon): change this name of classic DEXON algorithm. + if s.debug != nil { + s.debug.StronglyAcked(b.Hash) + } + s.logger.Debug("Calling Application.BlockConfirmed", "block", input) + s.app.BlockConfirmed(*b.Clone()) + // Purge blocks in pool with the same chainID and lower height. + s.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) + } + + return +} + // ProcessBlock adds a block into lattice, and deliver ordered blocks. // If any block pass sanity check after this block add into lattice, they // would be returned, too. // // NOTE: assume the block passed sanity check. func (s *Lattice) ProcessBlock( - input *types.Block) (verified, delivered []*types.Block, err error) { - + input *types.Block) (delivered []*types.Block, err error) { var ( - tip, b *types.Block - toDelivered []*types.Block + b *types.Block inLattice []*types.Block + toDelivered []*types.Block deliveredMode uint32 ) - s.lock.Lock() - defer s.lock.Unlock() - if inLattice, err = s.data.addBlock(input); err != nil { - // TODO(mission): if sanity check failed with "acking block doesn't - // exists", we should keep it in a pool. - s.logger.Error("Sanity Check failed when adding blocks", "error", err) + + if inLattice, err = s.addBlockToLattice(input); err != nil { return } - // TODO(mission): remove this hack, BA related stuffs should not - // be done here. - if s.debug != nil { - s.debug.StronglyAcked(input.Hash) - } - s.logger.Debug("Calling Application.BlockConfirmed", "block", input) - s.app.BlockConfirmed(*input.Clone()) - // Purge blocks in pool with the same chainID and lower height. - s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) - // Replay tips in pool to check their validity. - for i := uint32(0); i < s.chainNum; i++ { - if tip = s.pool.tip(i); tip == nil { - continue - } - err = s.data.sanityCheck(tip) - if err == nil { - verified = append(verified, tip) - } - if err == ErrAckingBlockNotExists { - continue - } - s.pool.removeTip(i) + + if len(inLattice) == 0 { + return } + // Perform total ordering for each block added to lattice. for _, b = range inLattice { toDelivered, deliveredMode, err = s.toModule.processBlock(b) @@ -265,3 +295,14 @@ func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { } return } + +// ProcessFinalizedBlock is used for syncing lattice data. +func (s *Lattice) ProcessFinalizedBlock(input *types.Block) { + defer func() { s.retryAdd = true }() + s.lock.Lock() + defer s.lock.Unlock() + if err := s.data.addFinalizedBlock(input); err != nil { + panic(err) + } + s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) +} diff --git a/core/lattice_test.go b/core/lattice_test.go index bf4138a..ff479bc 100644 --- a/core/lattice_test.go +++ b/core/lattice_test.go @@ -53,38 +53,37 @@ func (mgr *testLatticeMgr) prepareBlock( func (mgr *testLatticeMgr) processBlock(b *types.Block) (err error) { var ( delivered []*types.Block - verified []*types.Block - pendings = []*types.Block{b} ) if err = mgr.lattice.SanityCheck(b); err != nil { - if err == ErrAckingBlockNotExists { + if _, ok := err.(*ErrAckingBlockNotExists); ok { err = nil + } else { + return } - return } - for { - if len(pendings) == 0 { - break - } - b, pendings = pendings[0], pendings[1:] - if verified, delivered, err = mgr.lattice.ProcessBlock(b); err != nil { + if err = mgr.db.Put(*b); err != nil { + if err != blockdb.ErrBlockExists { return } - // Deliver blocks. - for _, b = range delivered { - if err = mgr.ccModule.processBlock(b); err != nil { - return - } - if err = mgr.db.Put(*b); err != nil { - return - } - mgr.app.BlockDelivered(b.Hash, b.Finalization) + err = nil + } + if delivered, err = mgr.lattice.ProcessBlock(b); err != nil { + return + } + // Deliver blocks. + for _, b = range delivered { + if err = mgr.ccModule.processBlock(b); err != nil { + return } - if err = mgr.lattice.PurgeBlocks(delivered); err != nil { + } + for _, b = range mgr.ccModule.extractBlocks() { + if err = mgr.db.Update(*b); err != nil { return } - // Update pending blocks for verified block (pass sanity check). - pendings = append(pendings, verified...) + mgr.app.BlockDelivered(b.Hash, b.Finalization) + } + if err = mgr.lattice.PurgeBlocks(delivered); err != nil { + return } return } @@ -112,6 +111,10 @@ func (s *LatticeTestSuite) newTestLatticeMgr( // Setup compaction chain. cc := newCompactionChain(gov) cc.init(&types.Block{}) + mock := newMockTSigVerifier(true) + for i := 0; i < cc.tsigVerifier.cacheSize; i++ { + cc.tsigVerifier.verifier[uint64(i)] = mock + } // Setup lattice. return &testLatticeMgr{ ccModule: cc, @@ -139,6 +142,7 @@ func (s *LatticeTestSuite) TestBasicUsage() { err error cfg = types.Config{ NumChains: chainNum, + NotarySetSize: chainNum, PhiRatio: float32(2) / float32(3), K: 0, MinBlockInterval: 0, @@ -158,14 +162,14 @@ func (s *LatticeTestSuite) TestBasicUsage() { req.NotNil(b) req.NoError(err) // We've ignored the error for "acking blocks don't exist". - req.Nil(master.processBlock(b)) + req.NoError(master.processBlock(b)) } for i := 0; i < (blockNum - int(chainNum)); i++ { b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum)))) req.NotNil(b) req.NoError(err) // We've ignored the error for "acking blocks don't exist". - req.Nil(master.processBlock(b)) + req.NoError(master.processBlock(b)) } // Now we have some blocks, replay them on different lattices. iter, err := master.db.GetAll() @@ -185,7 +189,7 @@ func (s *LatticeTestSuite) TestBasicUsage() { } } req.NoError(err) - req.Nil(other.processBlock(&b)) + req.NoError(other.processBlock(&b)) revealed += b.Hash.String() + "," revealSeq[revealed] = struct{}{} } @@ -195,12 +199,142 @@ func (s *LatticeTestSuite) TestBasicUsage() { req.True(len(revealSeq) > 1) // Make sure nothing goes wrong. for i, app := range apps { - req.Nil(app.Verify()) + err := app.Verify() + req.NoError(err) for j, otherApp := range apps { if i >= j { continue } - req.Nil(app.Compare(otherApp)) + err := app.Compare(otherApp) + s.NoError(err) + } + } +} + +func (s *LatticeTestSuite) TestSync() { + // One Lattice prepare blocks on chains randomly selected each time + // and process it. Those generated blocks and kept into a buffer, and + // process by other Lattice instances with random order. + var ( + blockNum = 500 + // The first `desyncNum` blocks revealed are considered "desynced" and will + // not be delivered to lattice. After `syncNum` blocks have revealed, the + // system is considered "synced" and start feeding blocks that are desynced + // to processFinalizedBlock. + desyncNum = 50 + syncNum = 150 + chainNum = uint32(19) + otherLatticeNum = 50 + req = s.Require() + err error + cfg = types.Config{ + NumChains: chainNum, + NotarySetSize: chainNum, + PhiRatio: float32(2) / float32(3), + K: 0, + MinBlockInterval: 0, + MaxBlockInterval: 3000 * time.Second, + RoundInterval: time.Hour, + } + dMoment = time.Now().UTC() + master = s.newTestLatticeMgr(&cfg, dMoment) + //apps = []*test.App{master.app} + revealSeq = map[string]struct{}{} + ) + // Make sure the test setup is correct. + s.Require().True(syncNum > desyncNum) + // Master-lattice generates blocks. + for i := uint32(0); i < chainNum; i++ { + // Produce genesis blocks should be delivered before all other blocks, + // or the consensus time would be wrong. + b, err := master.prepareBlock(i) + req.NotNil(b) + req.NoError(err) + // We've ignored the error for "acking blocks don't exist". + req.NoError(master.processBlock(b)) + } + for i := 0; i < (blockNum - int(chainNum)); i++ { + b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum)))) + req.NotNil(b) + req.NoError(err) + // We've ignored the error for "acking blocks don't exist". + req.NoError(master.processBlock(b)) + } + req.NoError(master.app.Verify()) + // Now we have some blocks, replay them on different lattices. + iter, err := master.db.GetAll() + req.NoError(err) + revealer, err := test.NewRandomTipRevealer(iter) + req.NoError(err) + for i := 0; i < otherLatticeNum; i++ { + synced := false + syncFromHeight := uint64(0) + revealer.Reset() + revealed := "" + other := s.newTestLatticeMgr(&cfg, dMoment) + chainTip := make([]*types.Block, chainNum) + for height := 0; ; height++ { + b, err := revealer.Next() + if err != nil { + if err == blockdb.ErrIterationFinished { + err = nil + break + } + } + req.NoError(err) + if height >= syncNum && !synced { + synced = true + syncToHeight := uint64(0) + for _, block := range chainTip { + if block == nil { + synced = false + continue + } + result, exist := master.app.Delivered[block.Hash] + req.True(exist) + if syncToHeight < result.ConsensusHeight { + syncToHeight = result.ConsensusHeight + } + } + + for idx := syncFromHeight; idx < syncToHeight; idx++ { + block, err := master.db.Get(master.app.DeliverSequence[idx]) + req.Equal(idx+1, block.Finalization.Height) + req.NoError(err) + if err = other.db.Put(block); err != nil { + req.Equal(blockdb.ErrBlockExists, err) + } + other.ccModule.processFinalizedBlock(&block) + } + extracted := other.ccModule.extractFinalizedBlocks() + req.Len(extracted, int(syncToHeight-syncFromHeight)) + for _, block := range extracted { + other.app.StronglyAcked(block.Hash) + other.lattice.ProcessFinalizedBlock(block) + } + syncFromHeight = syncToHeight + } + if height > desyncNum { + if chainTip[b.Position.ChainID] == nil { + chainTip[b.Position.ChainID] = &b + } + if err = other.db.Put(b); err != nil { + req.Equal(blockdb.ErrBlockExists, err) + } + delivered, err := other.lattice.addBlockToLattice(&b) + req.NoError(err) + revealed += b.Hash.String() + "," + revealSeq[revealed] = struct{}{} + req.NoError(other.lattice.PurgeBlocks(delivered)) + // TODO(jimmy-dexon): check if delivered set is a DAG. + } else { + other.app.StronglyAcked(b.Hash) + } + } + for b := range master.app.Acked { + if _, exist := other.app.Acked[b]; !exist { + s.FailNowf("Block not delivered", "%s not exists", b) + } } } } diff --git a/core/test/app.go b/core/test/app.go index ba949b3..546c9e5 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -29,7 +29,7 @@ import ( var ( // ErrEmptyDeliverSequence means there is no delivery event in this App // instance. - ErrEmptyDeliverSequence = fmt.Errorf("emptry deliver sequence") + ErrEmptyDeliverSequence = fmt.Errorf("empty deliver sequence") // ErrMismatchBlockHashSequence means the delivering sequence between two App // instances are different. ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence") @@ -43,6 +43,10 @@ var ( // consensus timestamp older than previous block. ErrConsensusTimestampOutOfOrder = fmt.Errorf( "consensus timestamp out of order") + // ErrConsensusHeightOutOfOrder means the later delivered block has + // consensus height not equal to height of previous block plus one. + ErrConsensusHeightOutOfOrder = fmt.Errorf( + "consensus height out of order") // ErrDeliveredBlockNotAcked means some block delivered (confirmed) but // not strongly acked. ErrDeliveredBlockNotAcked = fmt.Errorf("delivered block not acked") @@ -69,8 +73,9 @@ type AppTotalOrderRecord struct { // AppDeliveredRecord caches information when this application received // a block delivered notification. type AppDeliveredRecord struct { - ConsensusTime time.Time - When time.Time + ConsensusTime time.Time + ConsensusHeight uint64 + When time.Time } // App implements Application interface for testing purpose. @@ -151,8 +156,9 @@ func (app *App) BlockDelivered( defer app.deliveredLock.Unlock() app.Delivered[blockHash] = &AppDeliveredRecord{ - ConsensusTime: result.Timestamp, - When: time.Now().UTC(), + ConsensusTime: result.Timestamp, + ConsensusHeight: result.Height, + When: time.Now().UTC(), } app.DeliverSequence = append(app.DeliverSequence, blockHash) } @@ -201,6 +207,7 @@ func (app *App) Verify() error { app.ackedLock.RLock() defer app.ackedLock.RUnlock() + expectHeight := uint64(1) prevTime := time.Time{} for _, h := range app.DeliverSequence { // Make sure delivered block is strongly acked. @@ -218,6 +225,12 @@ func (app *App) Verify() error { return ErrConsensusTimestampOutOfOrder } prevTime = rec.ConsensusTime + + // Make sure the consensus height is incremental. + if expectHeight != rec.ConsensusHeight { + return ErrConsensusHeightOutOfOrder + } + expectHeight++ } // Make sure the order of delivered and total ordering are the same by // comparing the concated string. diff --git a/core/test/app_test.go b/core/test/app_test.go index 8f2aae5..823bde0 100644 --- a/core/test/app_test.go +++ b/core/test/app_test.go @@ -75,14 +75,16 @@ func (s *AppTestSuite) deliverBlockWithTimeFromSequenceLength( app *App, hash common.Hash) { s.deliverBlock(app, hash, time.Time{}.Add( - time.Duration(len(app.DeliverSequence))*time.Second)) + time.Duration(len(app.DeliverSequence))*time.Second), + uint64(len(app.DeliverSequence)+1)) } func (s *AppTestSuite) deliverBlock( - app *App, hash common.Hash, timestamp time.Time) { + app *App, hash common.Hash, timestamp time.Time, height uint64) { app.BlockDelivered(hash, types.FinalizationResult{ Timestamp: timestamp, + Height: height, }) } @@ -113,7 +115,8 @@ func (s *AppTestSuite) TestCompare() { wrongTime := time.Time{}.Add( time.Duration(len(app3.DeliverSequence)) * time.Second) wrongTime = wrongTime.Add(1 * time.Second) - s.deliverBlock(app3, s.to3.BlockHashes[0], wrongTime) + s.deliverBlock(app3, s.to3.BlockHashes[0], wrongTime, + uint64(len(app3.DeliverSequence)+1)) req.Equal(ErrMismatchConsensusTime, app1.Compare(app3)) req.Equal(ErrMismatchConsensusTime, app3.Compare(app1)) // An App without any delivered blocks. @@ -130,9 +133,10 @@ func (s *AppTestSuite) TestVerify() { s.setupAppByTotalOrderDeliver(app1, s.to1) s.setupAppByTotalOrderDeliver(app1, s.to2) s.setupAppByTotalOrderDeliver(app1, s.to3) - req.Nil(app1.Verify()) + req.NoError(app1.Verify()) // A delivered block without strongly ack - s.deliverBlock(app1, common.NewRandomHash(), time.Time{}) + s.deliverBlock(app1, common.NewRandomHash(), time.Time{}, + uint64(len(app1.DeliverSequence))) req.Equal(ErrDeliveredBlockNotAcked, app1.Verify()) // The consensus time is out of order. app2 := NewApp() @@ -141,7 +145,8 @@ func (s *AppTestSuite) TestVerify() { app2.StronglyAcked(h) } app2.TotalOrderingDelivered(s.to2.BlockHashes, s.to2.Mode) - s.deliverBlock(app2, s.to2.BlockHashes[0], time.Time{}) + s.deliverBlock(app2, s.to2.BlockHashes[0], time.Time{}, + uint64(len(app2.DeliverSequence)+1)) req.Equal(ErrConsensusTimestampOutOfOrder, app2.Verify()) // A delivered block is not found in total ordering delivers. app3 := NewApp() @@ -164,6 +169,17 @@ func (s *AppTestSuite) TestVerify() { // Witness ack on unknown block. app5 := NewApp() s.setupAppByTotalOrderDeliver(app5, s.to1) + // The conensus height is out of order. + app6 := NewApp() + s.setupAppByTotalOrderDeliver(app6, s.to1) + for _, h := range s.to2.BlockHashes { + app6.StronglyAcked(h) + } + app6.TotalOrderingDelivered(s.to2.BlockHashes, s.to2.Mode) + s.deliverBlock(app6, s.to2.BlockHashes[0], time.Time{}.Add( + time.Duration(len(app6.DeliverSequence))*time.Second), + uint64(len(app6.DeliverSequence)+2)) + req.Equal(ErrConsensusHeightOutOfOrder, app6.Verify()) } func TestApp(t *testing.T) { diff --git a/core/test/revealer.go b/core/test/revealer.go index 80d2a30..c9d82ce 100644 --- a/core/test/revealer.go +++ b/core/test/revealer.go @@ -1,6 +1,23 @@ // Copyright 2018 The dexon-consensus-core Authors // This file is part of the dexon-consensus-core library. // +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// // The dexon-consensus-core library is free software: you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, @@ -205,3 +222,61 @@ func (r *RandomRevealer) Reset() { } r.remains = hashes } + +// RandomTipRevealer implements Revealer interface, which would load +// all blocks from blockdb, and randomly pick one chain's tip to reveal. +type RandomTipRevealer struct { + chainsBlock []map[uint64]*types.Block + chainTip []uint64 + chainRevealSeq []uint32 + revealed int + randGen *rand.Rand +} + +// NewRandomTipRevealer constructs RandomTipRevealer. +func NewRandomTipRevealer( + iter blockdb.BlockIterator) (r *RandomTipRevealer, err error) { + + blocks, err := loadAllBlocks(iter) + if err != nil { + return + } + r = &RandomTipRevealer{ + randGen: rand.New(rand.NewSource(time.Now().UnixNano())), + } + for _, b := range blocks { + for b.Position.ChainID >= uint32(len(r.chainsBlock)) { + r.chainsBlock = append(r.chainsBlock, make(map[uint64]*types.Block)) + r.chainTip = append(r.chainTip, 0) + } + r.chainsBlock[b.Position.ChainID][b.Position.Height] = b + r.chainRevealSeq = append(r.chainRevealSeq, b.Position.ChainID) + } + r.Reset() + return +} + +// Next implements Revealer.Next method, which would reveal blocks randomly. +func (r *RandomTipRevealer) Next() (types.Block, error) { + if len(r.chainRevealSeq) == r.revealed { + return types.Block{}, blockdb.ErrIterationFinished + } + + picked := r.chainRevealSeq[r.revealed] + r.revealed++ + block := r.chainsBlock[picked][r.chainTip[picked]] + r.chainTip[picked]++ + return *block, nil +} + +// Reset implement Revealer.Reset method, which would reset revealing. +func (r *RandomTipRevealer) Reset() { + r.revealed = 0 + r.randGen.Shuffle(len(r.chainRevealSeq), func(i, j int) { + r.chainRevealSeq[i], r.chainRevealSeq[j] = + r.chainRevealSeq[j], r.chainRevealSeq[i] + }) + for i := range r.chainTip { + r.chainTip[i] = 0 + } +} diff --git a/core/test/revealer_test.go b/core/test/revealer_test.go index 8bb46bc..4945d62 100644 --- a/core/test/revealer_test.go +++ b/core/test/revealer_test.go @@ -135,6 +135,29 @@ func (s *RevealerTestSuite) TestRandomDAGReveal() { s.baseTest(revealer, 10, checkFunc) } +func (s *RevealerTestSuite) TestRandomTipReveal() { + // This test case would make sure we could at least generate + // two different revealing sequence when revealing more than + // 10 times. + iter, err := s.db.GetAll() + s.Require().Nil(err) + revealer, err := NewRandomTipRevealer(iter) + s.Require().Nil(err) + + checkFunc := func(b *types.Block, revealed map[common.Hash]struct{}) { + // Make sure the revealer won't reveal the same block twice. + _, alreadyRevealed := revealed[b.Hash] + s.False(alreadyRevealed) + // Make sure the parent is already revealed. + if b.Position.Height == 0 { + return + } + _, alreadyRevealed = revealed[b.ParentHash] + s.True(alreadyRevealed) + } + s.baseTest(revealer, 10, checkFunc) +} + func TestRevealer(t *testing.T) { suite.Run(t, new(RevealerTestSuite)) } diff --git a/core/types/position.go b/core/types/position.go index f41be32..d821d16 100644 --- a/core/types/position.go +++ b/core/types/position.go @@ -57,6 +57,16 @@ func (pos *Position) Newer(other *Position) bool { (pos.Round == other.Round && pos.Height > other.Height) } +// Older checks if one block is older than another one on the same chain. +// If two blocks on different chain compared by this function, it would panic. +func (pos *Position) Older(other *Position) bool { + if pos.ChainID != other.ChainID { + panic(ErrComparePositionOnDifferentChains) + } + return pos.Round < other.Round || + (pos.Round == other.Round && pos.Height < other.Height) +} + // Clone a position instance. func (pos *Position) Clone() *Position { return &Position{ diff --git a/core/types/position_test.go b/core/types/position_test.go index 48f8dbd..1ef1813 100644 --- a/core/types/position_test.go +++ b/core/types/position_test.go @@ -59,6 +59,37 @@ func (s *PositionTestSuite) TestNewer() { })) } +func (s *PositionTestSuite) TestOlder() { + pos := Position{ + Round: 1, + ChainID: 1, + Height: 1, + } + s.Panics(func() { + pos.Older(&Position{ChainID: 2}) + }) + s.False(pos.Older(&Position{ + Round: 0, + ChainID: 1, + Height: 0, + })) + s.False(pos.Older(&Position{ + Round: 1, + ChainID: 1, + Height: 0, + })) + s.True(pos.Older(&Position{ + Round: 2, + ChainID: 1, + Height: 0, + })) + s.True(pos.Older(&Position{ + Round: 1, + ChainID: 1, + Height: 100, + })) +} + func (s *PositionTestSuite) TestSearchInAsendingOrder() { positions := []*Position{ &Position{Round: 0, Height: 1}, diff --git a/integration_test/node.go b/integration_test/node.go index a26c005..0fb661b 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -73,6 +73,7 @@ type Node struct { broadcastTargets map[types.NodeID]struct{} networkLatency test.LatencyModel proposingLatency test.LatencyModel + prevFinalHeight uint64 } // NewNode constructs an instance of Node. @@ -185,35 +186,28 @@ func (n *Node) processBlock(b *types.Block) (err error) { // core/lattice_test.go, except the compaction-chain part. var ( delivered []*types.Block - verified []*types.Block - pendings = []*types.Block{b} ) if err = n.lattice.SanityCheck(b); err != nil { - if err == core.ErrAckingBlockNotExists { + if _, ok := err.(*core.ErrAckingBlockNotExists); ok { err = nil + } else { + return } + } + if delivered, err = n.lattice.ProcessBlock(b); err != nil { return } - for { - if len(pendings) == 0 { - break - } - b, pendings = pendings[0], pendings[1:] - if verified, delivered, err = n.lattice.ProcessBlock(b); err != nil { - return - } - // Deliver blocks. - for _, b = range delivered { - if err = n.db.Put(*b); err != nil { - return - } - n.app.BlockDelivered(b.Hash, b.Finalization) - } - if err = n.lattice.PurgeBlocks(delivered); err != nil { + // Deliver blocks. + for _, b = range delivered { + if err = n.db.Put(*b); err != nil { return } - // Update pending blocks for verified block (pass sanity check). - pendings = append(pendings, verified...) + b.Finalization.Height = n.prevFinalHeight + 1 + n.app.BlockDelivered(b.Hash, b.Finalization) + n.prevFinalHeight++ + } + if err = n.lattice.PurgeBlocks(delivered); err != nil { + return } return } |