aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/consensus.go15
-rw-r--r--core/lattice-data.go76
-rw-r--r--core/lattice-data_test.go6
-rw-r--r--core/lattice.go115
-rw-r--r--core/lattice_test.go188
-rw-r--r--core/test/app.go23
-rw-r--r--core/test/app_test.go28
-rw-r--r--core/test/revealer.go75
-rw-r--r--core/test/revealer_test.go23
-rw-r--r--core/types/position.go10
-rw-r--r--core/types/position_test.go31
-rw-r--r--integration_test/node.go36
12 files changed, 507 insertions, 119 deletions
diff --git a/core/consensus.go b/core/consensus.go
index e20b4e7..ab59a67 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -815,6 +815,7 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
+ // TODO(jimmy-dexon): add failed block to pool.
if err = con.lattice.SanityCheck(b); err != nil {
return
}
@@ -826,16 +827,12 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
- verifiedBlocks, deliveredBlocks, err := con.lattice.ProcessBlock(block)
- if err != nil {
+ if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists {
return
}
- // Pass verified blocks (pass sanity check) back to BA module.
- for _, b := range verifiedBlocks {
- if err :=
- con.baModules[b.Position.ChainID].processBlock(b); err != nil {
- return err
- }
+ deliveredBlocks, err := con.lattice.ProcessBlock(block)
+ if err != nil {
+ return
}
// Pass delivered blocks to compaction chain.
for _, b := range deliveredBlocks {
@@ -846,7 +843,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
}
deliveredBlocks = con.ccModule.extractBlocks()
for _, b := range deliveredBlocks {
- if err = con.db.Put(*b); err != nil {
+ if err = con.db.Update(*b); err != nil {
panic(err)
}
// TODO(mission): clone types.FinalizationResult
diff --git a/core/lattice-data.go b/core/lattice-data.go
index b0fe9cf..2a3ec29 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -30,7 +30,6 @@ import (
// Errors for sanity check error.
var (
- ErrAckingBlockNotExists = fmt.Errorf("acking block not exists")
ErrDuplicatedAckOnOneChain = fmt.Errorf("duplicated ack on one chain")
ErrInvalidChainID = fmt.Errorf("invalid chain id")
ErrInvalidProposerID = fmt.Errorf("invalid proposer id")
@@ -50,6 +49,15 @@ var (
ErrUnexpectedGenesisBlock = fmt.Errorf("unexpected genesis block")
)
+// ErrAckingBlockNotExists is for sanity check error.
+type ErrAckingBlockNotExists struct {
+ hash common.Hash
+}
+
+func (e ErrAckingBlockNotExists) Error() string {
+ return fmt.Sprintf("acking block %s not exists", e.hash)
+}
+
// Errors for method usage
var (
ErrRoundNotIncreasing = errors.New("round not increasing")
@@ -144,7 +152,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error {
bAck, err := data.findBlock(hash)
if err != nil {
if err == blockdb.ErrBlockDoesNotExist {
- return ErrAckingBlockNotExists
+ return &ErrAckingBlockNotExists{hash}
}
return err
}
@@ -185,7 +193,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
chainTip := chain.tip
if chainTip == nil {
if !b.ParentHash.Equal(common.Hash{}) {
- return ErrAckingBlockNotExists
+ return &ErrAckingBlockNotExists{b.ParentHash}
}
if !b.IsGenesis() {
return ErrNotGenesisBlock
@@ -198,7 +206,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
// Check parent block if parent hash is specified.
if !b.ParentHash.Equal(common.Hash{}) {
if !b.ParentHash.Equal(chainTip.Hash) {
- return ErrAckingBlockNotExists
+ return &ErrAckingBlockNotExists{b.ParentHash}
}
if !b.IsAcking(b.ParentHash) {
return ErrNotAckParent
@@ -268,18 +276,21 @@ func (data *latticeData) addBlock(
bAck *types.Block
updated bool
)
- if err = data.chains[block.Position.ChainID].addBlock(block); err != nil {
- return
- }
+ data.chains[block.Position.ChainID].addBlock(block)
data.blockByHash[block.Hash] = block
// Update lastAckPos.
for _, ack := range block.Acks {
if bAck, err = data.findBlock(ack); err != nil {
+ if err == blockdb.ErrBlockDoesNotExist {
+ err = nil
+ continue
+ }
return
}
data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] =
bAck.Position.Clone()
}
+
// Extract blocks that deliverable to total ordering.
// A block is deliverable to total ordering iff:
// - All its acking blocks are delivered to total ordering.
@@ -293,19 +304,37 @@ func (data *latticeData) addBlock(
allAckingBlockDelivered := true
for _, ack := range tip.Acks {
if bAck, err = data.findBlock(ack); err != nil {
+ if err == blockdb.ErrBlockDoesNotExist {
+ err = nil
+ allAckingBlockDelivered = false
+ break
+ }
return
}
// Check if this block is outputed or not.
idx := data.chains[bAck.Position.ChainID].findBlock(
&bAck.Position)
- if idx == -1 ||
- idx < data.chains[bAck.Position.ChainID].nextOutputIndex {
+ var ok bool
+ if idx == -1 {
+ // Either the block is delivered or not added to chain yet.
+ if out :=
+ data.chains[bAck.Position.ChainID].lastOutputPosition; out != nil {
+ ok = !out.Older(&bAck.Position)
+ } else if ackTip :=
+ data.chains[bAck.Position.ChainID].tip; ackTip != nil {
+ ok = !ackTip.Position.Older(&bAck.Position)
+ }
+ } else {
+ ok = idx < data.chains[bAck.Position.ChainID].nextOutputIndex
+ }
+ if ok {
continue
}
// This acked block exists and not delivered yet.
allAckingBlockDelivered = false
}
if allAckingBlockDelivered {
+ status.lastOutputPosition = &tip.Position
status.nextOutputIndex++
deliverable = append(deliverable, tip)
updated = true
@@ -318,6 +347,30 @@ func (data *latticeData) addBlock(
return
}
+// addFinalizedBlock processes block for syncing internal data.
+func (data *latticeData) addFinalizedBlock(
+ block *types.Block) (err error) {
+ var bAck *types.Block
+ chain := data.chains[block.Position.ChainID]
+ if chain.tip != nil && chain.tip.Position.Height >=
+ block.Position.Height {
+ return
+ }
+ chain.nextOutputIndex = 0
+ chain.blocks = []*types.Block{}
+ chain.tip = block
+ chain.lastOutputPosition = nil
+ // Update lastAckPost.
+ for _, ack := range block.Acks {
+ if bAck, err = data.findBlock(ack); err != nil {
+ return
+ }
+ data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] =
+ bAck.Position.Clone()
+ }
+ return
+}
+
// prepareBlock helps to setup fields of block based on its ChainID and Round,
// including:
// - Acks
@@ -524,6 +577,8 @@ type chainStatus struct {
lastAckPos []*types.Position
// the index to be output next time.
nextOutputIndex int
+ // the position of output last time.
+ lastOutputPosition *types.Position
}
// findBlock finds index of block in current pending blocks on this chain.
@@ -551,10 +606,9 @@ func (s *chainStatus) getBlock(idx int) (b *types.Block) {
}
// addBlock adds a block to pending blocks on this chain.
-func (s *chainStatus) addBlock(b *types.Block) error {
+func (s *chainStatus) addBlock(b *types.Block) {
s.blocks = append(s.blocks, b)
s.tip = b
- return nil
}
// TODO(mission): change back to nextHeight.
diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go
index 64767e1..b92ed7a 100644
--- a/core/lattice-data_test.go
+++ b/core/lattice-data_test.go
@@ -196,7 +196,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() {
s.hashBlock(b)
err := data.sanityCheck(b)
req.NotNil(err)
- req.Equal(expectedErr.Error(), err.Error())
+ req.IsType(expectedErr, err)
}
// Non-genesis block with no ack, should get error.
check(ErrNotAckParent, &types.Block{
@@ -245,7 +245,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() {
},
})
// Acking block doesn't exists.
- check(ErrAckingBlockNotExists, &types.Block{
+ check(&ErrAckingBlockNotExists{}, &types.Block{
ParentHash: blocks[1][0].Hash,
Position: types.Position{
ChainID: 1,
@@ -258,7 +258,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() {
Timestamp: time.Now().UTC(),
})
// Parent block on different chain.
- check(ErrAckingBlockNotExists, &types.Block{
+ check(&ErrAckingBlockNotExists{}, &types.Block{
ParentHash: blocks[1][0].Hash,
Position: types.Position{
ChainID: 2,
diff --git a/core/lattice.go b/core/lattice.go
index 3259f35..69ad51c 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -34,6 +34,7 @@ type Lattice struct {
app Application
debug Debug
pool blockPool
+ retryAdd bool
data *latticeData
toModule *totalOrdering
ctModule *consensusTimestamp
@@ -137,11 +138,6 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) {
s.lock.RLock()
defer s.lock.RUnlock()
if err = s.data.sanityCheck(b); err != nil {
- // Add to block pool, once the lattice updated,
- // would be checked again.
- if err == ErrAckingBlockNotExists {
- s.pool.addBlock(b)
- }
s.logger.Error("Sanity Check failed", "error", err)
return
}
@@ -159,51 +155,85 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) {
return
}
+// addBlockToLattice adds a block into lattice, and deliver blocks with the acks
+// already delivered.
+//
+// NOTE: assume the block passed sanity check.
+func (s *Lattice) addBlockToLattice(
+ input *types.Block) (outputBlocks []*types.Block, err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if tip := s.data.chains[input.Position.ChainID].tip; tip != nil {
+ if !input.Position.Newer(&tip.Position) {
+ return
+ }
+ }
+ s.pool.addBlock(input)
+ // Replay tips in pool to check their validity.
+ for {
+ hasOutput := false
+ for i := uint32(0); i < s.chainNum; i++ {
+ var tip *types.Block
+ if tip = s.pool.tip(i); tip == nil {
+ continue
+ }
+ err = s.data.sanityCheck(tip)
+ if err == nil {
+ var output []*types.Block
+ if output, err = s.data.addBlock(tip); err != nil {
+ s.logger.Error("Sanity Check failed", "error", err)
+ continue
+ }
+ hasOutput = true
+ outputBlocks = append(outputBlocks, output...)
+ }
+ if _, ok := err.(*ErrAckingBlockNotExists); ok {
+ err = nil
+ continue
+ }
+ s.pool.removeTip(i)
+ }
+ if !hasOutput {
+ break
+ }
+ }
+
+ for _, b := range outputBlocks {
+ // TODO(jimmy-dexon): change this name of classic DEXON algorithm.
+ if s.debug != nil {
+ s.debug.StronglyAcked(b.Hash)
+ }
+ s.logger.Debug("Calling Application.BlockConfirmed", "block", input)
+ s.app.BlockConfirmed(*b.Clone())
+ // Purge blocks in pool with the same chainID and lower height.
+ s.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
+ }
+
+ return
+}
+
// ProcessBlock adds a block into lattice, and deliver ordered blocks.
// If any block pass sanity check after this block add into lattice, they
// would be returned, too.
//
// NOTE: assume the block passed sanity check.
func (s *Lattice) ProcessBlock(
- input *types.Block) (verified, delivered []*types.Block, err error) {
-
+ input *types.Block) (delivered []*types.Block, err error) {
var (
- tip, b *types.Block
- toDelivered []*types.Block
+ b *types.Block
inLattice []*types.Block
+ toDelivered []*types.Block
deliveredMode uint32
)
- s.lock.Lock()
- defer s.lock.Unlock()
- if inLattice, err = s.data.addBlock(input); err != nil {
- // TODO(mission): if sanity check failed with "acking block doesn't
- // exists", we should keep it in a pool.
- s.logger.Error("Sanity Check failed when adding blocks", "error", err)
+
+ if inLattice, err = s.addBlockToLattice(input); err != nil {
return
}
- // TODO(mission): remove this hack, BA related stuffs should not
- // be done here.
- if s.debug != nil {
- s.debug.StronglyAcked(input.Hash)
- }
- s.logger.Debug("Calling Application.BlockConfirmed", "block", input)
- s.app.BlockConfirmed(*input.Clone())
- // Purge blocks in pool with the same chainID and lower height.
- s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height)
- // Replay tips in pool to check their validity.
- for i := uint32(0); i < s.chainNum; i++ {
- if tip = s.pool.tip(i); tip == nil {
- continue
- }
- err = s.data.sanityCheck(tip)
- if err == nil {
- verified = append(verified, tip)
- }
- if err == ErrAckingBlockNotExists {
- continue
- }
- s.pool.removeTip(i)
+
+ if len(inLattice) == 0 {
+ return
}
+
// Perform total ordering for each block added to lattice.
for _, b = range inLattice {
toDelivered, deliveredMode, err = s.toModule.processBlock(b)
@@ -265,3 +295,14 @@ func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
}
return
}
+
+// ProcessFinalizedBlock is used for syncing lattice data.
+func (s *Lattice) ProcessFinalizedBlock(input *types.Block) {
+ defer func() { s.retryAdd = true }()
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if err := s.data.addFinalizedBlock(input); err != nil {
+ panic(err)
+ }
+ s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height)
+}
diff --git a/core/lattice_test.go b/core/lattice_test.go
index bf4138a..ff479bc 100644
--- a/core/lattice_test.go
+++ b/core/lattice_test.go
@@ -53,38 +53,37 @@ func (mgr *testLatticeMgr) prepareBlock(
func (mgr *testLatticeMgr) processBlock(b *types.Block) (err error) {
var (
delivered []*types.Block
- verified []*types.Block
- pendings = []*types.Block{b}
)
if err = mgr.lattice.SanityCheck(b); err != nil {
- if err == ErrAckingBlockNotExists {
+ if _, ok := err.(*ErrAckingBlockNotExists); ok {
err = nil
+ } else {
+ return
}
- return
}
- for {
- if len(pendings) == 0 {
- break
- }
- b, pendings = pendings[0], pendings[1:]
- if verified, delivered, err = mgr.lattice.ProcessBlock(b); err != nil {
+ if err = mgr.db.Put(*b); err != nil {
+ if err != blockdb.ErrBlockExists {
return
}
- // Deliver blocks.
- for _, b = range delivered {
- if err = mgr.ccModule.processBlock(b); err != nil {
- return
- }
- if err = mgr.db.Put(*b); err != nil {
- return
- }
- mgr.app.BlockDelivered(b.Hash, b.Finalization)
+ err = nil
+ }
+ if delivered, err = mgr.lattice.ProcessBlock(b); err != nil {
+ return
+ }
+ // Deliver blocks.
+ for _, b = range delivered {
+ if err = mgr.ccModule.processBlock(b); err != nil {
+ return
}
- if err = mgr.lattice.PurgeBlocks(delivered); err != nil {
+ }
+ for _, b = range mgr.ccModule.extractBlocks() {
+ if err = mgr.db.Update(*b); err != nil {
return
}
- // Update pending blocks for verified block (pass sanity check).
- pendings = append(pendings, verified...)
+ mgr.app.BlockDelivered(b.Hash, b.Finalization)
+ }
+ if err = mgr.lattice.PurgeBlocks(delivered); err != nil {
+ return
}
return
}
@@ -112,6 +111,10 @@ func (s *LatticeTestSuite) newTestLatticeMgr(
// Setup compaction chain.
cc := newCompactionChain(gov)
cc.init(&types.Block{})
+ mock := newMockTSigVerifier(true)
+ for i := 0; i < cc.tsigVerifier.cacheSize; i++ {
+ cc.tsigVerifier.verifier[uint64(i)] = mock
+ }
// Setup lattice.
return &testLatticeMgr{
ccModule: cc,
@@ -139,6 +142,7 @@ func (s *LatticeTestSuite) TestBasicUsage() {
err error
cfg = types.Config{
NumChains: chainNum,
+ NotarySetSize: chainNum,
PhiRatio: float32(2) / float32(3),
K: 0,
MinBlockInterval: 0,
@@ -158,14 +162,14 @@ func (s *LatticeTestSuite) TestBasicUsage() {
req.NotNil(b)
req.NoError(err)
// We've ignored the error for "acking blocks don't exist".
- req.Nil(master.processBlock(b))
+ req.NoError(master.processBlock(b))
}
for i := 0; i < (blockNum - int(chainNum)); i++ {
b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum))))
req.NotNil(b)
req.NoError(err)
// We've ignored the error for "acking blocks don't exist".
- req.Nil(master.processBlock(b))
+ req.NoError(master.processBlock(b))
}
// Now we have some blocks, replay them on different lattices.
iter, err := master.db.GetAll()
@@ -185,7 +189,7 @@ func (s *LatticeTestSuite) TestBasicUsage() {
}
}
req.NoError(err)
- req.Nil(other.processBlock(&b))
+ req.NoError(other.processBlock(&b))
revealed += b.Hash.String() + ","
revealSeq[revealed] = struct{}{}
}
@@ -195,12 +199,142 @@ func (s *LatticeTestSuite) TestBasicUsage() {
req.True(len(revealSeq) > 1)
// Make sure nothing goes wrong.
for i, app := range apps {
- req.Nil(app.Verify())
+ err := app.Verify()
+ req.NoError(err)
for j, otherApp := range apps {
if i >= j {
continue
}
- req.Nil(app.Compare(otherApp))
+ err := app.Compare(otherApp)
+ s.NoError(err)
+ }
+ }
+}
+
+func (s *LatticeTestSuite) TestSync() {
+ // One Lattice prepare blocks on chains randomly selected each time
+ // and process it. Those generated blocks and kept into a buffer, and
+ // process by other Lattice instances with random order.
+ var (
+ blockNum = 500
+ // The first `desyncNum` blocks revealed are considered "desynced" and will
+ // not be delivered to lattice. After `syncNum` blocks have revealed, the
+ // system is considered "synced" and start feeding blocks that are desynced
+ // to processFinalizedBlock.
+ desyncNum = 50
+ syncNum = 150
+ chainNum = uint32(19)
+ otherLatticeNum = 50
+ req = s.Require()
+ err error
+ cfg = types.Config{
+ NumChains: chainNum,
+ NotarySetSize: chainNum,
+ PhiRatio: float32(2) / float32(3),
+ K: 0,
+ MinBlockInterval: 0,
+ MaxBlockInterval: 3000 * time.Second,
+ RoundInterval: time.Hour,
+ }
+ dMoment = time.Now().UTC()
+ master = s.newTestLatticeMgr(&cfg, dMoment)
+ //apps = []*test.App{master.app}
+ revealSeq = map[string]struct{}{}
+ )
+ // Make sure the test setup is correct.
+ s.Require().True(syncNum > desyncNum)
+ // Master-lattice generates blocks.
+ for i := uint32(0); i < chainNum; i++ {
+ // Produce genesis blocks should be delivered before all other blocks,
+ // or the consensus time would be wrong.
+ b, err := master.prepareBlock(i)
+ req.NotNil(b)
+ req.NoError(err)
+ // We've ignored the error for "acking blocks don't exist".
+ req.NoError(master.processBlock(b))
+ }
+ for i := 0; i < (blockNum - int(chainNum)); i++ {
+ b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum))))
+ req.NotNil(b)
+ req.NoError(err)
+ // We've ignored the error for "acking blocks don't exist".
+ req.NoError(master.processBlock(b))
+ }
+ req.NoError(master.app.Verify())
+ // Now we have some blocks, replay them on different lattices.
+ iter, err := master.db.GetAll()
+ req.NoError(err)
+ revealer, err := test.NewRandomTipRevealer(iter)
+ req.NoError(err)
+ for i := 0; i < otherLatticeNum; i++ {
+ synced := false
+ syncFromHeight := uint64(0)
+ revealer.Reset()
+ revealed := ""
+ other := s.newTestLatticeMgr(&cfg, dMoment)
+ chainTip := make([]*types.Block, chainNum)
+ for height := 0; ; height++ {
+ b, err := revealer.Next()
+ if err != nil {
+ if err == blockdb.ErrIterationFinished {
+ err = nil
+ break
+ }
+ }
+ req.NoError(err)
+ if height >= syncNum && !synced {
+ synced = true
+ syncToHeight := uint64(0)
+ for _, block := range chainTip {
+ if block == nil {
+ synced = false
+ continue
+ }
+ result, exist := master.app.Delivered[block.Hash]
+ req.True(exist)
+ if syncToHeight < result.ConsensusHeight {
+ syncToHeight = result.ConsensusHeight
+ }
+ }
+
+ for idx := syncFromHeight; idx < syncToHeight; idx++ {
+ block, err := master.db.Get(master.app.DeliverSequence[idx])
+ req.Equal(idx+1, block.Finalization.Height)
+ req.NoError(err)
+ if err = other.db.Put(block); err != nil {
+ req.Equal(blockdb.ErrBlockExists, err)
+ }
+ other.ccModule.processFinalizedBlock(&block)
+ }
+ extracted := other.ccModule.extractFinalizedBlocks()
+ req.Len(extracted, int(syncToHeight-syncFromHeight))
+ for _, block := range extracted {
+ other.app.StronglyAcked(block.Hash)
+ other.lattice.ProcessFinalizedBlock(block)
+ }
+ syncFromHeight = syncToHeight
+ }
+ if height > desyncNum {
+ if chainTip[b.Position.ChainID] == nil {
+ chainTip[b.Position.ChainID] = &b
+ }
+ if err = other.db.Put(b); err != nil {
+ req.Equal(blockdb.ErrBlockExists, err)
+ }
+ delivered, err := other.lattice.addBlockToLattice(&b)
+ req.NoError(err)
+ revealed += b.Hash.String() + ","
+ revealSeq[revealed] = struct{}{}
+ req.NoError(other.lattice.PurgeBlocks(delivered))
+ // TODO(jimmy-dexon): check if delivered set is a DAG.
+ } else {
+ other.app.StronglyAcked(b.Hash)
+ }
+ }
+ for b := range master.app.Acked {
+ if _, exist := other.app.Acked[b]; !exist {
+ s.FailNowf("Block not delivered", "%s not exists", b)
+ }
}
}
}
diff --git a/core/test/app.go b/core/test/app.go
index ba949b3..546c9e5 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -29,7 +29,7 @@ import (
var (
// ErrEmptyDeliverSequence means there is no delivery event in this App
// instance.
- ErrEmptyDeliverSequence = fmt.Errorf("emptry deliver sequence")
+ ErrEmptyDeliverSequence = fmt.Errorf("empty deliver sequence")
// ErrMismatchBlockHashSequence means the delivering sequence between two App
// instances are different.
ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence")
@@ -43,6 +43,10 @@ var (
// consensus timestamp older than previous block.
ErrConsensusTimestampOutOfOrder = fmt.Errorf(
"consensus timestamp out of order")
+ // ErrConsensusHeightOutOfOrder means the later delivered block has
+ // consensus height not equal to height of previous block plus one.
+ ErrConsensusHeightOutOfOrder = fmt.Errorf(
+ "consensus height out of order")
// ErrDeliveredBlockNotAcked means some block delivered (confirmed) but
// not strongly acked.
ErrDeliveredBlockNotAcked = fmt.Errorf("delivered block not acked")
@@ -69,8 +73,9 @@ type AppTotalOrderRecord struct {
// AppDeliveredRecord caches information when this application received
// a block delivered notification.
type AppDeliveredRecord struct {
- ConsensusTime time.Time
- When time.Time
+ ConsensusTime time.Time
+ ConsensusHeight uint64
+ When time.Time
}
// App implements Application interface for testing purpose.
@@ -151,8 +156,9 @@ func (app *App) BlockDelivered(
defer app.deliveredLock.Unlock()
app.Delivered[blockHash] = &AppDeliveredRecord{
- ConsensusTime: result.Timestamp,
- When: time.Now().UTC(),
+ ConsensusTime: result.Timestamp,
+ ConsensusHeight: result.Height,
+ When: time.Now().UTC(),
}
app.DeliverSequence = append(app.DeliverSequence, blockHash)
}
@@ -201,6 +207,7 @@ func (app *App) Verify() error {
app.ackedLock.RLock()
defer app.ackedLock.RUnlock()
+ expectHeight := uint64(1)
prevTime := time.Time{}
for _, h := range app.DeliverSequence {
// Make sure delivered block is strongly acked.
@@ -218,6 +225,12 @@ func (app *App) Verify() error {
return ErrConsensusTimestampOutOfOrder
}
prevTime = rec.ConsensusTime
+
+ // Make sure the consensus height is incremental.
+ if expectHeight != rec.ConsensusHeight {
+ return ErrConsensusHeightOutOfOrder
+ }
+ expectHeight++
}
// Make sure the order of delivered and total ordering are the same by
// comparing the concated string.
diff --git a/core/test/app_test.go b/core/test/app_test.go
index 8f2aae5..823bde0 100644
--- a/core/test/app_test.go
+++ b/core/test/app_test.go
@@ -75,14 +75,16 @@ func (s *AppTestSuite) deliverBlockWithTimeFromSequenceLength(
app *App, hash common.Hash) {
s.deliverBlock(app, hash, time.Time{}.Add(
- time.Duration(len(app.DeliverSequence))*time.Second))
+ time.Duration(len(app.DeliverSequence))*time.Second),
+ uint64(len(app.DeliverSequence)+1))
}
func (s *AppTestSuite) deliverBlock(
- app *App, hash common.Hash, timestamp time.Time) {
+ app *App, hash common.Hash, timestamp time.Time, height uint64) {
app.BlockDelivered(hash, types.FinalizationResult{
Timestamp: timestamp,
+ Height: height,
})
}
@@ -113,7 +115,8 @@ func (s *AppTestSuite) TestCompare() {
wrongTime := time.Time{}.Add(
time.Duration(len(app3.DeliverSequence)) * time.Second)
wrongTime = wrongTime.Add(1 * time.Second)
- s.deliverBlock(app3, s.to3.BlockHashes[0], wrongTime)
+ s.deliverBlock(app3, s.to3.BlockHashes[0], wrongTime,
+ uint64(len(app3.DeliverSequence)+1))
req.Equal(ErrMismatchConsensusTime, app1.Compare(app3))
req.Equal(ErrMismatchConsensusTime, app3.Compare(app1))
// An App without any delivered blocks.
@@ -130,9 +133,10 @@ func (s *AppTestSuite) TestVerify() {
s.setupAppByTotalOrderDeliver(app1, s.to1)
s.setupAppByTotalOrderDeliver(app1, s.to2)
s.setupAppByTotalOrderDeliver(app1, s.to3)
- req.Nil(app1.Verify())
+ req.NoError(app1.Verify())
// A delivered block without strongly ack
- s.deliverBlock(app1, common.NewRandomHash(), time.Time{})
+ s.deliverBlock(app1, common.NewRandomHash(), time.Time{},
+ uint64(len(app1.DeliverSequence)))
req.Equal(ErrDeliveredBlockNotAcked, app1.Verify())
// The consensus time is out of order.
app2 := NewApp()
@@ -141,7 +145,8 @@ func (s *AppTestSuite) TestVerify() {
app2.StronglyAcked(h)
}
app2.TotalOrderingDelivered(s.to2.BlockHashes, s.to2.Mode)
- s.deliverBlock(app2, s.to2.BlockHashes[0], time.Time{})
+ s.deliverBlock(app2, s.to2.BlockHashes[0], time.Time{},
+ uint64(len(app2.DeliverSequence)+1))
req.Equal(ErrConsensusTimestampOutOfOrder, app2.Verify())
// A delivered block is not found in total ordering delivers.
app3 := NewApp()
@@ -164,6 +169,17 @@ func (s *AppTestSuite) TestVerify() {
// Witness ack on unknown block.
app5 := NewApp()
s.setupAppByTotalOrderDeliver(app5, s.to1)
+ // The conensus height is out of order.
+ app6 := NewApp()
+ s.setupAppByTotalOrderDeliver(app6, s.to1)
+ for _, h := range s.to2.BlockHashes {
+ app6.StronglyAcked(h)
+ }
+ app6.TotalOrderingDelivered(s.to2.BlockHashes, s.to2.Mode)
+ s.deliverBlock(app6, s.to2.BlockHashes[0], time.Time{}.Add(
+ time.Duration(len(app6.DeliverSequence))*time.Second),
+ uint64(len(app6.DeliverSequence)+2))
+ req.Equal(ErrConsensusHeightOutOfOrder, app6.Verify())
}
func TestApp(t *testing.T) {
diff --git a/core/test/revealer.go b/core/test/revealer.go
index 80d2a30..c9d82ce 100644
--- a/core/test/revealer.go
+++ b/core/test/revealer.go
@@ -1,6 +1,23 @@
// Copyright 2018 The dexon-consensus-core Authors
// This file is part of the dexon-consensus-core library.
//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
// The dexon-consensus-core library is free software: you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
@@ -205,3 +222,61 @@ func (r *RandomRevealer) Reset() {
}
r.remains = hashes
}
+
+// RandomTipRevealer implements Revealer interface, which would load
+// all blocks from blockdb, and randomly pick one chain's tip to reveal.
+type RandomTipRevealer struct {
+ chainsBlock []map[uint64]*types.Block
+ chainTip []uint64
+ chainRevealSeq []uint32
+ revealed int
+ randGen *rand.Rand
+}
+
+// NewRandomTipRevealer constructs RandomTipRevealer.
+func NewRandomTipRevealer(
+ iter blockdb.BlockIterator) (r *RandomTipRevealer, err error) {
+
+ blocks, err := loadAllBlocks(iter)
+ if err != nil {
+ return
+ }
+ r = &RandomTipRevealer{
+ randGen: rand.New(rand.NewSource(time.Now().UnixNano())),
+ }
+ for _, b := range blocks {
+ for b.Position.ChainID >= uint32(len(r.chainsBlock)) {
+ r.chainsBlock = append(r.chainsBlock, make(map[uint64]*types.Block))
+ r.chainTip = append(r.chainTip, 0)
+ }
+ r.chainsBlock[b.Position.ChainID][b.Position.Height] = b
+ r.chainRevealSeq = append(r.chainRevealSeq, b.Position.ChainID)
+ }
+ r.Reset()
+ return
+}
+
+// Next implements Revealer.Next method, which would reveal blocks randomly.
+func (r *RandomTipRevealer) Next() (types.Block, error) {
+ if len(r.chainRevealSeq) == r.revealed {
+ return types.Block{}, blockdb.ErrIterationFinished
+ }
+
+ picked := r.chainRevealSeq[r.revealed]
+ r.revealed++
+ block := r.chainsBlock[picked][r.chainTip[picked]]
+ r.chainTip[picked]++
+ return *block, nil
+}
+
+// Reset implement Revealer.Reset method, which would reset revealing.
+func (r *RandomTipRevealer) Reset() {
+ r.revealed = 0
+ r.randGen.Shuffle(len(r.chainRevealSeq), func(i, j int) {
+ r.chainRevealSeq[i], r.chainRevealSeq[j] =
+ r.chainRevealSeq[j], r.chainRevealSeq[i]
+ })
+ for i := range r.chainTip {
+ r.chainTip[i] = 0
+ }
+}
diff --git a/core/test/revealer_test.go b/core/test/revealer_test.go
index 8bb46bc..4945d62 100644
--- a/core/test/revealer_test.go
+++ b/core/test/revealer_test.go
@@ -135,6 +135,29 @@ func (s *RevealerTestSuite) TestRandomDAGReveal() {
s.baseTest(revealer, 10, checkFunc)
}
+func (s *RevealerTestSuite) TestRandomTipReveal() {
+ // This test case would make sure we could at least generate
+ // two different revealing sequence when revealing more than
+ // 10 times.
+ iter, err := s.db.GetAll()
+ s.Require().Nil(err)
+ revealer, err := NewRandomTipRevealer(iter)
+ s.Require().Nil(err)
+
+ checkFunc := func(b *types.Block, revealed map[common.Hash]struct{}) {
+ // Make sure the revealer won't reveal the same block twice.
+ _, alreadyRevealed := revealed[b.Hash]
+ s.False(alreadyRevealed)
+ // Make sure the parent is already revealed.
+ if b.Position.Height == 0 {
+ return
+ }
+ _, alreadyRevealed = revealed[b.ParentHash]
+ s.True(alreadyRevealed)
+ }
+ s.baseTest(revealer, 10, checkFunc)
+}
+
func TestRevealer(t *testing.T) {
suite.Run(t, new(RevealerTestSuite))
}
diff --git a/core/types/position.go b/core/types/position.go
index f41be32..d821d16 100644
--- a/core/types/position.go
+++ b/core/types/position.go
@@ -57,6 +57,16 @@ func (pos *Position) Newer(other *Position) bool {
(pos.Round == other.Round && pos.Height > other.Height)
}
+// Older checks if one block is older than another one on the same chain.
+// If two blocks on different chain compared by this function, it would panic.
+func (pos *Position) Older(other *Position) bool {
+ if pos.ChainID != other.ChainID {
+ panic(ErrComparePositionOnDifferentChains)
+ }
+ return pos.Round < other.Round ||
+ (pos.Round == other.Round && pos.Height < other.Height)
+}
+
// Clone a position instance.
func (pos *Position) Clone() *Position {
return &Position{
diff --git a/core/types/position_test.go b/core/types/position_test.go
index 48f8dbd..1ef1813 100644
--- a/core/types/position_test.go
+++ b/core/types/position_test.go
@@ -59,6 +59,37 @@ func (s *PositionTestSuite) TestNewer() {
}))
}
+func (s *PositionTestSuite) TestOlder() {
+ pos := Position{
+ Round: 1,
+ ChainID: 1,
+ Height: 1,
+ }
+ s.Panics(func() {
+ pos.Older(&Position{ChainID: 2})
+ })
+ s.False(pos.Older(&Position{
+ Round: 0,
+ ChainID: 1,
+ Height: 0,
+ }))
+ s.False(pos.Older(&Position{
+ Round: 1,
+ ChainID: 1,
+ Height: 0,
+ }))
+ s.True(pos.Older(&Position{
+ Round: 2,
+ ChainID: 1,
+ Height: 0,
+ }))
+ s.True(pos.Older(&Position{
+ Round: 1,
+ ChainID: 1,
+ Height: 100,
+ }))
+}
+
func (s *PositionTestSuite) TestSearchInAsendingOrder() {
positions := []*Position{
&Position{Round: 0, Height: 1},
diff --git a/integration_test/node.go b/integration_test/node.go
index a26c005..0fb661b 100644
--- a/integration_test/node.go
+++ b/integration_test/node.go
@@ -73,6 +73,7 @@ type Node struct {
broadcastTargets map[types.NodeID]struct{}
networkLatency test.LatencyModel
proposingLatency test.LatencyModel
+ prevFinalHeight uint64
}
// NewNode constructs an instance of Node.
@@ -185,35 +186,28 @@ func (n *Node) processBlock(b *types.Block) (err error) {
// core/lattice_test.go, except the compaction-chain part.
var (
delivered []*types.Block
- verified []*types.Block
- pendings = []*types.Block{b}
)
if err = n.lattice.SanityCheck(b); err != nil {
- if err == core.ErrAckingBlockNotExists {
+ if _, ok := err.(*core.ErrAckingBlockNotExists); ok {
err = nil
+ } else {
+ return
}
+ }
+ if delivered, err = n.lattice.ProcessBlock(b); err != nil {
return
}
- for {
- if len(pendings) == 0 {
- break
- }
- b, pendings = pendings[0], pendings[1:]
- if verified, delivered, err = n.lattice.ProcessBlock(b); err != nil {
- return
- }
- // Deliver blocks.
- for _, b = range delivered {
- if err = n.db.Put(*b); err != nil {
- return
- }
- n.app.BlockDelivered(b.Hash, b.Finalization)
- }
- if err = n.lattice.PurgeBlocks(delivered); err != nil {
+ // Deliver blocks.
+ for _, b = range delivered {
+ if err = n.db.Put(*b); err != nil {
return
}
- // Update pending blocks for verified block (pass sanity check).
- pendings = append(pendings, verified...)
+ b.Finalization.Height = n.prevFinalHeight + 1
+ n.app.BlockDelivered(b.Hash, b.Finalization)
+ n.prevFinalHeight++
+ }
+ if err = n.lattice.PurgeBlocks(delivered); err != nil {
+ return
}
return
}