aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/blockpool.go15
-rw-r--r--core/consensus.go5
-rw-r--r--core/lattice-data.go181
-rw-r--r--core/lattice-data_test.go39
-rw-r--r--core/lattice.go28
-rw-r--r--core/lattice_test.go5
6 files changed, 195 insertions, 78 deletions
diff --git a/core/blockpool.go b/core/blockpool.go
index bfde881..cece34d 100644
--- a/core/blockpool.go
+++ b/core/blockpool.go
@@ -36,6 +36,21 @@ func newBlockPool(chainNum uint32) (pool blockPool) {
return
}
+// resize the pool if new chain is added.
+func (p *blockPool) resize(num uint32) {
+ if uint32(len(*p)) < num {
+ return
+ }
+ newPool := make([]types.ByHeight, num)
+ copy(newPool, *p)
+ for i := uint32(len(*p)); i < num; i++ {
+ newChain := types.ByHeight{}
+ heap.Init(&newChain)
+ newPool[i] = newChain
+ }
+ *p = newPool
+}
+
// addBlock adds a block into pending set and make sure these
// blocks are sorted by height.
func (p blockPool) addBlock(b *types.Block) {
diff --git a/core/consensus.go b/core/consensus.go
index 92990b0..b1615a2 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -533,7 +533,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
if err = con.ccModule.processBlock(b); err != nil {
return
}
- if err = con.db.Update(*b); err != nil {
+ if err = con.db.Put(*b); err != nil {
return
}
go con.event.NotifyTime(b.ConsensusTimestamp)
@@ -543,6 +543,9 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
// nonBlocking and let them recycle the
// block.
}
+ if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil {
+ return
+ }
return
}
diff --git a/core/lattice-data.go b/core/lattice-data.go
index 75447c6..a674f3c 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core/blockdb"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
@@ -44,58 +45,87 @@ var (
ErrInvalidBlockHeight = fmt.Errorf("invalid block height")
ErrAlreadyInLattice = fmt.Errorf("block already in lattice")
ErrIncorrectBlockTime = fmt.Errorf("block timestamp is incorrect")
+ ErrUnknownRoundID = fmt.Errorf("unknown round id")
)
// Errors for method usage
var (
- ErrRoundNotIncreasing = errors.New("round not increasing")
+ ErrRoundNotIncreasing = errors.New("round not increasing")
+ ErrPurgedBlockNotFound = errors.New("purged block not found")
)
+// latticeDataConfig is the configuration for latticeData for each round.
+type latticeDataConfig struct {
+ // Number of chains between runs
+ prevNumChains uint32
+ numChains uint32
+ // Block interval specifies reasonable time difference between
+ // parent/child blocks.
+ minBlockTimeInterval time.Duration
+ maxBlockTimeInterval time.Duration
+ // the range of clean cut at the beginning of this round.
+ cleanCutRange int
+}
+
+// newLatticeDataConfig constructs a latticeDataConfig instance.
+func newLatticeDataConfig(prev, cur *types.Config) *latticeDataConfig {
+ config := &latticeDataConfig{
+ numChains: cur.NumChains,
+ minBlockTimeInterval: cur.MinBlockInterval,
+ maxBlockTimeInterval: cur.MaxBlockInterval,
+ }
+ if prev != nil {
+ config.prevNumChains = prev.NumChains
+ if prev.K != cur.K ||
+ prev.PhiRatio != cur.PhiRatio ||
+ prev.NumChains != cur.NumChains {
+ // K plus 2 is a magic from GOD.
+ config.cleanCutRange = prev.K + 2
+ }
+ }
+ return config
+}
+
// latticeData is a module for storing lattice.
type latticeData struct {
+ // we need blockdb to read blocks purged from cache in memory.
+ db blockdb.Reader
// chains stores chains' blocks and other info.
chains []*chainStatus
-
// blockByHash stores blocks, indexed by block hash.
blockByHash map[common.Hash]*types.Block
-
- // Block interval specifies reasonable time difference between
- // parent/child blocks.
- minBlockTimeInterval time.Duration
- maxBlockTimeInterval time.Duration
-
// This stores configuration for each round.
- numChainsForRounds []uint32
- minRound uint64
+ configs []*latticeDataConfig
+ minRound uint64
}
// newLatticeData creates a new latticeData struct.
-func newLatticeData(
- round uint64,
- chainNum uint32,
- minBlockTimeInterval time.Duration,
- maxBlockTimeInterval time.Duration) (data *latticeData) {
+func newLatticeData(db blockdb.Reader,
+ round uint64, config *latticeDataConfig) (data *latticeData) {
data = &latticeData{
- chains: make([]*chainStatus, chainNum),
- blockByHash: make(map[common.Hash]*types.Block),
- minBlockTimeInterval: minBlockTimeInterval,
- maxBlockTimeInterval: maxBlockTimeInterval,
- numChainsForRounds: []uint32{chainNum},
- minRound: round,
+ db: db,
+ chains: make([]*chainStatus, config.numChains),
+ blockByHash: make(map[common.Hash]*types.Block),
+ configs: []*latticeDataConfig{config},
+ minRound: round,
}
for i := range data.chains {
data.chains[i] = &chainStatus{
ID: uint32(i),
blocks: []*types.Block{},
- nextAck: make([]uint64, chainNum),
+ nextAck: make([]uint64, config.numChains),
}
}
return
}
func (data *latticeData) sanityCheck(b *types.Block) error {
+ config := data.getConfig(b.Position.Round)
+ if config == nil {
+ return ErrUnknownRoundID
+ }
// Check if the chain id is valid.
- if b.Position.ChainID >= uint32(len(data.chains)) {
+ if b.Position.ChainID >= config.numChains {
return ErrInvalidChainID
}
@@ -117,20 +147,23 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
// Check if it acks older blocks.
acksByChainID := make(map[uint32]struct{}, len(data.chains))
for _, hash := range b.Acks {
- if bAck, exist := data.blockByHash[hash]; exist {
- if bAck.Position.Height <
- data.chains[bAck.Position.ChainID].nextAck[b.Position.ChainID] {
- return ErrDoubleAck
- }
- // Check if ack two blocks on the same chain. This would need
- // to check after we replace map with slice for acks.
- if _, acked := acksByChainID[bAck.Position.ChainID]; acked {
- return ErrDuplicatedAckOnOneChain
+ bAck, err := data.findBlock(hash)
+ if err != nil {
+ if err == blockdb.ErrBlockDoesNotExist {
+ return ErrAckingBlockNotExists
}
- acksByChainID[bAck.Position.ChainID] = struct{}{}
- } else {
- return ErrAckingBlockNotExists
+ return err
+ }
+ if bAck.Position.Height <
+ data.chains[bAck.Position.ChainID].nextAck[b.Position.ChainID] {
+ return ErrDoubleAck
}
+ // Check if ack two blocks on the same chain. This would need
+ // to check after we replace map with slice for acks.
+ if _, acked := acksByChainID[bAck.Position.ChainID]; acked {
+ return ErrDuplicatedAckOnOneChain
+ }
+ acksByChainID[bAck.Position.ChainID] = struct{}{}
}
// Check non-genesis blocks if it acks its parent.
@@ -138,7 +171,12 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
if !b.IsAcking(b.ParentHash) {
return ErrNotAckParent
}
- bParent := data.blockByHash[b.ParentHash]
+ bParent, err := data.findBlock(b.ParentHash)
+ if err != nil {
+ // This error should never happened, a parent block is always
+ // an acked block.
+ panic(err)
+ }
if bParent.Position.ChainID != b.Position.ChainID {
return ErrInvalidParentChain
}
@@ -153,8 +191,10 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
return ErrInvalidTimestamp
}
// Check if its timestamp is in expected range.
- if b.Timestamp.Before(bParent.Timestamp.Add(data.minBlockTimeInterval)) ||
- b.Timestamp.After(bParent.Timestamp.Add(data.maxBlockTimeInterval)) {
+ if b.Timestamp.Before(
+ bParent.Timestamp.Add(config.minBlockTimeInterval)) ||
+ b.Timestamp.After(
+ bParent.Timestamp.Add(config.maxBlockTimeInterval)) {
return ErrIncorrectBlockTime
}
@@ -182,7 +222,9 @@ func (data *latticeData) addBlock(
data.blockByHash[block.Hash] = block
// Update nextAcks.
for _, ack := range block.Acks {
- bAck = data.blockByHash[ack]
+ if bAck, err = data.findBlock(ack); err != nil {
+ return
+ }
data.chains[bAck.Position.ChainID].nextAck[block.Position.ChainID] =
bAck.Position.Height + 1
}
@@ -198,9 +240,8 @@ func (data *latticeData) addBlock(
}
allAckingBlockDelivered := true
for _, ack := range tip.Acks {
- bAck, exists := data.blockByHash[ack]
- if !exists {
- continue
+ if bAck, err = data.findBlock(ack); err != nil {
+ return
}
if data.chains[bAck.Position.ChainID].nextOutput >
bAck.Position.Height {
@@ -231,9 +272,7 @@ func (data *latticeData) addBlock(
// This works because blocks of height below this minimum are not going to be
// acked anymore, the ackings of these blocks are illegal.
for _, status := range data.chains {
- for _, h := range status.purge() {
- delete(data.blockByHash, h)
- }
+ status.purge()
}
return
}
@@ -245,6 +284,7 @@ func (data *latticeData) addBlock(
// - Set 'ParentHash' and 'Height' from parent block, if we can't find a
// parent, these fields would be setup like a genesis block.
func (data *latticeData) prepareBlock(block *types.Block) {
+ config := data.getConfig(block.Position.Round)
// Reset fields to make sure we got these information from parent block.
block.Position.Height = 0
block.ParentHash = common.Hash{}
@@ -271,8 +311,8 @@ func (data *latticeData) prepareBlock(block *types.Block) {
block.ParentHash = curBlock.Hash
block.Position.Height = curBlock.Position.Height + 1
block.Witness.Height = curBlock.Witness.Height
- minTimestamp := curBlock.Timestamp.Add(data.minBlockTimeInterval)
- maxTimestamp := curBlock.Timestamp.Add(data.maxBlockTimeInterval)
+ minTimestamp := curBlock.Timestamp.Add(config.minBlockTimeInterval)
+ maxTimestamp := curBlock.Timestamp.Add(config.maxBlockTimeInterval)
if block.Timestamp.Before(minTimestamp) {
block.Timestamp = minTimestamp
} else if block.Timestamp.After(maxTimestamp) {
@@ -290,15 +330,51 @@ func (data *latticeData) nextPosition(chainID uint32) types.Position {
return data.chains[chainID].nextPosition()
}
+// findBlock seeks blocks in memory or db.
+func (data *latticeData) findBlock(h common.Hash) (b *types.Block, err error) {
+ if b = data.blockByHash[h]; b != nil {
+ return
+ }
+ var tmpB types.Block
+ if tmpB, err = data.db.Get(h); err != nil {
+ return
+ }
+ b = &tmpB
+ return
+}
+
+// purgeBlocks purges blocks from cache.
+func (data *latticeData) purgeBlocks(blocks []*types.Block) error {
+ for _, b := range blocks {
+ if _, exists := data.blockByHash[b.Hash]; !exists {
+ return ErrPurgedBlockNotFound
+ }
+ delete(data.blockByHash, b.Hash)
+ }
+ return nil
+}
+
+// getConfig get configuration for lattice-data by round ID.
+func (data *latticeData) getConfig(round uint64) (config *latticeDataConfig) {
+ if round < data.minRound {
+ return
+ }
+ diff := round - data.minRound
+ if diff >= uint64(len(data.configs)) {
+ return
+ }
+ return data.configs[diff]
+}
+
// appendConfig appends a configuration for upcoming round. When you append
// a config for round R, next time you can only append the config for round R+1.
func (data *latticeData) appendConfig(
- round uint64, config *types.Config) (err error) {
+ round uint64, config *latticeDataConfig) (err error) {
- if round != data.minRound+uint64(len(data.numChainsForRounds)) {
+ if round != data.minRound+uint64(len(data.configs)) {
return ErrRoundNotIncreasing
}
- data.numChainsForRounds = append(data.numChainsForRounds, config.NumChains)
+ data.configs = append(data.configs, config)
return nil
}
@@ -375,15 +451,12 @@ func (s *chainStatus) calcPurgeHeight() (safe uint64, ok bool) {
}
// purge blocks if they are safe to be deleted from working set.
-func (s *chainStatus) purge() (purged common.Hashes) {
+func (s *chainStatus) purge() {
safe, ok := s.calcPurgeHeight()
if !ok {
return
}
newMinIndex := safe - s.minHeight + 1
- for _, b := range s.blocks[:newMinIndex] {
- purged = append(purged, b.Hash)
- }
s.blocks = s.blocks[newMinIndex:]
s.minHeight = safe + 1
return
diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go
index 92e66a4..9e3ce52 100644
--- a/core/lattice-data_test.go
+++ b/core/lattice-data_test.go
@@ -54,8 +54,10 @@ func (s *LatticeDataTestSuite) genTestCase1() (data *latticeData) {
req = s.Require()
err error
)
-
- data = newLatticeData(round, chainNum, 2*time.Nanosecond, 1000*time.Second)
+ db, err := blockdb.NewMemBackedBlockDB()
+ req.NoError(err)
+ data = newLatticeData(
+ db, round, s.newConfig(chainNum, 2*time.Nanosecond, 1000*time.Second))
// Add genesis blocks.
for i := uint32(0); i < chainNum; i++ {
b = s.prepareGenesisBlock(i)
@@ -157,6 +159,16 @@ func (s *LatticeDataTestSuite) genTestCase1() (data *latticeData) {
return
}
+func (s *LatticeDataTestSuite) newConfig(numChains uint32,
+ minBlockInterval, maxBlockInterval time.Duration) *latticeDataConfig {
+
+ return &latticeDataConfig{
+ numChains: numChains,
+ minBlockTimeInterval: minBlockInterval,
+ maxBlockTimeInterval: maxBlockInterval,
+ }
+}
+
// hashBlock is a helper to hash a block and check if any error.
func (s *LatticeDataTestSuite) hashBlock(b *types.Block) {
var err error
@@ -352,13 +364,14 @@ func (s *LatticeDataTestSuite) TestSanityCheckInDataLayer() {
b = &types.Block{
ParentHash: h,
Hash: common.NewRandomHash(),
- Timestamp: time.Now().UTC().Add(data.maxBlockTimeInterval),
Position: types.Position{
ChainID: 2,
Height: 1,
},
Acks: common.NewSortedHashes(common.Hashes{h}),
}
+ b.Timestamp = data.chains[2].getBlockByHeight(0).Timestamp.Add(
+ data.getConfig(0).maxBlockTimeInterval + time.Nanosecond)
s.hashBlock(b)
err = data.sanityCheck(b)
req.NotNil(err)
@@ -390,14 +403,16 @@ func (s *LatticeDataTestSuite) TestRandomIntensiveAcking() {
var (
round uint64
chainNum uint32 = 19
- data = newLatticeData(round, chainNum, 0, 1000*time.Second)
req = s.Require()
delivered []*types.Block
extracted []*types.Block
b *types.Block
err error
)
-
+ db, err := blockdb.NewMemBackedBlockDB()
+ req.NoError(err)
+ data := newLatticeData(
+ db, round, s.newConfig(chainNum, 0, 1000*time.Second))
// Generate genesis blocks.
for i := uint32(0); i < chainNum; i++ {
b = s.prepareGenesisBlock(i)
@@ -417,6 +432,10 @@ func (s *LatticeDataTestSuite) TestRandomIntensiveAcking() {
s.hashBlock(b)
delivered, err = data.addBlock(b)
req.Nil(err)
+ for _, b := range delivered {
+ req.NoError(db.Put(*b))
+ }
+ req.NoError(data.purgeBlocks(delivered))
extracted = append(extracted, delivered...)
}
@@ -459,7 +478,8 @@ func (s *LatticeDataTestSuite) TestRandomlyGeneratedBlocks() {
revealedHashesAsString := map[string]struct{}{}
deliveredHashesAsString := map[string]struct{}{}
for i := 0; i < repeat; i++ {
- data := newLatticeData(round, chainNum, 0, 1000*time.Second)
+ data := newLatticeData(
+ nil, round, s.newConfig(chainNum, 0, 1000*time.Second))
deliveredHashes := common.Hashes{}
revealedHashes := common.Hashes{}
revealer.Reset()
@@ -542,7 +562,7 @@ func (s *LatticeDataTestSuite) TestPrepareBlock() {
delivered []*types.Block
err error
data = newLatticeData(
- round, chainNum, 0, 3000*time.Second)
+ nil, round, s.newConfig(chainNum, 0, 3000*time.Second))
)
// Setup genesis blocks.
b00 := s.prepareGenesisBlock(0)
@@ -656,8 +676,7 @@ func (s *LatticeDataTestSuite) TestPurge() {
nextAck: []uint64{1, 1, 1, 1},
nextOutput: 1,
}
- hashes := chain.purge()
- s.Equal(hashes, common.Hashes{b00.Hash})
+ chain.purge()
s.Equal(chain.minHeight, uint64(1))
s.Require().Len(chain.blocks, 2)
s.Equal(chain.blocks[0].Hash, b01.Hash)
@@ -670,7 +689,7 @@ func (s *LatticeDataTestSuite) TestNextPosition() {
s.Equal(data.nextPosition(0), types.Position{ChainID: 0, Height: 4})
// Test 'NextPosition' method when lattice is empty.
- data = newLatticeData(0, 4, 0, 1000*time.Second)
+ data = newLatticeData(nil, 0, s.newConfig(4, 0, 1000*time.Second))
s.Equal(data.nextPosition(0), types.Position{ChainID: 0, Height: 0})
}
diff --git a/core/lattice.go b/core/lattice.go
index 27d38d8..7b34e1a 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -34,7 +34,7 @@ type Lattice struct {
chainNum uint32
app Application
debug Debug
- db blockdb.BlockDatabase
+ lastConfig *types.Config
pool blockPool
data *latticeData
toModule *totalOrdering
@@ -49,19 +49,14 @@ func NewLattice(
app Application,
debug Debug,
db blockdb.BlockDatabase) (s *Lattice) {
- data := newLatticeData(
- round,
- cfg.NumChains,
- cfg.MinBlockInterval,
- cfg.MaxBlockInterval)
s = &Lattice{
authModule: authModule,
chainNum: cfg.NumChains,
app: app,
debug: debug,
- db: db,
+ lastConfig: cfg,
pool: newBlockPool(cfg.NumChains),
- data: data,
+ data: newLatticeData(db, round, newLatticeDataConfig(nil, cfg)),
toModule: newTotalOrdering(
round,
uint64(cfg.K),
@@ -156,9 +151,6 @@ func (s *Lattice) ProcessBlock(
if inLattice, err = s.data.addBlock(input); err != nil {
return
}
- if err = s.db.Put(*input); err != nil {
- return
- }
// TODO(mission): remove this hack, BA related stuffs should not
// be done here.
if s.debug != nil {
@@ -214,13 +206,24 @@ func (s *Lattice) NextPosition(chainID uint32) types.Position {
return s.data.nextPosition(chainID)
}
+// PurgeBlocks from cache of blocks in memory, this is called when the caller
+// make sure those blocks are saved to db.
+func (s *Lattice) PurgeBlocks(blocks []*types.Block) error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ return s.data.purgeBlocks(blocks)
+}
+
// AppendConfig add new configs for upcoming rounds. If you add a config for
// round R, next time you can only add the config for round R+1.
func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
s.lock.Lock()
defer s.lock.Unlock()
- if err = s.data.appendConfig(round, config); err != nil {
+ s.pool.resize(config.NumChains)
+ if err = s.data.appendConfig(
+ round, newLatticeDataConfig(s.lastConfig, config)); err != nil {
return
}
if err = s.toModule.appendConfig(round, config); err != nil {
@@ -229,5 +232,6 @@ func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
if err = s.ctModule.appendConfig(round, config); err != nil {
return
}
+ s.lastConfig = config
return
}
diff --git a/core/lattice_test.go b/core/lattice_test.go
index bf65684..1270a3c 100644
--- a/core/lattice_test.go
+++ b/core/lattice_test.go
@@ -75,11 +75,14 @@ func (mgr *testLatticeMgr) processBlock(b *types.Block) (err error) {
if err = mgr.ccModule.processBlock(b); err != nil {
return
}
- if err = mgr.db.Update(*b); err != nil {
+ if err = mgr.db.Put(*b); err != nil {
return
}
mgr.app.BlockDelivered(*b)
}
+ if err = mgr.lattice.PurgeBlocks(delivered); err != nil {
+ return
+ }
// Update pending blocks for verified block (pass sanity check).
pendings = append(pendings, verified...)
}