diff options
Diffstat (limited to 'core/total-ordering.go')
-rw-r--r-- | core/total-ordering.go | 536 |
1 files changed, 241 insertions, 295 deletions
diff --git a/core/total-ordering.go b/core/total-ordering.go index a4778f5..9b980c0 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -67,21 +67,20 @@ type totalOrderingConfig struct { // In short, only block height equals to (global minimum height + k) // would be taken into consideration. k uint64 + // phi is a const to control how strong the leading preceding block // should be. phi uint64 - // chainNum is the count of chains. - numChains uint32 - // Is round cutting required? + + numChains uint32 isFlushRequired bool } -func (config *totalOrderingConfig) fromConfig( - roundID uint64, cfg *types.Config) { +func (config *totalOrderingConfig) fromConfig(round uint64, cfg *types.Config) { config.k = uint64(cfg.K) config.numChains = cfg.NumChains config.phi = uint64(float32(cfg.NumChains-1)*cfg.PhiRatio + 1) - config.setupRoundBasedFields(roundID, cfg) + config.setupRoundBasedFields(round, cfg) } func newGenesisTotalOrderingConfig( @@ -103,8 +102,8 @@ func newTotalOrderingConfig( return c } -// totalOrderingWinRecord caches which chains this candidate -// wins another one based on their height vector. +// totalOrderingWinRecord caches the comparison of candidates calculated by +// their height vector. type totalOrderingWinRecord struct { wins []int8 count uint @@ -117,15 +116,14 @@ func (rec *totalOrderingWinRecord) reset() { } } -func newTotalOrderingWinRecord(numChains uint32) ( - rec *totalOrderingWinRecord) { - rec = &totalOrderingWinRecord{} - rec.reset() - rec.wins = make([]int8, numChains) - return +func newTotalOrderingWinRecord(numChains uint32) *totalOrderingWinRecord { + return &totalOrderingWinRecord{ + wins: make([]int8, numChains), + count: 0, + } } -// grade implements the 'grade' potential function described in white paper. +// grade implements the 'grade' potential function in algorithm. func (rec *totalOrderingWinRecord) grade( numChains uint32, phi uint64, globalAnsLength uint64) int { if uint64(rec.count) >= phi { @@ -137,31 +135,26 @@ func (rec *totalOrderingWinRecord) grade( } } -// totalOrderingHeightRecord records two things: -// - the minimum heiht of block from that chain acking this block. -// - the count of blocks from that chain acking this block. +// totalOrderingHeightRecord records: +// - the minimum height of block which acks this block. +// - the count of blocks acking this block. type totalOrderingHeightRecord struct{ minHeight, count uint64 } -// totalOrderingObjectCache caches objects for reuse. -// The target object is map because: -// - reuse map would prevent it grows during usage, when map grows, -// hashes of key would be recaculated, bucket reallocated, and values -// are copied. -// However, to reuse a map, we have no easy way to erase its content but -// iterating its keys and delete corresponding values. -type totalOrderingObjectCache struct { - ackedStatus [][]*totalOrderingHeightRecord - heightVectors [][]uint64 - winRecordContainers [][]*totalOrderingWinRecord - ackedVectors []map[common.Hash]struct{} - winRecordPool sync.Pool - numChains uint32 +// totalOrderingCache caches objects for reuse and not being colloected by GC. +// Each cached target has "get-" and "put-" functions for getting and reusing +// of objects. +type totalOrderingCache struct { + ackedStatus [][]*totalOrderingHeightRecord + heightVectors [][]uint64 + winRecords [][]*totalOrderingWinRecord + winRecordPool sync.Pool + ackedVectors []map[common.Hash]struct{} + numChains uint32 } -// newTotalOrderingObjectCache constructs an totalOrderingObjectCache -// instance. -func newTotalOrderingObjectCache(numChains uint32) *totalOrderingObjectCache { - return &totalOrderingObjectCache{ +// newTotalOrderingObjectCache constructs an totalOrderingCache instance. +func newTotalOrderingObjectCache(numChains uint32) *totalOrderingCache { + return &totalOrderingCache{ winRecordPool: sync.Pool{ New: func() interface{} { return newTotalOrderingWinRecord(numChains) @@ -173,14 +166,14 @@ func newTotalOrderingObjectCache(numChains uint32) *totalOrderingObjectCache { // resize makes sure internal storage of totalOrdering instance can handle // maximum possible numChains in future configs. -func (cache *totalOrderingObjectCache) resize(numChains uint32) { +func (cache *totalOrderingCache) resize(numChains uint32) { // Basically, everything in cache needs to be cleaned. if cache.numChains >= numChains { return } cache.ackedStatus = nil cache.heightVectors = nil - cache.winRecordContainers = nil + cache.winRecords = nil cache.ackedVectors = nil cache.numChains = numChains cache.winRecordPool = sync.Pool{ @@ -190,19 +183,17 @@ func (cache *totalOrderingObjectCache) resize(numChains uint32) { } } -// requestAckedStatus requests a structure to record acking status of one -// candidate (or a global view of acking status of pending set). -func (cache *totalOrderingObjectCache) requestAckedStatus() ( +func (cache *totalOrderingCache) getAckedStatus() ( acked []*totalOrderingHeightRecord) { + if len(cache.ackedStatus) == 0 { acked = make([]*totalOrderingHeightRecord, cache.numChains) for idx := range acked { acked[idx] = &totalOrderingHeightRecord{count: 0} } } else { - acked, cache.ackedStatus = - cache.ackedStatus[len(cache.ackedStatus)-1], - cache.ackedStatus[:len(cache.ackedStatus)-1] + acked = cache.ackedStatus[len(cache.ackedStatus)-1] + cache.ackedStatus = cache.ackedStatus[:len(cache.ackedStatus)-1] // Reset acked status. for idx := range acked { acked[idx].count = 0 @@ -211,8 +202,7 @@ func (cache *totalOrderingObjectCache) requestAckedStatus() ( return } -// recycleAckedStatys recycles the structure to record acking status. -func (cache *totalOrderingObjectCache) recycleAckedStatus( +func (cache *totalOrderingCache) putAckedStatus( acked []*totalOrderingHeightRecord) { // If the recycled objects supports lower numChains than we required, // don't recycle it. @@ -222,17 +212,14 @@ func (cache *totalOrderingObjectCache) recycleAckedStatus( cache.ackedStatus = append(cache.ackedStatus, acked) } -// requestWinRecord requests an totalOrderingWinRecord instance. -func (cache *totalOrderingObjectCache) requestWinRecord() ( +func (cache *totalOrderingCache) getWinRecord() ( win *totalOrderingWinRecord) { win = cache.winRecordPool.Get().(*totalOrderingWinRecord) win.reset() return } -// recycleWinRecord recycles an totalOrderingWinRecord instance. -func (cache *totalOrderingObjectCache) recycleWinRecord( - win *totalOrderingWinRecord) { +func (cache *totalOrderingCache) putWinRecord(win *totalOrderingWinRecord) { if win == nil { return } @@ -244,15 +231,12 @@ func (cache *totalOrderingObjectCache) recycleWinRecord( cache.winRecordPool.Put(win) } -// requestHeightVector requests a structure to record acking heights -// of one candidate. -func (cache *totalOrderingObjectCache) requestHeightVector() (hv []uint64) { +func (cache *totalOrderingCache) getHeightVector() (hv []uint64) { if len(cache.heightVectors) == 0 { hv = make([]uint64, cache.numChains) } else { - hv, cache.heightVectors = - cache.heightVectors[len(cache.heightVectors)-1], - cache.heightVectors[:len(cache.heightVectors)-1] + hv = cache.heightVectors[len(cache.heightVectors)-1] + cache.heightVectors = cache.heightVectors[:len(cache.heightVectors)-1] } for idx := range hv { hv[idx] = infinity @@ -260,46 +244,34 @@ func (cache *totalOrderingObjectCache) requestHeightVector() (hv []uint64) { return } -// recycleHeightVector recycles an instance to record acking heights -// of one candidate. -func (cache *totalOrderingObjectCache) recycleHeightVector(hv []uint64) { - // If the recycled objects supports lower numChains than we required, - // don't recycle it. +func (cache *totalOrderingCache) putHeightVector(hv []uint64) { if uint32(len(hv)) != cache.numChains { return } cache.heightVectors = append(cache.heightVectors, hv) } -// requestWinRecordContainer requests a map of totalOrderingWinRecord. -func (cache *totalOrderingObjectCache) requestWinRecordContainer() ( - con []*totalOrderingWinRecord) { - if len(cache.winRecordContainers) == 0 { - con = make([]*totalOrderingWinRecord, cache.numChains) +func (cache *totalOrderingCache) getWinRecords() (w []*totalOrderingWinRecord) { + if len(cache.winRecords) == 0 { + w = make([]*totalOrderingWinRecord, cache.numChains) } else { - con, cache.winRecordContainers = - cache.winRecordContainers[len(cache.winRecordContainers)-1], - cache.winRecordContainers[:len(cache.winRecordContainers)-1] - for idx := range con { - con[idx] = nil + w = cache.winRecords[len(cache.winRecords)-1] + cache.winRecords = cache.winRecords[:len(cache.winRecords)-1] + for idx := range w { + w[idx] = nil } } return } -// recycleWinRecordContainer recycles a map of totalOrderingWinRecord. -func (cache *totalOrderingObjectCache) recycleWinRecordContainer( - con []*totalOrderingWinRecord) { - // If the recycled objects supports lower numChains than we required, - // don't recycle it. - if uint32(len(con)) != cache.numChains { +func (cache *totalOrderingCache) putWinRecords(w []*totalOrderingWinRecord) { + if uint32(len(w)) != cache.numChains { return } - cache.winRecordContainers = append(cache.winRecordContainers, con) + cache.winRecords = append(cache.winRecords, w) } -// requestAckedVector requests an acked vector instance. -func (cache *totalOrderingObjectCache) requestAckedVector() ( +func (cache *totalOrderingCache) getAckedVector() ( acked map[common.Hash]struct{}) { if len(cache.ackedVectors) == 0 { acked = make(map[common.Hash]struct{}) @@ -314,30 +286,26 @@ func (cache *totalOrderingObjectCache) requestAckedVector() ( return } -// recycleAckedVector recycles an acked vector instance. -func (cache *totalOrderingObjectCache) recycleAckedVector( +func (cache *totalOrderingCache) putAckedVector( acked map[common.Hash]struct{}) { - if acked == nil { - return + if acked != nil { + cache.ackedVectors = append(cache.ackedVectors, acked) } - cache.ackedVectors = append(cache.ackedVectors, acked) } -// totalOrderingCandidateInfo describes proceeding status for one candidate, -// including: -// - acked status as height records, which could keep 'how many blocks from -// one chain acking this candidate. -// - cached height vector, which valid height based on K-level used for -// comparison in 'grade' function. -// - cached result of grade function to other candidates. +// totalOrderingCandidateInfo stores proceding status for a candidate including +// - acked status as height records, which keeps the number of blocks from other +// chains acking this candidate. +// - cached height vector, which valids height based on K-level used for +// comparison in 'grade' function. +// - cached result of grade function to other candidates. // // Height Record: -// When block A acks block B, all blocks proposed from the same proposer -// as block A with higher height would also acks block B. Therefore, -// we just need to record: +// When block A acks block B, all blocks proposed from the same proposer as +// block A with higher height also acks block B. Thus records below is needed // - the minimum height of acking block from that proposer // - count of acking blocks from that proposer -// to repsent the acking status for block A. +// to repsent the acking status for block A. type totalOrderingCandidateInfo struct { ackedStatus []*totalOrderingHeightRecord cachedHeightVector []uint64 @@ -345,72 +313,62 @@ type totalOrderingCandidateInfo struct { hash common.Hash } -// newTotalOrderingCandidateInfo constructs an totalOrderingCandidateInfo -// instance. +// newTotalOrderingCandidateInfo creates an totalOrderingCandidateInfo instance. func newTotalOrderingCandidateInfo( candidateHash common.Hash, - objCache *totalOrderingObjectCache) *totalOrderingCandidateInfo { + objCache *totalOrderingCache) *totalOrderingCandidateInfo { return &totalOrderingCandidateInfo{ - ackedStatus: objCache.requestAckedStatus(), - winRecords: objCache.requestWinRecordContainer(), + ackedStatus: objCache.getAckedStatus(), + winRecords: objCache.getWinRecords(), hash: candidateHash, } } -// clean clear information related to another candidate, which should be called -// when that candidate is selected as deliver set. +// clean clears information related to another candidate, which should be called +// when that candidate is selected in deliver set. func (v *totalOrderingCandidateInfo) clean(otherCandidateChainID uint32) { v.winRecords[otherCandidateChainID] = nil } -// recycle objects for later usage, this eases the loading of -// golangs' GC. -func (v *totalOrderingCandidateInfo) recycle( - objCache *totalOrderingObjectCache) { +// recycle recycles objects for later usage, this eases GC's work. +func (v *totalOrderingCandidateInfo) recycle(objCache *totalOrderingCache) { if v.winRecords != nil { for _, win := range v.winRecords { - objCache.recycleWinRecord(win) + objCache.putWinRecord(win) } - objCache.recycleWinRecordContainer(v.winRecords) + objCache.putWinRecords(v.winRecords) } if v.cachedHeightVector != nil { - objCache.recycleHeightVector(v.cachedHeightVector) + objCache.putHeightVector(v.cachedHeightVector) } - objCache.recycleAckedStatus(v.ackedStatus) + objCache.putAckedStatus(v.ackedStatus) } // addBlock would update totalOrderingCandidateInfo, it's caller's duty // to make sure the input block acutally acking the target block. -func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) { +func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) error { rec := v.ackedStatus[b.Position.ChainID] if rec.count == 0 { rec.minHeight = b.Position.Height rec.count = 1 } else { - if b.Position.Height < rec.minHeight { - err = ErrNotValidDAG - return + if b.Position.Height <= rec.minHeight { + return ErrNotValidDAG } rec.count++ } - return + return nil } -// getAckingNodeSetLength would generate the Acking Node Set and return its -// length. Only block height larger than -// -// global minimum height + k -// -// would be taken into consideration, ex. -// -// For some chain X: -// - the global minimum acking height = 1, -// - k = 1 -// then only block height >= 2 would be added to acking node set. +// getAckingNodeSetLength returns the size of acking node set. Only heights +// larger than "global minimum height + k" are counted. For example, global +// minimum acking height is 1 and k is 1, only block heights which is larger or +// equal to 2 are added into acking node set. func (v *totalOrderingCandidateInfo) getAckingNodeSetLength( global *totalOrderingCandidateInfo, k uint64, numChains uint32) (count uint64) { + var rec *totalOrderingHeightRecord for idx, gRec := range global.ackedStatus[:numChains] { if gRec.count == 0 { @@ -420,9 +378,6 @@ func (v *totalOrderingCandidateInfo) getAckingNodeSetLength( if rec.count == 0 { continue } - // This line would check if these two ranges would overlap: - // - (global minimum height + k, infinity) - // - (local minimum height, local minimum height + count - 1) if rec.minHeight+rec.count-1 >= gRec.minHeight+k { count++ } @@ -438,19 +393,20 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector( global *totalOrderingCandidateInfo, k uint64, dirtyChainIDs []int, - objCache *totalOrderingObjectCache) { + objCache *totalOrderingCache) { + var ( idx int gRec, rec *totalOrderingHeightRecord ) - // The reason not to merge the two loops is the iteration over map - // is expensive when chain count is large, iterating over dirty - // chains is cheaper. + // The reason for not merging two loops is that the performance impact of map + // iteration is large if the size is large. Iteration of dirty chains is + // faster the map. // TODO(mission): merge the code in this if/else if the performance won't be // downgraded when adding a function for the shared part. if v.cachedHeightVector == nil { // Generate height vector from scratch. - v.cachedHeightVector = objCache.requestHeightVector() + v.cachedHeightVector = objCache.getHeightVector() for idx, gRec = range global.ackedStatus { if gRec.count <= k { continue @@ -490,25 +446,24 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector( return } -// updateWinRecord setup win records between two candidates. +// updateWinRecord setups win records from two candidates. func (v *totalOrderingCandidateInfo) updateWinRecord( otherChainID uint32, other *totalOrderingCandidateInfo, dirtyChainIDs []int, - objCache *totalOrderingObjectCache, + objCache *totalOrderingCache, numChains uint32) { var ( idx int height uint64 ) - // The reason not to merge the two loops is the iteration over map - // is expensive when chain count is large, iterating over dirty - // chains is cheaper. - // TODO(mission): merge the code in this if/else if add a function won't - // affect the performance. + // The reason not to merge two loops is that the iteration of map is + // expensive when chain count is large, iterating of dirty chains is cheaper. + // TODO(mission): merge the code in this if/else if adding a function won't + // affect the performance. win := v.winRecords[otherChainID] if win == nil { - win = objCache.requestWinRecord() + win = objCache.getWinRecord() v.winRecords[otherChainID] = win for idx, height = range v.cachedHeightVector[:numChains] { if height == infinity { @@ -543,43 +498,46 @@ func (v *totalOrderingCandidateInfo) updateWinRecord( } } -// totalOrderingBreakpoint is a record to store the height discontinuity -// on a chain. +// totalOrderingBreakpoint is a record of height discontinuity on a chain type totalOrderingBreakpoint struct { roundID uint64 - // height of last block in previous round. + // height of last block. lastHeight uint64 } // totalOrderingGroupVector keeps global status of current pending set. type totalOrderingGlobalVector struct { - // blocks stores all blocks grouped by their proposers and - // sorted by their block height. - // - // TODO(mission): the way we use this slice would make it reallocate - // frequently. + // blocks stores all blocks grouped by their proposers and sorted by height. + // TODO(mission): slice used here reallocates frequently. blocks [][]*types.Block - // breakpoints caches rounds for chains that blocks' height on them are - // not continuous. Ex. - // ChainID Round Height - // 1 0 0 - // 1 0 1 - // 1 1 2 - // 1 1 3 - // 1 1 4 - // 1 3 0 <- a breakpoint for round 3 would be cached - // for chain 1 as (roundID=1, lastHeight=4). + // breakpoints stores rounds for chains that blocks' height on them are + // not consecutive, for example in chain i + // Round Height + // 0 0 + // 0 1 + // 1 2 + // 1 3 + // 1 4 + // 2 - <- a config change of chain number occured + // 2 - + // 3 - + // 3 - + // 4 0 <- a breakpoint for round 3 is cached here + // 5 - + // 5 - + // 6 0 <- breakpoint again + // breakpoints[i][0] == &totalOrderingBreakpoint{roundID: 4, lastHeight: 4} + // breakpoints[i][1] == &totalOrderingBreakpoint{roundID: 6, lastHeight: 0} breakpoints [][]*totalOrderingBreakpoint - // curRound caches the last round ID used to purge breakpoints. + // curRound stores the last round ID used for purging breakpoints. curRound uint64 // tips records the last seen block for each chain. tips []*types.Block - // cachedCandidateInfo is an totalOrderingCandidateInfo instance, - // which is just used for actual candidates to calculate height vector. + // Only ackedStatus in cachedCandidateInfo is used. cachedCandidateInfo *totalOrderingCandidateInfo } @@ -628,6 +586,7 @@ func (global *totalOrderingGlobalVector) prepareHeightRecord( candidate *types.Block, info *totalOrderingCandidateInfo, acked map[common.Hash]struct{}) { + var ( chainID = candidate.Position.ChainID breakpoints = global.breakpoints[chainID] @@ -639,8 +598,10 @@ func (global *totalOrderingGlobalVector) prepareHeightRecord( minHeight: candidate.Position.Height, } if len(breakpoints) == 0 { + // If no breakpoint, count is the amount of blocks. rec.count = uint64(len(global.blocks[chainID])) } else { + // If there are breakpoints, only the first counts. rec.count = breakpoints[0].lastHeight - candidate.Position.Height + 1 } info.ackedStatus[chainID] = rec @@ -648,7 +609,7 @@ func (global *totalOrderingGlobalVector) prepareHeightRecord( return } for idx, blocks := range global.blocks { - if idx == int(candidate.Position.ChainID) { + if idx == int(chainID) { continue } breakpoint = nil @@ -662,8 +623,8 @@ func (global *totalOrderingGlobalVector) prepareHeightRecord( if _, acked := acked[b.Hash]; !acked { continue } - // If this block acks this candidate, all newer blocks - // from the same chain also 'indirect' acks it. + // If this block acks the candidate, all newer blocks from the same chain + // also 'indirectly' acks the candidate. rec = info.ackedStatus[idx] rec.minHeight = b.Position.Height if breakpoint == nil { @@ -674,65 +635,62 @@ func (global *totalOrderingGlobalVector) prepareHeightRecord( break } } - } func (global *totalOrderingGlobalVector) addBlock( - b *types.Block) (pos int, pending bool, err error) { - curPosition := b.Position - tip := global.tips[curPosition.ChainID] - pos = len(global.blocks[curPosition.ChainID]) + b *types.Block) (isOldest bool, pending bool, err error) { + // isOldest implies the block is the oldest in global vector + chainID := b.Position.ChainID + tip := global.tips[chainID] + isOldest = len(global.blocks[chainID]) == 0 if tip != nil { // Perform light weight sanity check based on tip. - lastPosition := tip.Position - if lastPosition.Round > curPosition.Round { + if tip.Position.Round > b.Position.Round { err = ErrNotValidDAG return } - if DiffUint64(lastPosition.Round, curPosition.Round) > 1 { - if curPosition.Height != 0 { + if DiffUint64(tip.Position.Round, b.Position.Round) > 1 { + if b.Position.Height != 0 { err = ErrNotValidDAG return } // Add breakpoint. - global.breakpoints[curPosition.ChainID] = append( - global.breakpoints[curPosition.ChainID], + global.breakpoints[chainID] = append( + global.breakpoints[chainID], &totalOrderingBreakpoint{ - roundID: curPosition.Round, - lastHeight: lastPosition.Height, + roundID: b.Position.Round, + lastHeight: tip.Position.Height, }) } else { - if curPosition.Height != lastPosition.Height+1 { + if b.Position.Height != tip.Position.Height+1 { err = ErrNotValidDAG return } } } else { - if curPosition.Round < global.curRound { + if b.Position.Round < global.curRound { err = ErrBlockFromPastRound return } - if curPosition.Round > global.curRound { + if b.Position.Round > global.curRound { // Add breakpoint. - global.breakpoints[curPosition.ChainID] = append( - global.breakpoints[curPosition.ChainID], - &totalOrderingBreakpoint{ - roundID: curPosition.Round, - lastHeight: 0, - }) + bp := &totalOrderingBreakpoint{ + roundID: b.Position.Round, + lastHeight: 0, + } + global.breakpoints[chainID] = append(global.breakpoints[chainID], bp) } } - breakpoints := global.breakpoints[b.Position.ChainID] - pending = len(breakpoints) > 0 && breakpoints[0].roundID <= b.Position.Round - global.blocks[b.Position.ChainID] = append( - global.blocks[b.Position.ChainID], b) - global.tips[b.Position.ChainID] = b + bps := global.breakpoints[chainID] + pending = len(bps) > 0 && bps[0].roundID <= b.Position.Round + global.blocks[chainID] = append(global.blocks[chainID], b) + global.tips[chainID] = b return } -// updateCandidateInfo udpate cached candidate info. +// updateCandidateInfo udpates cached candidate info. func (global *totalOrderingGlobalVector) updateCandidateInfo( - dirtyChainIDs []int, objCache *totalOrderingObjectCache) { + dirtyChainIDs []int, objCache *totalOrderingCache) { var ( idx int blocks []*types.Block @@ -788,8 +746,7 @@ func (global *totalOrderingGlobalVector) updateCandidateInfo( return } -// totalOrdering represent a process unit to handle total ordering -// for blocks. +// totalOrdering represent a process unit to handle total ordering for blocks. type totalOrdering struct { // pendings stores blocks awaiting to be ordered. pendings map[common.Hash]*types.Block @@ -805,7 +762,7 @@ type totalOrdering struct { // perform flush. flushReadyChains map[uint32]struct{} - // flush is a map to record which blocks are already flushed. + // flushed is a map of flushed blocks. flushed map[uint32]struct{} // globalVector group all pending blocks by proposers and @@ -815,20 +772,19 @@ type totalOrdering struct { // - picking candidates next round globalVector *totalOrderingGlobalVector - // candidates caches result of potential function during generating - // preceding sets. + // candidates caches result of potential function during generating preceding + // set. candidates []*totalOrderingCandidateInfo - // acked cache the 'block A acked by block B' relation by - // keeping a record in acked[A.Hash][B.Hash] + // acked stores the 'block A acked by block B' by acked[A.Hash][B.Hash] acked map[common.Hash]map[common.Hash]struct{} - // dirtyChainIDs records which chainID that should be updated - // for all cached status (win record, acking status). + // dirtyChainIDs stores chainIDs that is "dirty", i.e. needed updating all + // cached statuses (win record, acking status). dirtyChainIDs []int // objCache caches allocated objects, like map. - objCache *totalOrderingObjectCache + objCache *totalOrderingCache // candidateChainMapping keeps a mapping from candidate's hash to // their chain IDs. @@ -843,15 +799,13 @@ type totalOrdering struct { // newTotalOrdering constructs an totalOrdering instance. func newTotalOrdering(config *totalOrderingConfig) *totalOrdering { - globalVector := newTotalOrderingGlobalVector(config.numChains) - objCache := newTotalOrderingObjectCache(config.numChains) candidates := make([]*totalOrderingCandidateInfo, config.numChains) to := &totalOrdering{ pendings: make(map[common.Hash]*types.Block), - globalVector: globalVector, + globalVector: newTotalOrderingGlobalVector(config.numChains), dirtyChainIDs: make([]int, 0, config.numChains), acked: make(map[common.Hash]map[common.Hash]struct{}), - objCache: objCache, + objCache: newTotalOrderingObjectCache(config.numChains), candidateChainMapping: make(map[uint32]common.Hash), candidates: candidates, candidateChainIDs: make([]uint32, 0, config.numChains), @@ -887,9 +841,7 @@ func (to *totalOrdering) switchRound() { to.globalVector.switchRound(to.curRound) } -// buildBlockRelation populates the acked according their acking relationships. -// This function would update all blocks implcitly acked by input block -// recursively. +// buildBlockRelation update all its indirect acks recursively. func (to *totalOrdering) buildBlockRelation(b *types.Block) { var ( curBlock, nextBlock *types.Block @@ -898,31 +850,24 @@ func (to *totalOrdering) buildBlockRelation(b *types.Block) { exists, alreadyPopulated bool toCheck = []*types.Block{b} ) - for { - if len(toCheck) == 0 { - break - } + for len(toCheck) != 0 { curBlock, toCheck = toCheck[len(toCheck)-1], toCheck[:len(toCheck)-1] if curBlock.Position.Round > b.Position.Round { - // It's illegal for a block to acking some block from future - // round, this rule should be promised before delivering to - // total ordering. + // It's illegal for a block to ack some blocks in future round. panic(ErrForwardAck) } for _, ack = range curBlock.Acks { if acked, exists = to.acked[ack]; !exists { - acked = to.objCache.requestAckedVector() + acked = to.objCache.getAckedVector() to.acked[ack] = acked } - // This means we've walked this block already. + // Check if the block is handled. if _, alreadyPopulated = acked[b.Hash]; alreadyPopulated { continue } acked[b.Hash] = struct{}{} - // See if we need to go forward. - if nextBlock, exists = to.pendings[ack]; !exists { - continue - } else { + // See if we need to do this recursively. + if nextBlock, exists = to.pendings[ack]; exists { toCheck = append(toCheck, nextBlock) } } @@ -936,7 +881,7 @@ func (to *totalOrdering) clean(b *types.Block) { h = b.Hash chainID = b.Position.ChainID ) - to.objCache.recycleAckedVector(to.acked[h]) + to.objCache.putAckedVector(to.acked[h]) delete(to.acked, h) delete(to.pendings, h) to.candidates[chainID].recycle(to.objCache) @@ -952,7 +897,7 @@ func (to *totalOrdering) clean(b *types.Block) { } // updateVectors is a helper function to update all cached vectors. -func (to *totalOrdering) updateVectors(b *types.Block) (pos int, err error) { +func (to *totalOrdering) updateVectors(b *types.Block) (isOldest bool, err error) { var ( candidateHash common.Hash chainID uint32 @@ -960,7 +905,7 @@ func (to *totalOrdering) updateVectors(b *types.Block) (pos int, err error) { pending bool ) // Update global height vector - if pos, pending, err = to.globalVector.addBlock(b); err != nil { + if isOldest, pending, err = to.globalVector.addBlock(b); err != nil { return } if to.duringFlush { @@ -978,9 +923,10 @@ func (to *totalOrdering) updateVectors(b *types.Block) (pos int, err error) { // their information would not be contributed to current working set. // This mechanism works because we switch rounds by flushing and // reset the whole working set. + // This works because forward acking blocks are rejected. return } - // Update acking status of candidates. + // Update candidates' acking status. for chainID, candidateHash = range to.candidateChainMapping { if _, acked = to.acked[candidateHash][b.Hash]; !acked { continue @@ -992,59 +938,54 @@ func (to *totalOrdering) updateVectors(b *types.Block) (pos int, err error) { return } -// prepareCandidate is a helper function to -// build totalOrderingCandidateInfo for new candidate. -func (to *totalOrdering) prepareCandidate(candidate *types.Block) { +// prepareCandidate builds totalOrderingCandidateInfo for a new candidate. +func (to *totalOrdering) prepareCandidate(b *types.Block) { var ( - info = newTotalOrderingCandidateInfo(candidate.Hash, to.objCache) - chainID = candidate.Position.ChainID + info = newTotalOrderingCandidateInfo(b.Hash, to.objCache) + chainID = b.Position.ChainID ) to.candidates[chainID] = info - to.candidateChainMapping[chainID] = candidate.Hash - // Add index to slot to allocated list, make sure the modified list sorted. + to.candidateChainMapping[chainID] = b.Hash + // Add index to slot to allocated list, make sure the modified list is sorted. to.candidateChainIDs = append(to.candidateChainIDs, chainID) sort.Slice(to.candidateChainIDs, func(i, j int) bool { return to.candidateChainIDs[i] < to.candidateChainIDs[j] }) - to.globalVector.prepareHeightRecord( - candidate, info, to.acked[candidate.Hash]) + to.globalVector.prepareHeightRecord(b, info, to.acked[b.Hash]) return } -// isAckOnlyPrecedings is a helper function to check if a block -// only contain acks to delivered blocks. -func (to *totalOrdering) isAckOnlyPrecedings(b *types.Block) bool { +// isCandidate checks if a block only contains acks to delivered blocks. +func (to *totalOrdering) isCandidate(b *types.Block) bool { for _, ack := range b.Acks { - if _, pending := to.pendings[ack]; pending { + if _, exists := to.pendings[ack]; exists { return false } } return true } -// output is a helper function to finish the delivery of -// deliverable preceding set. +// output finishes the delivery of preceding set. func (to *totalOrdering) output( precedings map[common.Hash]struct{}, numChains uint32) (ret []*types.Block) { + for p := range precedings { // Remove the first element from corresponding blockVector. b := to.pendings[p] chainID := b.Position.ChainID - // TODO(mission): This way to use slice makes it reallocate frequently. - to.globalVector.blocks[int(chainID)] = - to.globalVector.blocks[int(chainID)][1:] + // TODO(mission): frequent reallocation here. + to.globalVector.blocks[chainID] = to.globalVector.blocks[chainID][1:] ret = append(ret, b) // Remove block relations. to.clean(b) to.dirtyChainIDs = append(to.dirtyChainIDs, int(chainID)) } sort.Sort(types.ByHash(ret)) - // Find new candidates from tip of globalVector of each chain. + // Find new candidates from global vector's tips. // The complexity here is O(N^2logN). - // TODO(mission): only those tips that acking some blocks in - // the devliered set should be checked. This - // improvment related to the latency introduced by K. + // TODO(mission): only tips which acking some blocks in the devliered set + // should be checked. This improvement related to the latency introduced by K. for chainID, blocks := range to.globalVector.blocks[:numChains] { if len(blocks) == 0 { continue @@ -1052,20 +993,20 @@ func (to *totalOrdering) output( if _, picked := to.candidateChainMapping[uint32(chainID)]; picked { continue } - if !to.isAckOnlyPrecedings(blocks[0]) { + if !to.isCandidate(blocks[0]) { continue } // Build totalOrderingCandidateInfo for new candidate. to.prepareCandidate(blocks[0]) } - return ret + return } -// generateDeliverSet would: -// - generate preceding set -// - check if the preceding set deliverable by checking potential function +// generateDeliverSet generates preceding set and checks if the preceding set +// is deliverable by potential function. func (to *totalOrdering) generateDeliverSet() ( delivered map[common.Hash]struct{}, mode uint32) { + var ( chainID, otherChainID uint32 info, otherInfo *totalOrderingCandidateInfo @@ -1080,14 +1021,14 @@ func (to *totalOrdering) generateDeliverSet() ( globalInfo, cfg.k, to.dirtyChainIDs, to.objCache) } // Update winning records for each candidate. - // TODO(mission): It's not reasonable to - // request one routine for each candidate, the context - // switch rate would be high. + // TODO(mission): It's not reasonable to request one routine for each + // candidate, the context switch rate would be high. var wg sync.WaitGroup wg.Add(len(to.candidateChainIDs)) for _, chainID := range to.candidateChainIDs { info = to.candidates[chainID] go func(can uint32, canInfo *totalOrderingCandidateInfo) { + defer wg.Done() for _, otherChainID := range to.candidateChainIDs { if can == otherChainID { continue @@ -1099,13 +1040,12 @@ func (to *totalOrdering) generateDeliverSet() ( to.objCache, cfg.numChains) } - wg.Done() }(chainID, info) } wg.Wait() // Reset dirty chains. to.dirtyChainIDs = to.dirtyChainIDs[:0] - // TODO(mission): ANS should be bound by current numChains. + // TODO(mission): ANS should be bounded by current numChains. globalAnsLength := globalInfo.getAckingNodeSetLength( globalInfo, cfg.k, cfg.numChains) CheckNextCandidateLoop: @@ -1116,7 +1056,7 @@ CheckNextCandidateLoop: continue } otherInfo = to.candidates[otherChainID] - // TODO(mission): grade should be bound by current numChains. + // TODO(mission): grade should be bounded by current numChains. if otherInfo.winRecords[chainID].grade( cfg.numChains, cfg.phi, globalAnsLength) != 0 { continue CheckNextCandidateLoop @@ -1214,25 +1154,23 @@ CheckNextCandidateLoop: // flushBlocks flushes blocks. func (to *totalOrdering) flushBlocks( b *types.Block) (flushed []*types.Block, mode uint32, err error) { - cfg := to.configs[to.curRound-to.configs[0].roundID] + mode = TotalOrderingModeFlush - if cfg.isValidLastBlock(b) { + cfg := to.configs[to.curRound-to.configs[0].roundID] + if cfg.isLastBlock(b) { to.flushReadyChains[b.Position.ChainID] = struct{}{} } - // Flush blocks until last blocks from all chains are arrived. + // Flush blocks until last blocks from all chains appeared. if len(to.flushReadyChains) < int(cfg.numChains) { return } if len(to.flushReadyChains) > int(cfg.numChains) { - // This line should never be reached. + // This case should never be occured. err = ErrFutureRoundDelivered return } // Dump all blocks in this round. - for { - if len(to.flushed) == int(cfg.numChains) { - break - } + for len(to.flushed) != int(cfg.numChains) { // Dump all candidates without checking potential function. flushedHashes := make(map[common.Hash]struct{}) for _, chainID := range to.candidateChainIDs { @@ -1248,14 +1186,13 @@ func (to *totalOrdering) flushBlocks( } flushedBlocks := to.output(flushedHashes, cfg.numChains) for _, b := range flushedBlocks { - if !cfg.isValidLastBlock(b) { - continue + if cfg.isLastBlock(b) { + to.flushed[b.Position.ChainID] = struct{}{} } - to.flushed[b.Position.ChainID] = struct{}{} } flushed = append(flushed, flushedBlocks...) } - // Switch back to normal mode: delivered by DEXON total ordering algorithm. + // Switch back to non-flushing mode. to.duringFlush = false to.flushed = make(map[uint32]struct{}) to.flushReadyChains = make(map[uint32]struct{}) @@ -1272,7 +1209,7 @@ func (to *totalOrdering) flushBlocks( to.candidateChainIDs = nil to.globalVector.cachedCandidateInfo = nil to.switchRound() - // Force to pick new candidates. + // Force picking new candidates. numChains := to.configs[to.curRound-to.configs[0].roundID].numChains to.output(map[common.Hash]struct{}{}, numChains) return @@ -1281,20 +1218,23 @@ func (to *totalOrdering) flushBlocks( // deliverBlocks delivers blocks by DEXON total ordering algorithm. func (to *totalOrdering) deliverBlocks() ( delivered []*types.Block, mode uint32, err error) { + hashes, mode := to.generateDeliverSet() cfg := to.configs[to.curRound-to.configs[0].roundID] - // output precedings + // Output precedings. delivered = to.output(hashes, cfg.numChains) - // Check if any block in delivered set are the last block in this round - // of that chain. If yes, flush or round-switching would be performed. + // Check if any block in delivered set is the last block in this round, if + // there is, perform flush or round-switch. for _, b := range delivered { if b.Position.Round > to.curRound { err = ErrFutureRoundDelivered return } - if !cfg.isValidLastBlock(b) { + if !cfg.isLastBlock(b) { continue } + // Code reaches here if a last block is processed. This triggers + // "duringFlush" mode if config changes. if cfg.isFlushRequired { // Switch to flush mode. to.duringFlush = true @@ -1307,17 +1247,17 @@ func (to *totalOrdering) deliverBlocks() ( break } if to.duringFlush { - // Make sure last blocks from all chains are marked as 'flushed'. + // Collect last blocks until all last blocks appears and function + // flushBlocks will be called. for _, b := range delivered { - if !cfg.isValidLastBlock(b) { - continue + if cfg.isLastBlock(b) { + to.flushed[b.Position.ChainID] = struct{}{} } - to.flushed[b.Position.ChainID] = struct{}{} } // Some last blocks for the round to be flushed might not be delivered // yet. for _, tip := range to.globalVector.tips[:cfg.numChains] { - if tip.Position.Round > to.curRound || cfg.isValidLastBlock(tip) { + if tip.Position.Round > to.curRound || cfg.isLastBlock(tip) { to.flushReadyChains[tip.Position.ChainID] = struct{}{} } } @@ -1328,24 +1268,30 @@ func (to *totalOrdering) deliverBlocks() ( // processBlock is the entry point of totalOrdering. func (to *totalOrdering) processBlock( b *types.Block) ([]*types.Block, uint32, error) { - // NOTE: I assume the block 'b' is already safe for total ordering. - // That means, all its acking blocks are during/after - // total ordering stage. + // NOTE: Block b is assumed to be in topologically sorted, i.e., all its + // acking blocks are during or after total ordering stage. cfg := to.configs[to.curRound-to.configs[0].roundID] to.pendings[b.Hash] = b to.buildBlockRelation(b) - pos, err := to.updateVectors(b) + isOldest, err := to.updateVectors(b) if err != nil { return nil, uint32(0), err } // Mark the proposer of incoming block as dirty. if b.Position.ChainID < cfg.numChains { to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID)) - _, picked := to.candidateChainMapping[b.Position.ChainID] - if pos == 0 && !picked { - if to.isAckOnlyPrecedings(b) { - to.prepareCandidate(b) - } + _, exists := to.candidateChainMapping[b.Position.ChainID] + if isOldest && !exists && to.isCandidate(b) { + // isOldest means b is the oldest block in global vector, and isCandidate + // is still needed here due to round change. For example: + // o o o <- genesis block for round change, isCandidate returns true + // | | but isOldest is false + // o o + // | | + // o o o <- isOldest is true but isCandidate returns false + // | | / + // o o + to.prepareCandidate(b) } } if to.duringFlush { |