aboutsummaryrefslogtreecommitdiffstats
path: root/core/total-ordering.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/total-ordering.go')
-rw-r--r--core/total-ordering.go544
1 files changed, 445 insertions, 99 deletions
diff --git a/core/total-ordering.go b/core/total-ordering.go
index a1e2e76..182ec6c 100644
--- a/core/total-ordering.go
+++ b/core/total-ordering.go
@@ -18,7 +18,7 @@
package core
import (
- "fmt"
+ "errors"
"math"
"sort"
"sync"
@@ -35,9 +35,18 @@ const (
var (
// ErrNotValidDAG would be reported when block subbmitted to totalOrdering
// didn't form a DAG.
- ErrNotValidDAG = fmt.Errorf("not a valid dag")
- // ErrChainIDNotRecognized means the chain is unknown to this module.
- ErrChainIDNotRecognized = fmt.Errorf("chain ID not recognized")
+ ErrNotValidDAG = errors.New("not a valid dag")
+ // ErrFutureRoundDelivered means some blocks from later rounds are
+ // delivered, this means program error.
+ ErrFutureRoundDelivered = errors.New("future round delivered")
+ // ErrBlockFromPastRound means we receive some block from past round.
+ ErrBlockFromPastRound = errors.New("block from past round")
+ // ErrTotalOrderingHangs means total ordering hangs somewhere.
+ ErrTotalOrderingHangs = errors.New("total ordering hangs")
+ // ErrForwardAck means a block acking some blocks from newer round.
+ ErrForwardAck = errors.New("forward ack")
+ // ErrUnexpected means general (I'm lazy) errors.
+ ErrUnexpected = errors.New("unexpected")
)
// totalOrderingConfig is the configuration for total ordering.
@@ -97,20 +106,20 @@ func (rec *totalOrderingWinRecord) reset() {
}
}
-func newTotalOrderingWinRecord(chainNum uint32) (
+func newTotalOrderingWinRecord(numChains uint32) (
rec *totalOrderingWinRecord) {
rec = &totalOrderingWinRecord{}
rec.reset()
- rec.wins = make([]int8, chainNum)
+ rec.wins = make([]int8, numChains)
return
}
// grade implements the 'grade' potential function described in white paper.
func (rec *totalOrderingWinRecord) grade(
- chainNum uint32, phi uint64, globalAnsLength uint64) int {
+ numChains uint32, phi uint64, globalAnsLength uint64) int {
if uint64(rec.count) >= phi {
return 1
- } else if uint64(rec.count) < phi-uint64(chainNum)+globalAnsLength {
+ } else if uint64(rec.count) < phi-uint64(numChains)+globalAnsLength {
return 0
} else {
return -1
@@ -135,19 +144,38 @@ type totalOrderingObjectCache struct {
winRecordContainers [][]*totalOrderingWinRecord
ackedVectors []map[common.Hash]struct{}
winRecordPool sync.Pool
- chainNum uint32
+ numChains uint32
}
// newTotalOrderingObjectCache constructs an totalOrderingObjectCache
// instance.
-func newTotalOrderingObjectCache(chainNum uint32) *totalOrderingObjectCache {
+func newTotalOrderingObjectCache(numChains uint32) *totalOrderingObjectCache {
return &totalOrderingObjectCache{
winRecordPool: sync.Pool{
New: func() interface{} {
- return newTotalOrderingWinRecord(chainNum)
+ return newTotalOrderingWinRecord(numChains)
},
},
- chainNum: chainNum,
+ numChains: numChains,
+ }
+}
+
+// resize makes sure internal storage of totalOrdering instance can handle
+// maximum possible numChains in future configs.
+func (cache *totalOrderingObjectCache) resize(numChains uint32) {
+ // Basically, everything in cache needs to be cleaned.
+ if cache.numChains >= numChains {
+ return
+ }
+ cache.ackedStatus = nil
+ cache.heightVectors = nil
+ cache.winRecordContainers = nil
+ cache.ackedVectors = nil
+ cache.numChains = numChains
+ cache.winRecordPool = sync.Pool{
+ New: func() interface{} {
+ return newTotalOrderingWinRecord(numChains)
+ },
}
}
@@ -156,7 +184,7 @@ func newTotalOrderingObjectCache(chainNum uint32) *totalOrderingObjectCache {
func (cache *totalOrderingObjectCache) requestAckedStatus() (
acked []*totalOrderingHeightRecord) {
if len(cache.ackedStatus) == 0 {
- acked = make([]*totalOrderingHeightRecord, cache.chainNum)
+ acked = make([]*totalOrderingHeightRecord, cache.numChains)
for idx := range acked {
acked[idx] = &totalOrderingHeightRecord{count: 0}
}
@@ -199,7 +227,7 @@ func (cache *totalOrderingObjectCache) recycleWinRecord(
// of one candidate.
func (cache *totalOrderingObjectCache) requestHeightVector() (hv []uint64) {
if len(cache.heightVectors) == 0 {
- hv = make([]uint64, cache.chainNum)
+ hv = make([]uint64, cache.numChains)
} else {
hv, cache.heightVectors =
cache.heightVectors[len(cache.heightVectors)-1],
@@ -221,7 +249,7 @@ func (cache *totalOrderingObjectCache) recycleHeightVector(hv []uint64) {
func (cache *totalOrderingObjectCache) requestWinRecordContainer() (
con []*totalOrderingWinRecord) {
if len(cache.winRecordContainers) == 0 {
- con = make([]*totalOrderingWinRecord, cache.chainNum)
+ con = make([]*totalOrderingWinRecord, cache.numChains)
} else {
con, cache.winRecordContainers =
cache.winRecordContainers[len(cache.winRecordContainers)-1],
@@ -349,9 +377,11 @@ func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) {
// - k = 1
// then only block height >= 2 would be added to acking node set.
func (v *totalOrderingCandidateInfo) getAckingNodeSetLength(
- global *totalOrderingCandidateInfo, k uint64) (count uint64) {
+ global *totalOrderingCandidateInfo,
+ k uint64,
+ numChains uint32) (count uint64) {
var rec *totalOrderingHeightRecord
- for idx, gRec := range global.ackedStatus {
+ for idx, gRec := range global.ackedStatus[:numChains] {
if gRec.count == 0 {
continue
}
@@ -434,7 +464,8 @@ func (v *totalOrderingCandidateInfo) updateWinRecord(
otherChainID uint32,
other *totalOrderingCandidateInfo,
dirtyChainIDs []int,
- objCache *totalOrderingObjectCache) {
+ objCache *totalOrderingObjectCache,
+ numChains uint32) {
var (
idx int
height uint64
@@ -448,7 +479,7 @@ func (v *totalOrderingCandidateInfo) updateWinRecord(
if win == nil {
win = objCache.requestWinRecord()
v.winRecords[otherChainID] = win
- for idx, height = range v.cachedHeightVector {
+ for idx, height = range v.cachedHeightVector[:numChains] {
if height == infinity {
continue
}
@@ -481,36 +512,197 @@ func (v *totalOrderingCandidateInfo) updateWinRecord(
}
}
+// totalOrderingBreakpoint is a record to store the height discontinuity
+// on a chain.
+type totalOrderingBreakpoint struct {
+ roundID uint64
+ // height of last block in previous round.
+ lastHeight uint64
+}
+
// totalOrderingGroupVector keeps global status of current pending set.
type totalOrderingGlobalVector struct {
// blocks stores all blocks grouped by their proposers and
// sorted by their block height.
//
- // TODO(mission): the way we use this slice would make it reallocate frequently.
+ // TODO(mission): the way we use this slice would make it reallocate
+ // frequently.
blocks [][]*types.Block
+ // breakpoints caches rounds for chains that blocks' height on them are
+ // not continuous. Ex.
+ // ChainID Round Height
+ // 1 0 0
+ // 1 0 1
+ // 1 1 2
+ // 1 1 3
+ // 1 1 4
+ // 1 3 0 <- a breakpoint for round 3 would be cached
+ // for chain 1 as (roundID=1, lastHeight=4).
+ breakpoints [][]*totalOrderingBreakpoint
+
+ // curRound caches the last round ID used to purge breakpoints.
+ curRound uint64
+
+ // tips records the last seen block for each chain.
+ tips []*types.Block
+
// cachedCandidateInfo is an totalOrderingCandidateInfo instance,
// which is just used for actual candidates to calculate height vector.
cachedCandidateInfo *totalOrderingCandidateInfo
}
-func newTotalOrderingGlobalVector(
- chainNum uint32) *totalOrderingGlobalVector {
+func newTotalOrderingGlobalVector(numChains uint32) *totalOrderingGlobalVector {
return &totalOrderingGlobalVector{
- blocks: make([][]*types.Block, chainNum),
+ blocks: make([][]*types.Block, numChains),
+ tips: make([]*types.Block, numChains),
+ breakpoints: make([][]*totalOrderingBreakpoint, numChains),
+ }
+}
+
+func (global *totalOrderingGlobalVector) resize(numChains uint32) {
+ if len(global.blocks) >= int(numChains) {
+ return
+ }
+ // Resize blocks.
+ newBlocks := make([][]*types.Block, numChains)
+ copy(newBlocks, global.blocks)
+ global.blocks = newBlocks
+ // Resize breakpoints.
+ newBreakPoints := make([][]*totalOrderingBreakpoint, numChains)
+ copy(newBreakPoints, global.breakpoints)
+ global.breakpoints = newBreakPoints
+ // Resize tips.
+ newTips := make([]*types.Block, numChains)
+ copy(newTips, global.tips)
+ global.tips = newTips
+}
+
+func (global *totalOrderingGlobalVector) switchRound(roundID uint64) {
+ if global.curRound+1 != roundID {
+ panic(ErrUnexpected)
+ }
+ global.curRound = roundID
+ for chainID, bs := range global.breakpoints {
+ if len(bs) == 0 {
+ continue
+ }
+ if bs[0].roundID == roundID {
+ global.breakpoints[chainID] = bs[1:]
+ }
}
}
-func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) {
- blocksFromChain := global.blocks[b.Position.ChainID]
- if len(blocksFromChain) > 0 {
- lastBlock := blocksFromChain[len(blocksFromChain)-1]
- if b.Position.Height-lastBlock.Position.Height != 1 {
+func (global *totalOrderingGlobalVector) prepareHeightRecord(
+ candidate *types.Block,
+ info *totalOrderingCandidateInfo,
+ acked map[common.Hash]struct{}) {
+ var (
+ chainID = candidate.Position.ChainID
+ breakpoints = global.breakpoints[chainID]
+ breakpoint *totalOrderingBreakpoint
+ rec *totalOrderingHeightRecord
+ )
+ // Setup height record for own chain.
+ rec = &totalOrderingHeightRecord{
+ minHeight: candidate.Position.Height,
+ }
+ if len(breakpoints) == 0 {
+ rec.count = uint64(len(global.blocks[chainID]))
+ } else {
+ rec.count = breakpoints[0].lastHeight - candidate.Position.Height + 1
+ }
+ info.ackedStatus[chainID] = rec
+ if acked == nil {
+ return
+ }
+ for idx, blocks := range global.blocks {
+ if idx == int(candidate.Position.ChainID) {
+ continue
+ }
+ breakpoint = nil
+ if len(global.breakpoints[idx]) > 0 {
+ breakpoint = global.breakpoints[idx][0]
+ }
+ for i, b := range blocks {
+ if breakpoint != nil && b.Position.Round >= breakpoint.roundID {
+ break
+ }
+ if _, acked := acked[b.Hash]; !acked {
+ continue
+ }
+ // If this block acks this candidate, all newer blocks
+ // from the same chain also 'indirect' acks it.
+ rec = info.ackedStatus[idx]
+ rec.minHeight = b.Position.Height
+ if breakpoint == nil {
+ rec.count = uint64(len(blocks) - i)
+ } else {
+ rec.count = breakpoint.lastHeight - b.Position.Height + 1
+ }
+ break
+ }
+ }
+
+}
+
+func (global *totalOrderingGlobalVector) addBlock(
+ b *types.Block) (pos int, pending bool, err error) {
+ curPosition := b.Position
+ tip := global.tips[curPosition.ChainID]
+ pos = len(global.blocks[curPosition.ChainID])
+ if tip != nil {
+ // Perform light weight sanity check based on tip.
+ lastPosition := tip.Position
+ if lastPosition.Round > curPosition.Round {
err = ErrNotValidDAG
return
}
+ if DiffUint64(lastPosition.Round, curPosition.Round) > 1 {
+ if curPosition.Height != 0 {
+ err = ErrNotValidDAG
+ return
+ }
+ // Add breakpoint.
+ global.breakpoints[curPosition.ChainID] = append(
+ global.breakpoints[curPosition.ChainID],
+ &totalOrderingBreakpoint{
+ roundID: curPosition.Round,
+ lastHeight: lastPosition.Height,
+ })
+ } else {
+ if curPosition.Height != lastPosition.Height+1 {
+ err = ErrNotValidDAG
+ return
+ }
+ }
+ } else {
+ // Assume we run from round 0 (genesis round). Newly added chains
+ // would go into this case. Make sure blocks from those chains
+ // are safe to use.
+ if curPosition.Height != 0 {
+ err = ErrNotValidDAG
+ return
+ }
+ if curPosition.Round < global.curRound {
+ err = ErrBlockFromPastRound
+ return
+ }
+ if curPosition.Round > global.curRound {
+ // Add breakpoint.
+ global.breakpoints[curPosition.ChainID] = append(
+ global.breakpoints[curPosition.ChainID],
+ &totalOrderingBreakpoint{
+ roundID: curPosition.Round,
+ lastHeight: 0,
+ })
+ }
}
- global.blocks[b.Position.ChainID] = append(blocksFromChain, b)
+ breakpoints := global.breakpoints[b.Position.ChainID]
+ pending = len(breakpoints) > 0 && breakpoints[0].roundID <= b.Position.Round
+ global.blocks[b.Position.ChainID] = append(
+ global.blocks[b.Position.ChainID], b)
+ global.tips[b.Position.ChainID] = b
return
}
@@ -518,10 +710,12 @@ func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) {
func (global *totalOrderingGlobalVector) updateCandidateInfo(
dirtyChainIDs []int, objCache *totalOrderingObjectCache) {
var (
- idx int
- blocks []*types.Block
- info *totalOrderingCandidateInfo
- rec *totalOrderingHeightRecord
+ idx int
+ blocks []*types.Block
+ block *types.Block
+ info *totalOrderingCandidateInfo
+ rec *totalOrderingHeightRecord
+ breakpoint *totalOrderingBreakpoint
)
if global.cachedCandidateInfo == nil {
info = newTotalOrderingCandidateInfo(common.Hash{}, objCache)
@@ -530,8 +724,18 @@ func (global *totalOrderingGlobalVector) updateCandidateInfo(
continue
}
rec = info.ackedStatus[idx]
- rec.minHeight = blocks[0].Position.Height
- rec.count = uint64(len(blocks))
+ if len(global.breakpoints[idx]) > 0 {
+ breakpoint = global.breakpoints[idx][0]
+ block = blocks[0]
+ if block.Position.Round >= breakpoint.roundID {
+ continue
+ }
+ rec.minHeight = block.Position.Height
+ rec.count = breakpoint.lastHeight - block.Position.Height + 1
+ } else {
+ rec.minHeight = blocks[0].Position.Height
+ rec.count = uint64(len(blocks))
+ }
}
global.cachedCandidateInfo = info
} else {
@@ -543,8 +747,18 @@ func (global *totalOrderingGlobalVector) updateCandidateInfo(
continue
}
rec = info.ackedStatus[idx]
- rec.minHeight = blocks[0].Position.Height
- rec.count = uint64(len(blocks))
+ if len(global.breakpoints[idx]) > 0 {
+ breakpoint = global.breakpoints[idx][0]
+ block = blocks[0]
+ if block.Position.Round >= breakpoint.roundID {
+ continue
+ }
+ rec.minHeight = block.Position.Height
+ rec.count = breakpoint.lastHeight - block.Position.Height + 1
+ } else {
+ rec.minHeight = blocks[0].Position.Height
+ rec.count = uint64(len(blocks))
+ }
}
}
return
@@ -559,6 +773,17 @@ type totalOrdering struct {
// The round of config used when performing total ordering.
curRound uint64
+ // duringFlush is a flag to switch the flush mode and normal mode.
+ duringFlush bool
+
+ // flushReadyChains checks if the last block of that chain arrived. Once
+ // last blocks from all chains in current config are arrived, we can
+ // perform flush.
+ flushReadyChains map[uint32]struct{}
+
+ // flush is a map to record which blocks are already flushed.
+ flushed map[uint32]struct{}
+
// globalVector group all pending blocks by proposers and
// sort them by block height. This structure is helpful when:
//
@@ -583,7 +808,7 @@ type totalOrdering struct {
// candidateChainMapping keeps a mapping from candidate's hash to
// their chain IDs.
- candidateChainMapping map[common.Hash]uint32
+ candidateChainMapping map[uint32]common.Hash
// candidateChainIDs records chain ID of all candidates.
candidateChainIDs []uint32
@@ -603,7 +828,7 @@ func newTotalOrdering(genesisConfig *totalOrderingConfig) *totalOrdering {
dirtyChainIDs: make([]int, 0, genesisConfig.numChains),
acked: make(map[common.Hash]map[common.Hash]struct{}),
objCache: objCache,
- candidateChainMapping: make(map[common.Hash]uint32),
+ candidateChainMapping: make(map[uint32]common.Hash),
candidates: candidates,
candidateChainIDs: make([]uint32, 0, genesisConfig.numChains),
}
@@ -621,9 +846,22 @@ func (to *totalOrdering) appendConfig(
to.configs = append(
to.configs,
newTotalOrderingConfig(to.configs[len(to.configs)-1], config))
+ // Resize internal structures.
+ to.globalVector.resize(config.NumChains)
+ to.objCache.resize(config.NumChains)
+ if int(config.NumChains) > len(to.candidates) {
+ newCandidates := make([]*totalOrderingCandidateInfo, config.NumChains)
+ copy(newCandidates, to.candidates)
+ to.candidates = newCandidates
+ }
return nil
}
+func (to *totalOrdering) switchRound() {
+ to.curRound++
+ to.globalVector.switchRound(to.curRound)
+}
+
// buildBlockRelation populates the acked according their acking relationships.
// This function would update all blocks implcitly acked by input block
// recursively.
@@ -640,6 +878,12 @@ func (to *totalOrdering) buildBlockRelation(b *types.Block) {
break
}
curBlock, toCheck = toCheck[len(toCheck)-1], toCheck[:len(toCheck)-1]
+ if curBlock.Position.Round > b.Position.Round {
+ // It's illegal for a block to acking some block from future
+ // round, this rule should be promised before delivering to
+ // total ordering.
+ panic(ErrForwardAck)
+ }
for _, ack = range curBlock.Acks {
if acked, exists = to.acked[ack]; !exists {
acked = to.objCache.requestAckedVector()
@@ -672,7 +916,7 @@ func (to *totalOrdering) clean(b *types.Block) {
delete(to.pendings, h)
to.candidates[chainID].recycle(to.objCache)
to.candidates[chainID] = nil
- delete(to.candidateChainMapping, h)
+ delete(to.candidateChainMapping, chainID)
// Remove this candidate from candidate IDs.
to.candidateChainIDs =
removeFromSortedUint32Slice(to.candidateChainIDs, chainID)
@@ -683,18 +927,36 @@ func (to *totalOrdering) clean(b *types.Block) {
}
// updateVectors is a helper function to update all cached vectors.
-func (to *totalOrdering) updateVectors(b *types.Block) (err error) {
+func (to *totalOrdering) updateVectors(b *types.Block) (pos int, err error) {
var (
candidateHash common.Hash
chainID uint32
acked bool
+ pending bool
)
// Update global height vector
- if err = to.globalVector.addBlock(b); err != nil {
+ if pos, pending, err = to.globalVector.addBlock(b); err != nil {
+ return
+ }
+ if to.duringFlush {
+ // It makes no sense to calculate potential functions of total ordering
+ // when flushing would be happened.
+ return
+ }
+ if pending {
+ // The chain of this block contains breakpoints, which means their
+ // height are not continuous. This implementation of DEXON total
+ // ordering algorithm assumes the height of blocks in working set should
+ // be continuous.
+ //
+ // To workaround this issue, when block arrived after breakpoints,
+ // their information would not be contributed to current working set.
+ // This mechanism works because we switch rounds by flushing and
+ // reset the whole working set.
return
}
// Update acking status of candidates.
- for candidateHash, chainID = range to.candidateChainMapping {
+ for chainID, candidateHash = range to.candidateChainMapping {
if _, acked = to.acked[candidateHash][b.Hash]; !acked {
continue
}
@@ -709,43 +971,18 @@ func (to *totalOrdering) updateVectors(b *types.Block) (err error) {
// build totalOrderingCandidateInfo for new candidate.
func (to *totalOrdering) prepareCandidate(candidate *types.Block) {
var (
- info = newTotalOrderingCandidateInfo(
- candidate.Hash, to.objCache)
+ info = newTotalOrderingCandidateInfo(candidate.Hash, to.objCache)
chainID = candidate.Position.ChainID
)
to.candidates[chainID] = info
- to.candidateChainMapping[candidate.Hash] = chainID
+ to.candidateChainMapping[chainID] = candidate.Hash
// Add index to slot to allocated list, make sure the modified list sorted.
to.candidateChainIDs = append(to.candidateChainIDs, chainID)
sort.Slice(to.candidateChainIDs, func(i, j int) bool {
return to.candidateChainIDs[i] < to.candidateChainIDs[j]
})
- info.ackedStatus[chainID] = &totalOrderingHeightRecord{
- minHeight: candidate.Position.Height,
- count: uint64(len(to.globalVector.blocks[chainID])),
- }
- ackedsForCandidate, exists := to.acked[candidate.Hash]
- if !exists {
- // This candidate is acked by nobody.
- return
- }
- var rec *totalOrderingHeightRecord
- for idx, blocks := range to.globalVector.blocks {
- if idx == int(chainID) {
- continue
- }
- for i, b := range blocks {
- if _, acked := ackedsForCandidate[b.Hash]; !acked {
- continue
- }
- // If this block acks this candidate, all newer blocks
- // from the same chain also 'indirect' acks it.
- rec = info.ackedStatus[idx]
- rec.minHeight = b.Position.Height
- rec.count = uint64(len(blocks) - i)
- break
- }
- }
+ to.globalVector.prepareHeightRecord(
+ candidate, info, to.acked[candidate.Hash])
return
}
@@ -762,7 +999,9 @@ func (to *totalOrdering) isAckOnlyPrecedings(b *types.Block) bool {
// output is a helper function to finish the delivery of
// deliverable preceding set.
-func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*types.Block) {
+func (to *totalOrdering) output(
+ precedings map[common.Hash]struct{},
+ numChains uint32) (ret []*types.Block) {
for p := range precedings {
// Remove the first element from corresponding blockVector.
b := to.pendings[p]
@@ -781,20 +1020,18 @@ func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*typ
// TODO(mission): only those tips that acking some blocks in
// the devliered set should be checked. This
// improvment related to the latency introduced by K.
- for _, blocks := range to.globalVector.blocks {
+ for chainID, blocks := range to.globalVector.blocks[:numChains] {
if len(blocks) == 0 {
continue
}
- tip := blocks[0]
- if _, alreadyCandidate :=
- to.candidateChainMapping[tip.Hash]; alreadyCandidate {
+ if _, picked := to.candidateChainMapping[uint32(chainID)]; picked {
continue
}
- if !to.isAckOnlyPrecedings(tip) {
+ if !to.isAckOnlyPrecedings(blocks[0]) {
continue
}
// Build totalOrderingCandidateInfo for new candidate.
- to.prepareCandidate(tip)
+ to.prepareCandidate(blocks[0])
}
return ret
}
@@ -833,7 +1070,8 @@ func (to *totalOrdering) generateDeliverSet() (
otherChainID,
to.candidates[otherChainID],
to.dirtyChainIDs,
- to.objCache)
+ to.objCache,
+ cfg.numChains)
}
wg.Done()
}(chainID, info)
@@ -842,7 +1080,8 @@ func (to *totalOrdering) generateDeliverSet() (
// Reset dirty chains.
to.dirtyChainIDs = to.dirtyChainIDs[:0]
// TODO(mission): ANS should be bound by current numChains.
- globalAnsLength := globalInfo.getAckingNodeSetLength(globalInfo, cfg.k)
+ globalAnsLength := globalInfo.getAckingNodeSetLength(
+ globalInfo, cfg.k, cfg.numChains)
CheckNextCandidateLoop:
for _, chainID = range to.candidateChainIDs {
info = to.candidates[chainID]
@@ -916,7 +1155,7 @@ CheckNextCandidateLoop:
for p := range precedings {
// TODO(mission): ANS should be bound by current numChains.
chainAnsLength = to.candidates[p].getAckingNodeSetLength(
- globalInfo, cfg.k)
+ globalInfo, cfg.k, cfg.numChains)
if uint64(chainAnsLength) < uint64(cfg.numChains)-cfg.phi {
return false
}
@@ -946,30 +1185,137 @@ CheckNextCandidateLoop:
return
}
-// processBlock is the entry point of totalOrdering.
-func (to *totalOrdering) processBlock(b *types.Block) (
- delivered []*types.Block, early bool, err error) {
- // NOTE: I assume the block 'b' is already safe for total ordering.
- // That means, all its acking blocks are during/after
- // total ordering stage.
+// flushBlocks flushes blocks.
+func (to *totalOrdering) flushBlocks(
+ b *types.Block) (flushed []*types.Block, early bool, err error) {
cfg := to.configs[to.curRound]
- if b.Position.ChainID >= cfg.numChains {
- err = ErrChainIDNotRecognized
+ if cfg.isValidLastBlock(b) {
+ to.flushReadyChains[b.Position.ChainID] = struct{}{}
+ }
+ // Flush blocks until last blocks from all chains are arrived.
+ if len(to.flushReadyChains) < int(cfg.numChains) {
return
}
- to.pendings[b.Hash] = b
- to.buildBlockRelation(b)
- if err = to.updateVectors(b); err != nil {
+ if len(to.flushReadyChains) > int(cfg.numChains) {
+ // This line should never be reached.
+ err = ErrFutureRoundDelivered
return
}
- if to.isAckOnlyPrecedings(b) {
- to.prepareCandidate(b)
+ // Dump all blocks in this round.
+ for {
+ if len(to.flushed) == int(cfg.numChains) {
+ break
+ }
+ // Dump all candidates without checking potential function.
+ flushedHashes := make(map[common.Hash]struct{})
+ for _, chainID := range to.candidateChainIDs {
+ candidateBlock := to.pendings[to.candidates[chainID].hash]
+ if candidateBlock.Position.Round > to.curRound {
+ continue
+ }
+ flushedHashes[candidateBlock.Hash] = struct{}{}
+ }
+ if len(flushedHashes) == 0 {
+ err = ErrTotalOrderingHangs
+ return
+ }
+ flushedBlocks := to.output(flushedHashes, cfg.numChains)
+ for _, b := range flushedBlocks {
+ if !cfg.isValidLastBlock(b) {
+ continue
+ }
+ to.flushed[b.Position.ChainID] = struct{}{}
+ }
+ flushed = append(flushed, flushedBlocks...)
}
- // Mark the proposer of incoming block as dirty.
- to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID))
- hashes, early := to.generateDeliverSet()
+ // Switch back to normal mode: delivered by DEXON total ordering algorithm.
+ to.duringFlush = false
+ to.flushed = make(map[uint32]struct{})
+ to.flushReadyChains = make(map[uint32]struct{})
+ // Clean all cached intermediate stats.
+ for idx := range to.candidates {
+ if to.candidates[idx] == nil {
+ continue
+ }
+ to.candidates[idx].recycle(to.objCache)
+ to.candidates[idx] = nil
+ }
+ to.dirtyChainIDs = nil
+ to.candidateChainMapping = make(map[uint32]common.Hash)
+ to.candidateChainIDs = nil
+ to.globalVector.cachedCandidateInfo = nil
+ to.switchRound()
+ // Force to pick new candidates.
+ to.output(map[common.Hash]struct{}{}, to.configs[to.curRound].numChains)
+ return
+}
+// deliverBlocks delivers blocks by DEXON total ordering algorithm.
+func (to *totalOrdering) deliverBlocks() (
+ delivered []*types.Block, early bool, err error) {
+ hashes, early := to.generateDeliverSet()
+ cfg := to.configs[to.curRound]
// output precedings
- delivered = to.output(hashes)
+ delivered = to.output(hashes, cfg.numChains)
+ // Check if any block in delivered set are the last block in this round
+ // of that chain. If yes, flush or round-switching would be performed.
+ for _, b := range delivered {
+ if b.Position.Round > to.curRound {
+ err = ErrFutureRoundDelivered
+ return
+ }
+ if !cfg.isValidLastBlock(b) {
+ continue
+ }
+ if cfg.isFlushRequired {
+ // Switch to flush mode.
+ to.duringFlush = true
+ to.flushReadyChains = make(map[uint32]struct{})
+ to.flushed = make(map[uint32]struct{})
+ } else {
+ // Switch round directly.
+ to.switchRound()
+ }
+ break
+ }
+ if to.duringFlush {
+ // Make sure last blocks from all chains are marked as 'flushed'.
+ for _, b := range delivered {
+ if !cfg.isValidLastBlock(b) {
+ continue
+ }
+ to.flushReadyChains[b.Position.ChainID] = struct{}{}
+ to.flushed[b.Position.ChainID] = struct{}{}
+ }
+ }
return
}
+
+// processBlock is the entry point of totalOrdering.
+func (to *totalOrdering) processBlock(
+ b *types.Block) ([]*types.Block, bool, error) {
+ // NOTE: I assume the block 'b' is already safe for total ordering.
+ // That means, all its acking blocks are during/after
+ // total ordering stage.
+ cfg := to.configs[to.curRound]
+ to.pendings[b.Hash] = b
+ to.buildBlockRelation(b)
+ pos, err := to.updateVectors(b)
+ if err != nil {
+ return nil, false, err
+ }
+ // Mark the proposer of incoming block as dirty.
+ if b.Position.ChainID < cfg.numChains {
+ to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID))
+ _, picked := to.candidateChainMapping[b.Position.ChainID]
+ if pos == 0 && !picked {
+ if to.isAckOnlyPrecedings(b) {
+ to.prepareCandidate(b)
+ }
+ }
+ }
+ if to.duringFlush {
+ return to.flushBlocks(b)
+ }
+ return to.deliverBlocks()
+}