diff options
Diffstat (limited to 'core/total-ordering.go')
-rw-r--r-- | core/total-ordering.go | 544 |
1 files changed, 445 insertions, 99 deletions
diff --git a/core/total-ordering.go b/core/total-ordering.go index a1e2e76..182ec6c 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -18,7 +18,7 @@ package core import ( - "fmt" + "errors" "math" "sort" "sync" @@ -35,9 +35,18 @@ const ( var ( // ErrNotValidDAG would be reported when block subbmitted to totalOrdering // didn't form a DAG. - ErrNotValidDAG = fmt.Errorf("not a valid dag") - // ErrChainIDNotRecognized means the chain is unknown to this module. - ErrChainIDNotRecognized = fmt.Errorf("chain ID not recognized") + ErrNotValidDAG = errors.New("not a valid dag") + // ErrFutureRoundDelivered means some blocks from later rounds are + // delivered, this means program error. + ErrFutureRoundDelivered = errors.New("future round delivered") + // ErrBlockFromPastRound means we receive some block from past round. + ErrBlockFromPastRound = errors.New("block from past round") + // ErrTotalOrderingHangs means total ordering hangs somewhere. + ErrTotalOrderingHangs = errors.New("total ordering hangs") + // ErrForwardAck means a block acking some blocks from newer round. + ErrForwardAck = errors.New("forward ack") + // ErrUnexpected means general (I'm lazy) errors. + ErrUnexpected = errors.New("unexpected") ) // totalOrderingConfig is the configuration for total ordering. @@ -97,20 +106,20 @@ func (rec *totalOrderingWinRecord) reset() { } } -func newTotalOrderingWinRecord(chainNum uint32) ( +func newTotalOrderingWinRecord(numChains uint32) ( rec *totalOrderingWinRecord) { rec = &totalOrderingWinRecord{} rec.reset() - rec.wins = make([]int8, chainNum) + rec.wins = make([]int8, numChains) return } // grade implements the 'grade' potential function described in white paper. func (rec *totalOrderingWinRecord) grade( - chainNum uint32, phi uint64, globalAnsLength uint64) int { + numChains uint32, phi uint64, globalAnsLength uint64) int { if uint64(rec.count) >= phi { return 1 - } else if uint64(rec.count) < phi-uint64(chainNum)+globalAnsLength { + } else if uint64(rec.count) < phi-uint64(numChains)+globalAnsLength { return 0 } else { return -1 @@ -135,19 +144,38 @@ type totalOrderingObjectCache struct { winRecordContainers [][]*totalOrderingWinRecord ackedVectors []map[common.Hash]struct{} winRecordPool sync.Pool - chainNum uint32 + numChains uint32 } // newTotalOrderingObjectCache constructs an totalOrderingObjectCache // instance. -func newTotalOrderingObjectCache(chainNum uint32) *totalOrderingObjectCache { +func newTotalOrderingObjectCache(numChains uint32) *totalOrderingObjectCache { return &totalOrderingObjectCache{ winRecordPool: sync.Pool{ New: func() interface{} { - return newTotalOrderingWinRecord(chainNum) + return newTotalOrderingWinRecord(numChains) }, }, - chainNum: chainNum, + numChains: numChains, + } +} + +// resize makes sure internal storage of totalOrdering instance can handle +// maximum possible numChains in future configs. +func (cache *totalOrderingObjectCache) resize(numChains uint32) { + // Basically, everything in cache needs to be cleaned. + if cache.numChains >= numChains { + return + } + cache.ackedStatus = nil + cache.heightVectors = nil + cache.winRecordContainers = nil + cache.ackedVectors = nil + cache.numChains = numChains + cache.winRecordPool = sync.Pool{ + New: func() interface{} { + return newTotalOrderingWinRecord(numChains) + }, } } @@ -156,7 +184,7 @@ func newTotalOrderingObjectCache(chainNum uint32) *totalOrderingObjectCache { func (cache *totalOrderingObjectCache) requestAckedStatus() ( acked []*totalOrderingHeightRecord) { if len(cache.ackedStatus) == 0 { - acked = make([]*totalOrderingHeightRecord, cache.chainNum) + acked = make([]*totalOrderingHeightRecord, cache.numChains) for idx := range acked { acked[idx] = &totalOrderingHeightRecord{count: 0} } @@ -199,7 +227,7 @@ func (cache *totalOrderingObjectCache) recycleWinRecord( // of one candidate. func (cache *totalOrderingObjectCache) requestHeightVector() (hv []uint64) { if len(cache.heightVectors) == 0 { - hv = make([]uint64, cache.chainNum) + hv = make([]uint64, cache.numChains) } else { hv, cache.heightVectors = cache.heightVectors[len(cache.heightVectors)-1], @@ -221,7 +249,7 @@ func (cache *totalOrderingObjectCache) recycleHeightVector(hv []uint64) { func (cache *totalOrderingObjectCache) requestWinRecordContainer() ( con []*totalOrderingWinRecord) { if len(cache.winRecordContainers) == 0 { - con = make([]*totalOrderingWinRecord, cache.chainNum) + con = make([]*totalOrderingWinRecord, cache.numChains) } else { con, cache.winRecordContainers = cache.winRecordContainers[len(cache.winRecordContainers)-1], @@ -349,9 +377,11 @@ func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) { // - k = 1 // then only block height >= 2 would be added to acking node set. func (v *totalOrderingCandidateInfo) getAckingNodeSetLength( - global *totalOrderingCandidateInfo, k uint64) (count uint64) { + global *totalOrderingCandidateInfo, + k uint64, + numChains uint32) (count uint64) { var rec *totalOrderingHeightRecord - for idx, gRec := range global.ackedStatus { + for idx, gRec := range global.ackedStatus[:numChains] { if gRec.count == 0 { continue } @@ -434,7 +464,8 @@ func (v *totalOrderingCandidateInfo) updateWinRecord( otherChainID uint32, other *totalOrderingCandidateInfo, dirtyChainIDs []int, - objCache *totalOrderingObjectCache) { + objCache *totalOrderingObjectCache, + numChains uint32) { var ( idx int height uint64 @@ -448,7 +479,7 @@ func (v *totalOrderingCandidateInfo) updateWinRecord( if win == nil { win = objCache.requestWinRecord() v.winRecords[otherChainID] = win - for idx, height = range v.cachedHeightVector { + for idx, height = range v.cachedHeightVector[:numChains] { if height == infinity { continue } @@ -481,36 +512,197 @@ func (v *totalOrderingCandidateInfo) updateWinRecord( } } +// totalOrderingBreakpoint is a record to store the height discontinuity +// on a chain. +type totalOrderingBreakpoint struct { + roundID uint64 + // height of last block in previous round. + lastHeight uint64 +} + // totalOrderingGroupVector keeps global status of current pending set. type totalOrderingGlobalVector struct { // blocks stores all blocks grouped by their proposers and // sorted by their block height. // - // TODO(mission): the way we use this slice would make it reallocate frequently. + // TODO(mission): the way we use this slice would make it reallocate + // frequently. blocks [][]*types.Block + // breakpoints caches rounds for chains that blocks' height on them are + // not continuous. Ex. + // ChainID Round Height + // 1 0 0 + // 1 0 1 + // 1 1 2 + // 1 1 3 + // 1 1 4 + // 1 3 0 <- a breakpoint for round 3 would be cached + // for chain 1 as (roundID=1, lastHeight=4). + breakpoints [][]*totalOrderingBreakpoint + + // curRound caches the last round ID used to purge breakpoints. + curRound uint64 + + // tips records the last seen block for each chain. + tips []*types.Block + // cachedCandidateInfo is an totalOrderingCandidateInfo instance, // which is just used for actual candidates to calculate height vector. cachedCandidateInfo *totalOrderingCandidateInfo } -func newTotalOrderingGlobalVector( - chainNum uint32) *totalOrderingGlobalVector { +func newTotalOrderingGlobalVector(numChains uint32) *totalOrderingGlobalVector { return &totalOrderingGlobalVector{ - blocks: make([][]*types.Block, chainNum), + blocks: make([][]*types.Block, numChains), + tips: make([]*types.Block, numChains), + breakpoints: make([][]*totalOrderingBreakpoint, numChains), + } +} + +func (global *totalOrderingGlobalVector) resize(numChains uint32) { + if len(global.blocks) >= int(numChains) { + return + } + // Resize blocks. + newBlocks := make([][]*types.Block, numChains) + copy(newBlocks, global.blocks) + global.blocks = newBlocks + // Resize breakpoints. + newBreakPoints := make([][]*totalOrderingBreakpoint, numChains) + copy(newBreakPoints, global.breakpoints) + global.breakpoints = newBreakPoints + // Resize tips. + newTips := make([]*types.Block, numChains) + copy(newTips, global.tips) + global.tips = newTips +} + +func (global *totalOrderingGlobalVector) switchRound(roundID uint64) { + if global.curRound+1 != roundID { + panic(ErrUnexpected) + } + global.curRound = roundID + for chainID, bs := range global.breakpoints { + if len(bs) == 0 { + continue + } + if bs[0].roundID == roundID { + global.breakpoints[chainID] = bs[1:] + } } } -func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) { - blocksFromChain := global.blocks[b.Position.ChainID] - if len(blocksFromChain) > 0 { - lastBlock := blocksFromChain[len(blocksFromChain)-1] - if b.Position.Height-lastBlock.Position.Height != 1 { +func (global *totalOrderingGlobalVector) prepareHeightRecord( + candidate *types.Block, + info *totalOrderingCandidateInfo, + acked map[common.Hash]struct{}) { + var ( + chainID = candidate.Position.ChainID + breakpoints = global.breakpoints[chainID] + breakpoint *totalOrderingBreakpoint + rec *totalOrderingHeightRecord + ) + // Setup height record for own chain. + rec = &totalOrderingHeightRecord{ + minHeight: candidate.Position.Height, + } + if len(breakpoints) == 0 { + rec.count = uint64(len(global.blocks[chainID])) + } else { + rec.count = breakpoints[0].lastHeight - candidate.Position.Height + 1 + } + info.ackedStatus[chainID] = rec + if acked == nil { + return + } + for idx, blocks := range global.blocks { + if idx == int(candidate.Position.ChainID) { + continue + } + breakpoint = nil + if len(global.breakpoints[idx]) > 0 { + breakpoint = global.breakpoints[idx][0] + } + for i, b := range blocks { + if breakpoint != nil && b.Position.Round >= breakpoint.roundID { + break + } + if _, acked := acked[b.Hash]; !acked { + continue + } + // If this block acks this candidate, all newer blocks + // from the same chain also 'indirect' acks it. + rec = info.ackedStatus[idx] + rec.minHeight = b.Position.Height + if breakpoint == nil { + rec.count = uint64(len(blocks) - i) + } else { + rec.count = breakpoint.lastHeight - b.Position.Height + 1 + } + break + } + } + +} + +func (global *totalOrderingGlobalVector) addBlock( + b *types.Block) (pos int, pending bool, err error) { + curPosition := b.Position + tip := global.tips[curPosition.ChainID] + pos = len(global.blocks[curPosition.ChainID]) + if tip != nil { + // Perform light weight sanity check based on tip. + lastPosition := tip.Position + if lastPosition.Round > curPosition.Round { err = ErrNotValidDAG return } + if DiffUint64(lastPosition.Round, curPosition.Round) > 1 { + if curPosition.Height != 0 { + err = ErrNotValidDAG + return + } + // Add breakpoint. + global.breakpoints[curPosition.ChainID] = append( + global.breakpoints[curPosition.ChainID], + &totalOrderingBreakpoint{ + roundID: curPosition.Round, + lastHeight: lastPosition.Height, + }) + } else { + if curPosition.Height != lastPosition.Height+1 { + err = ErrNotValidDAG + return + } + } + } else { + // Assume we run from round 0 (genesis round). Newly added chains + // would go into this case. Make sure blocks from those chains + // are safe to use. + if curPosition.Height != 0 { + err = ErrNotValidDAG + return + } + if curPosition.Round < global.curRound { + err = ErrBlockFromPastRound + return + } + if curPosition.Round > global.curRound { + // Add breakpoint. + global.breakpoints[curPosition.ChainID] = append( + global.breakpoints[curPosition.ChainID], + &totalOrderingBreakpoint{ + roundID: curPosition.Round, + lastHeight: 0, + }) + } } - global.blocks[b.Position.ChainID] = append(blocksFromChain, b) + breakpoints := global.breakpoints[b.Position.ChainID] + pending = len(breakpoints) > 0 && breakpoints[0].roundID <= b.Position.Round + global.blocks[b.Position.ChainID] = append( + global.blocks[b.Position.ChainID], b) + global.tips[b.Position.ChainID] = b return } @@ -518,10 +710,12 @@ func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) { func (global *totalOrderingGlobalVector) updateCandidateInfo( dirtyChainIDs []int, objCache *totalOrderingObjectCache) { var ( - idx int - blocks []*types.Block - info *totalOrderingCandidateInfo - rec *totalOrderingHeightRecord + idx int + blocks []*types.Block + block *types.Block + info *totalOrderingCandidateInfo + rec *totalOrderingHeightRecord + breakpoint *totalOrderingBreakpoint ) if global.cachedCandidateInfo == nil { info = newTotalOrderingCandidateInfo(common.Hash{}, objCache) @@ -530,8 +724,18 @@ func (global *totalOrderingGlobalVector) updateCandidateInfo( continue } rec = info.ackedStatus[idx] - rec.minHeight = blocks[0].Position.Height - rec.count = uint64(len(blocks)) + if len(global.breakpoints[idx]) > 0 { + breakpoint = global.breakpoints[idx][0] + block = blocks[0] + if block.Position.Round >= breakpoint.roundID { + continue + } + rec.minHeight = block.Position.Height + rec.count = breakpoint.lastHeight - block.Position.Height + 1 + } else { + rec.minHeight = blocks[0].Position.Height + rec.count = uint64(len(blocks)) + } } global.cachedCandidateInfo = info } else { @@ -543,8 +747,18 @@ func (global *totalOrderingGlobalVector) updateCandidateInfo( continue } rec = info.ackedStatus[idx] - rec.minHeight = blocks[0].Position.Height - rec.count = uint64(len(blocks)) + if len(global.breakpoints[idx]) > 0 { + breakpoint = global.breakpoints[idx][0] + block = blocks[0] + if block.Position.Round >= breakpoint.roundID { + continue + } + rec.minHeight = block.Position.Height + rec.count = breakpoint.lastHeight - block.Position.Height + 1 + } else { + rec.minHeight = blocks[0].Position.Height + rec.count = uint64(len(blocks)) + } } } return @@ -559,6 +773,17 @@ type totalOrdering struct { // The round of config used when performing total ordering. curRound uint64 + // duringFlush is a flag to switch the flush mode and normal mode. + duringFlush bool + + // flushReadyChains checks if the last block of that chain arrived. Once + // last blocks from all chains in current config are arrived, we can + // perform flush. + flushReadyChains map[uint32]struct{} + + // flush is a map to record which blocks are already flushed. + flushed map[uint32]struct{} + // globalVector group all pending blocks by proposers and // sort them by block height. This structure is helpful when: // @@ -583,7 +808,7 @@ type totalOrdering struct { // candidateChainMapping keeps a mapping from candidate's hash to // their chain IDs. - candidateChainMapping map[common.Hash]uint32 + candidateChainMapping map[uint32]common.Hash // candidateChainIDs records chain ID of all candidates. candidateChainIDs []uint32 @@ -603,7 +828,7 @@ func newTotalOrdering(genesisConfig *totalOrderingConfig) *totalOrdering { dirtyChainIDs: make([]int, 0, genesisConfig.numChains), acked: make(map[common.Hash]map[common.Hash]struct{}), objCache: objCache, - candidateChainMapping: make(map[common.Hash]uint32), + candidateChainMapping: make(map[uint32]common.Hash), candidates: candidates, candidateChainIDs: make([]uint32, 0, genesisConfig.numChains), } @@ -621,9 +846,22 @@ func (to *totalOrdering) appendConfig( to.configs = append( to.configs, newTotalOrderingConfig(to.configs[len(to.configs)-1], config)) + // Resize internal structures. + to.globalVector.resize(config.NumChains) + to.objCache.resize(config.NumChains) + if int(config.NumChains) > len(to.candidates) { + newCandidates := make([]*totalOrderingCandidateInfo, config.NumChains) + copy(newCandidates, to.candidates) + to.candidates = newCandidates + } return nil } +func (to *totalOrdering) switchRound() { + to.curRound++ + to.globalVector.switchRound(to.curRound) +} + // buildBlockRelation populates the acked according their acking relationships. // This function would update all blocks implcitly acked by input block // recursively. @@ -640,6 +878,12 @@ func (to *totalOrdering) buildBlockRelation(b *types.Block) { break } curBlock, toCheck = toCheck[len(toCheck)-1], toCheck[:len(toCheck)-1] + if curBlock.Position.Round > b.Position.Round { + // It's illegal for a block to acking some block from future + // round, this rule should be promised before delivering to + // total ordering. + panic(ErrForwardAck) + } for _, ack = range curBlock.Acks { if acked, exists = to.acked[ack]; !exists { acked = to.objCache.requestAckedVector() @@ -672,7 +916,7 @@ func (to *totalOrdering) clean(b *types.Block) { delete(to.pendings, h) to.candidates[chainID].recycle(to.objCache) to.candidates[chainID] = nil - delete(to.candidateChainMapping, h) + delete(to.candidateChainMapping, chainID) // Remove this candidate from candidate IDs. to.candidateChainIDs = removeFromSortedUint32Slice(to.candidateChainIDs, chainID) @@ -683,18 +927,36 @@ func (to *totalOrdering) clean(b *types.Block) { } // updateVectors is a helper function to update all cached vectors. -func (to *totalOrdering) updateVectors(b *types.Block) (err error) { +func (to *totalOrdering) updateVectors(b *types.Block) (pos int, err error) { var ( candidateHash common.Hash chainID uint32 acked bool + pending bool ) // Update global height vector - if err = to.globalVector.addBlock(b); err != nil { + if pos, pending, err = to.globalVector.addBlock(b); err != nil { + return + } + if to.duringFlush { + // It makes no sense to calculate potential functions of total ordering + // when flushing would be happened. + return + } + if pending { + // The chain of this block contains breakpoints, which means their + // height are not continuous. This implementation of DEXON total + // ordering algorithm assumes the height of blocks in working set should + // be continuous. + // + // To workaround this issue, when block arrived after breakpoints, + // their information would not be contributed to current working set. + // This mechanism works because we switch rounds by flushing and + // reset the whole working set. return } // Update acking status of candidates. - for candidateHash, chainID = range to.candidateChainMapping { + for chainID, candidateHash = range to.candidateChainMapping { if _, acked = to.acked[candidateHash][b.Hash]; !acked { continue } @@ -709,43 +971,18 @@ func (to *totalOrdering) updateVectors(b *types.Block) (err error) { // build totalOrderingCandidateInfo for new candidate. func (to *totalOrdering) prepareCandidate(candidate *types.Block) { var ( - info = newTotalOrderingCandidateInfo( - candidate.Hash, to.objCache) + info = newTotalOrderingCandidateInfo(candidate.Hash, to.objCache) chainID = candidate.Position.ChainID ) to.candidates[chainID] = info - to.candidateChainMapping[candidate.Hash] = chainID + to.candidateChainMapping[chainID] = candidate.Hash // Add index to slot to allocated list, make sure the modified list sorted. to.candidateChainIDs = append(to.candidateChainIDs, chainID) sort.Slice(to.candidateChainIDs, func(i, j int) bool { return to.candidateChainIDs[i] < to.candidateChainIDs[j] }) - info.ackedStatus[chainID] = &totalOrderingHeightRecord{ - minHeight: candidate.Position.Height, - count: uint64(len(to.globalVector.blocks[chainID])), - } - ackedsForCandidate, exists := to.acked[candidate.Hash] - if !exists { - // This candidate is acked by nobody. - return - } - var rec *totalOrderingHeightRecord - for idx, blocks := range to.globalVector.blocks { - if idx == int(chainID) { - continue - } - for i, b := range blocks { - if _, acked := ackedsForCandidate[b.Hash]; !acked { - continue - } - // If this block acks this candidate, all newer blocks - // from the same chain also 'indirect' acks it. - rec = info.ackedStatus[idx] - rec.minHeight = b.Position.Height - rec.count = uint64(len(blocks) - i) - break - } - } + to.globalVector.prepareHeightRecord( + candidate, info, to.acked[candidate.Hash]) return } @@ -762,7 +999,9 @@ func (to *totalOrdering) isAckOnlyPrecedings(b *types.Block) bool { // output is a helper function to finish the delivery of // deliverable preceding set. -func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*types.Block) { +func (to *totalOrdering) output( + precedings map[common.Hash]struct{}, + numChains uint32) (ret []*types.Block) { for p := range precedings { // Remove the first element from corresponding blockVector. b := to.pendings[p] @@ -781,20 +1020,18 @@ func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*typ // TODO(mission): only those tips that acking some blocks in // the devliered set should be checked. This // improvment related to the latency introduced by K. - for _, blocks := range to.globalVector.blocks { + for chainID, blocks := range to.globalVector.blocks[:numChains] { if len(blocks) == 0 { continue } - tip := blocks[0] - if _, alreadyCandidate := - to.candidateChainMapping[tip.Hash]; alreadyCandidate { + if _, picked := to.candidateChainMapping[uint32(chainID)]; picked { continue } - if !to.isAckOnlyPrecedings(tip) { + if !to.isAckOnlyPrecedings(blocks[0]) { continue } // Build totalOrderingCandidateInfo for new candidate. - to.prepareCandidate(tip) + to.prepareCandidate(blocks[0]) } return ret } @@ -833,7 +1070,8 @@ func (to *totalOrdering) generateDeliverSet() ( otherChainID, to.candidates[otherChainID], to.dirtyChainIDs, - to.objCache) + to.objCache, + cfg.numChains) } wg.Done() }(chainID, info) @@ -842,7 +1080,8 @@ func (to *totalOrdering) generateDeliverSet() ( // Reset dirty chains. to.dirtyChainIDs = to.dirtyChainIDs[:0] // TODO(mission): ANS should be bound by current numChains. - globalAnsLength := globalInfo.getAckingNodeSetLength(globalInfo, cfg.k) + globalAnsLength := globalInfo.getAckingNodeSetLength( + globalInfo, cfg.k, cfg.numChains) CheckNextCandidateLoop: for _, chainID = range to.candidateChainIDs { info = to.candidates[chainID] @@ -916,7 +1155,7 @@ CheckNextCandidateLoop: for p := range precedings { // TODO(mission): ANS should be bound by current numChains. chainAnsLength = to.candidates[p].getAckingNodeSetLength( - globalInfo, cfg.k) + globalInfo, cfg.k, cfg.numChains) if uint64(chainAnsLength) < uint64(cfg.numChains)-cfg.phi { return false } @@ -946,30 +1185,137 @@ CheckNextCandidateLoop: return } -// processBlock is the entry point of totalOrdering. -func (to *totalOrdering) processBlock(b *types.Block) ( - delivered []*types.Block, early bool, err error) { - // NOTE: I assume the block 'b' is already safe for total ordering. - // That means, all its acking blocks are during/after - // total ordering stage. +// flushBlocks flushes blocks. +func (to *totalOrdering) flushBlocks( + b *types.Block) (flushed []*types.Block, early bool, err error) { cfg := to.configs[to.curRound] - if b.Position.ChainID >= cfg.numChains { - err = ErrChainIDNotRecognized + if cfg.isValidLastBlock(b) { + to.flushReadyChains[b.Position.ChainID] = struct{}{} + } + // Flush blocks until last blocks from all chains are arrived. + if len(to.flushReadyChains) < int(cfg.numChains) { return } - to.pendings[b.Hash] = b - to.buildBlockRelation(b) - if err = to.updateVectors(b); err != nil { + if len(to.flushReadyChains) > int(cfg.numChains) { + // This line should never be reached. + err = ErrFutureRoundDelivered return } - if to.isAckOnlyPrecedings(b) { - to.prepareCandidate(b) + // Dump all blocks in this round. + for { + if len(to.flushed) == int(cfg.numChains) { + break + } + // Dump all candidates without checking potential function. + flushedHashes := make(map[common.Hash]struct{}) + for _, chainID := range to.candidateChainIDs { + candidateBlock := to.pendings[to.candidates[chainID].hash] + if candidateBlock.Position.Round > to.curRound { + continue + } + flushedHashes[candidateBlock.Hash] = struct{}{} + } + if len(flushedHashes) == 0 { + err = ErrTotalOrderingHangs + return + } + flushedBlocks := to.output(flushedHashes, cfg.numChains) + for _, b := range flushedBlocks { + if !cfg.isValidLastBlock(b) { + continue + } + to.flushed[b.Position.ChainID] = struct{}{} + } + flushed = append(flushed, flushedBlocks...) } - // Mark the proposer of incoming block as dirty. - to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID)) - hashes, early := to.generateDeliverSet() + // Switch back to normal mode: delivered by DEXON total ordering algorithm. + to.duringFlush = false + to.flushed = make(map[uint32]struct{}) + to.flushReadyChains = make(map[uint32]struct{}) + // Clean all cached intermediate stats. + for idx := range to.candidates { + if to.candidates[idx] == nil { + continue + } + to.candidates[idx].recycle(to.objCache) + to.candidates[idx] = nil + } + to.dirtyChainIDs = nil + to.candidateChainMapping = make(map[uint32]common.Hash) + to.candidateChainIDs = nil + to.globalVector.cachedCandidateInfo = nil + to.switchRound() + // Force to pick new candidates. + to.output(map[common.Hash]struct{}{}, to.configs[to.curRound].numChains) + return +} +// deliverBlocks delivers blocks by DEXON total ordering algorithm. +func (to *totalOrdering) deliverBlocks() ( + delivered []*types.Block, early bool, err error) { + hashes, early := to.generateDeliverSet() + cfg := to.configs[to.curRound] // output precedings - delivered = to.output(hashes) + delivered = to.output(hashes, cfg.numChains) + // Check if any block in delivered set are the last block in this round + // of that chain. If yes, flush or round-switching would be performed. + for _, b := range delivered { + if b.Position.Round > to.curRound { + err = ErrFutureRoundDelivered + return + } + if !cfg.isValidLastBlock(b) { + continue + } + if cfg.isFlushRequired { + // Switch to flush mode. + to.duringFlush = true + to.flushReadyChains = make(map[uint32]struct{}) + to.flushed = make(map[uint32]struct{}) + } else { + // Switch round directly. + to.switchRound() + } + break + } + if to.duringFlush { + // Make sure last blocks from all chains are marked as 'flushed'. + for _, b := range delivered { + if !cfg.isValidLastBlock(b) { + continue + } + to.flushReadyChains[b.Position.ChainID] = struct{}{} + to.flushed[b.Position.ChainID] = struct{}{} + } + } return } + +// processBlock is the entry point of totalOrdering. +func (to *totalOrdering) processBlock( + b *types.Block) ([]*types.Block, bool, error) { + // NOTE: I assume the block 'b' is already safe for total ordering. + // That means, all its acking blocks are during/after + // total ordering stage. + cfg := to.configs[to.curRound] + to.pendings[b.Hash] = b + to.buildBlockRelation(b) + pos, err := to.updateVectors(b) + if err != nil { + return nil, false, err + } + // Mark the proposer of incoming block as dirty. + if b.Position.ChainID < cfg.numChains { + to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID)) + _, picked := to.candidateChainMapping[b.Position.ChainID] + if pos == 0 && !picked { + if to.isAckOnlyPrecedings(b) { + to.prepareCandidate(b) + } + } + } + if to.duringFlush { + return to.flushBlocks(b) + } + return to.deliverBlocks() +} |