// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus library. // // The dexon-consensus library is free software: you can redistribute it // and/or modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, // or (at your option) any later version. // // The dexon-consensus library is distributed in the hope that it will be // useful, but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser // General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the dexon-consensus library. If not, see // . package core import ( "errors" "math" "sort" "sync" "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/types" ) const ( infinity uint64 = math.MaxUint64 ) const ( // TotalOrderingModeError returns mode error. TotalOrderingModeError uint32 = iota // TotalOrderingModeNormal returns mode normal. TotalOrderingModeNormal // TotalOrderingModeEarly returns mode early. TotalOrderingModeEarly // TotalOrderingModeFlush returns mode flush. TotalOrderingModeFlush ) var ( // ErrNotValidDAG would be reported when block subbmitted to totalOrdering // didn't form a DAG. ErrNotValidDAG = errors.New("not a valid dag") // ErrFutureRoundDelivered means some blocks from later rounds are // delivered, this means program error. ErrFutureRoundDelivered = errors.New("future round delivered") // ErrBlockFromPastRound means we receive some block from past round. ErrBlockFromPastRound = errors.New("block from past round") // ErrTotalOrderingHangs means total ordering hangs somewhere. ErrTotalOrderingHangs = errors.New("total ordering hangs") // ErrForwardAck means a block acking some blocks from newer round. 
ErrForwardAck = errors.New("forward ack") // ErrUnexpected means general (I'm lazy) errors. ErrUnexpected = errors.New("unexpected") // ErrTotalOrderingPhiRatio means invalid phi ratio ErrTotalOrderingPhiRatio = errors.New("invalid total ordering phi ratio") ) // totalOrderingConfig is the configuration for total ordering. type totalOrderingConfig struct { roundBasedConfig // k represents the k in 'k-level total ordering'. // In short, only block height equals to (global minimum height + k) // would be taken into consideration. k uint64 // phi is a const to control how strong the leading preceding block // should be. phi uint64 numChains uint32 isFlushRequired bool } func (config *totalOrderingConfig) fromConfig(round uint64, cfg *types.Config) { config.k = uint64(cfg.K) config.numChains = cfg.NumChains config.phi = uint64(float32(cfg.NumChains-1)*cfg.PhiRatio + 1) config.setupRoundBasedFields(round, cfg) } func newTotalOrderingConfig( prev *totalOrderingConfig, cur *types.Config) *totalOrderingConfig { c := &totalOrderingConfig{} c.fromConfig(prev.roundID+1, cur) c.setRoundBeginTime(prev.roundEndTime) prev.isFlushRequired = c.k != prev.k || c.phi != prev.phi || c.numChains != prev.numChains return c } // totalOrderingWinRecord caches the comparison of candidates calculated by // their height vector. type totalOrderingWinRecord struct { wins []int8 count uint } func (rec *totalOrderingWinRecord) reset() { rec.count = 0 for idx := range rec.wins { rec.wins[idx] = 0 } } func newTotalOrderingWinRecord(numChains uint32) *totalOrderingWinRecord { return &totalOrderingWinRecord{ wins: make([]int8, numChains), count: 0, } } // grade implements the 'grade' potential function in algorithm. 
func (rec *totalOrderingWinRecord) grade( numChains uint32, phi uint64, globalAnsLength uint64) int { if uint64(rec.count) >= phi { return 1 } else if uint64(rec.count) < phi-uint64(numChains)+globalAnsLength { return 0 } else { return -1 } } // totalOrderingHeightRecord records: // - the minimum height of block which acks this block. // - the count of blocks acking this block. type totalOrderingHeightRecord struct{ minHeight, count uint64 } // totalOrderingCache caches objects for reuse and not being colloected by GC. // Each cached target has "get-" and "put-" functions for getting and reusing // of objects. type totalOrderingCache struct { ackedStatus [][]*totalOrderingHeightRecord heightVectors [][]uint64 winRecords [][]*totalOrderingWinRecord winRecordPool sync.Pool ackedVectors []map[common.Hash]struct{} numChains uint32 } // newTotalOrderingObjectCache constructs an totalOrderingCache instance. func newTotalOrderingObjectCache(numChains uint32) *totalOrderingCache { return &totalOrderingCache{ winRecordPool: sync.Pool{ New: func() interface{} { return newTotalOrderingWinRecord(numChains) }, }, numChains: numChains, } } // resize makes sure internal storage of totalOrdering instance can handle // maximum possible numChains in future configs. func (cache *totalOrderingCache) resize(numChains uint32) { // Basically, everything in cache needs to be cleaned. 
if cache.numChains >= numChains { return } cache.ackedStatus = nil cache.heightVectors = nil cache.winRecords = nil cache.ackedVectors = nil cache.numChains = numChains cache.winRecordPool = sync.Pool{ New: func() interface{} { return newTotalOrderingWinRecord(numChains) }, } } func (cache *totalOrderingCache) getAckedStatus() ( acked []*totalOrderingHeightRecord) { if len(cache.ackedStatus) == 0 { acked = make([]*totalOrderingHeightRecord, cache.numChains) for idx := range acked { acked[idx] = &totalOrderingHeightRecord{count: 0} } } else { acked = cache.ackedStatus[len(cache.ackedStatus)-1] cache.ackedStatus = cache.ackedStatus[:len(cache.ackedStatus)-1] // Reset acked status. for idx := range acked { acked[idx].count = 0 } } return } func (cache *totalOrderingCache) putAckedStatus( acked []*totalOrderingHeightRecord) { // If the recycled objects supports lower numChains than we required, // don't recycle it. if uint32(len(acked)) != cache.numChains { return } cache.ackedStatus = append(cache.ackedStatus, acked) } func (cache *totalOrderingCache) getWinRecord() ( win *totalOrderingWinRecord) { win = cache.winRecordPool.Get().(*totalOrderingWinRecord) win.reset() return } func (cache *totalOrderingCache) putWinRecord(win *totalOrderingWinRecord) { if win == nil { return } // If the recycled objects supports lower numChains than we required, // don't recycle it. 
if uint32(len(win.wins)) != cache.numChains { return } cache.winRecordPool.Put(win) } func (cache *totalOrderingCache) getHeightVector() (hv []uint64) { if len(cache.heightVectors) == 0 { hv = make([]uint64, cache.numChains) } else { hv = cache.heightVectors[len(cache.heightVectors)-1] cache.heightVectors = cache.heightVectors[:len(cache.heightVectors)-1] } for idx := range hv { hv[idx] = infinity } return } func (cache *totalOrderingCache) putHeightVector(hv []uint64) { if uint32(len(hv)) != cache.numChains { return } cache.heightVectors = append(cache.heightVectors, hv) } func (cache *totalOrderingCache) getWinRecords() (w []*totalOrderingWinRecord) { if len(cache.winRecords) == 0 { w = make([]*totalOrderingWinRecord, cache.numChains) } else { w = cache.winRecords[len(cache.winRecords)-1] cache.winRecords = cache.winRecords[:len(cache.winRecords)-1] for idx := range w { w[idx] = nil } } return } func (cache *totalOrderingCache) putWinRecords(w []*totalOrderingWinRecord) { if uint32(len(w)) != cache.numChains { return } cache.winRecords = append(cache.winRecords, w) } func (cache *totalOrderingCache) getAckedVector() ( acked map[common.Hash]struct{}) { if len(cache.ackedVectors) == 0 { acked = make(map[common.Hash]struct{}) } else { acked, cache.ackedVectors = cache.ackedVectors[len(cache.ackedVectors)-1], cache.ackedVectors[:len(cache.ackedVectors)-1] for k := range acked { delete(acked, k) } } return } func (cache *totalOrderingCache) putAckedVector( acked map[common.Hash]struct{}) { if acked != nil { cache.ackedVectors = append(cache.ackedVectors, acked) } } // totalOrderingCandidateInfo stores proceding status for a candidate including // - acked status as height records, which keeps the number of blocks from other // chains acking this candidate. // - cached height vector, which valids height based on K-level used for // comparison in 'grade' function. // - cached result of grade function to other candidates. 
// // Height Record: // When block A acks block B, all blocks proposed from the same proposer as // block A with higher height also acks block B. Thus records below is needed // - the minimum height of acking block from that proposer // - count of acking blocks from that proposer // to repsent the acking status for block A. type totalOrderingCandidateInfo struct { ackedStatus []*totalOrderingHeightRecord cachedHeightVector []uint64 winRecords []*totalOrderingWinRecord hash common.Hash } // newTotalOrderingCandidateInfo creates an totalOrderingCandidateInfo instance. func newTotalOrderingCandidateInfo( candidateHash common.Hash, objCache *totalOrderingCache) *totalOrderingCandidateInfo { return &totalOrderingCandidateInfo{ ackedStatus: objCache.getAckedStatus(), winRecords: objCache.getWinRecords(), hash: candidateHash, } } // clean clears information related to another candidate, which should be called // when that candidate is selected in deliver set. func (v *totalOrderingCandidateInfo) clean(otherCandidateChainID uint32) { v.winRecords[otherCandidateChainID] = nil } // recycle recycles objects for later usage, this eases GC's work. func (v *totalOrderingCandidateInfo) recycle(objCache *totalOrderingCache) { if v.winRecords != nil { for _, win := range v.winRecords { objCache.putWinRecord(win) } objCache.putWinRecords(v.winRecords) } if v.cachedHeightVector != nil { objCache.putHeightVector(v.cachedHeightVector) } objCache.putAckedStatus(v.ackedStatus) } // addBlock would update totalOrderingCandidateInfo, it's caller's duty // to make sure the input block acutally acking the target block. func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) error { rec := v.ackedStatus[b.Position.ChainID] if rec.count == 0 { rec.minHeight = b.Position.Height rec.count = 1 } else { if b.Position.Height <= rec.minHeight { return ErrNotValidDAG } rec.count++ } return nil } // getAckingNodeSetLength returns the size of acking node set. 
// Only heights no smaller than "global minimum height + k" are counted. For
// example, global minimum acking height is 1 and k is 1, only block heights
// which are larger or equal to 2 are added into acking node set.
//
// Only the first numChains chains are inspected, and a chain is skipped when
// either the global record or this candidate's record for it is empty.
func (v *totalOrderingCandidateInfo) getAckingNodeSetLength(
	global *totalOrderingCandidateInfo,
	k uint64,
	numChains uint32) (count uint64) {
	var rec *totalOrderingHeightRecord
	for idx, gRec := range global.ackedStatus[:numChains] {
		if gRec.count == 0 {
			continue
		}
		rec = v.ackedStatus[idx]
		if rec.count == 0 {
			continue
		}
		// rec.minHeight+rec.count-1 is the highest height covered by rec.
		if rec.minHeight+rec.count-1 >= gRec.minHeight+k {
			count++
		}
	}
	return
}

// updateAckingHeightVector would cache the acking height vector.
//
// Only block height equals to (global minimum block height + k) would be
// taken into consideration.
//
// When no vector is cached yet, one is taken from objCache and filled for
// every chain; otherwise only the entries listed in dirtyChainIDs are
// recomputed.
func (v *totalOrderingCandidateInfo) updateAckingHeightVector(
	global *totalOrderingCandidateInfo,
	k uint64,
	dirtyChainIDs []int,
	objCache *totalOrderingCache) {
	var (
		idx       int
		gRec, rec *totalOrderingHeightRecord
	)
	// The reason for not merging two loops is that the performance impact of map
	// iteration is large if the size is large. Iteration of dirty chains is
	// faster than the map.
	// TODO(mission): merge the code in this if/else if the performance won't be
	//                downgraded when adding a function for the shared part.
	if v.cachedHeightVector == nil {
		// Generate height vector from scratch.
		v.cachedHeightVector = objCache.getHeightVector()
		for idx, gRec = range global.ackedStatus {
			// Entries skipped here keep the infinity set by getHeightVector.
			if gRec.count <= k {
				continue
			}
			rec = v.ackedStatus[idx]
			if rec.count == 0 {
				v.cachedHeightVector[idx] = infinity
			} else if rec.minHeight <= gRec.minHeight+k {
				// This check is sufficient to make sure the block height:
				//
				//   gRec.minHeight + k
				//
				// would be included in this totalOrderingCandidateInfo.
				v.cachedHeightVector[idx] = gRec.minHeight + k
			} else {
				v.cachedHeightVector[idx] = infinity
			}
		}
	} else {
		// Return the cached one, only update dirty fields.
		for _, idx = range dirtyChainIDs {
			gRec = global.ackedStatus[idx]
			if gRec.count == 0 || gRec.count <= k {
				v.cachedHeightVector[idx] = infinity
				continue
			}
			rec = v.ackedStatus[idx]
			if rec.count == 0 {
				v.cachedHeightVector[idx] = infinity
			} else if rec.minHeight <= gRec.minHeight+k {
				v.cachedHeightVector[idx] = gRec.minHeight + k
			} else {
				v.cachedHeightVector[idx] = infinity
			}
		}
	}
	return
}

// updateWinRecord sets up win records from two candidates.
//
// The record kept for otherChainID marks chain idx as a win when v has a
// height on that chain while other has infinity. A missing record is taken
// from objCache and built over the first numChains chains; an existing one is
// only refreshed for the chains listed in dirtyChainIDs.
func (v *totalOrderingCandidateInfo) updateWinRecord(
	otherChainID uint32,
	other *totalOrderingCandidateInfo,
	dirtyChainIDs []int,
	objCache *totalOrderingCache,
	numChains uint32) {
	var (
		idx    int
		height uint64
	)
	// The reason not to merge two loops is that the iteration of map is
	// expensive when chain count is large, iterating of dirty chains is cheaper.
	// TODO(mission): merge the code in this if/else if adding a function won't
	//                affect the performance.
	win := v.winRecords[otherChainID]
	if win == nil {
		win = objCache.getWinRecord()
		v.winRecords[otherChainID] = win
		for idx, height = range v.cachedHeightVector[:numChains] {
			if height == infinity {
				continue
			}
			if other.cachedHeightVector[idx] == infinity {
				win.wins[idx] = 1
				win.count++
			}
		}
	} else {
		for _, idx = range dirtyChainIDs {
			if v.cachedHeightVector[idx] == infinity {
				// v no longer has a height here: withdraw a recorded win.
				if win.wins[idx] == 1 {
					win.wins[idx] = 0
					win.count--
				}
				continue
			}
			if other.cachedHeightVector[idx] == infinity {
				if win.wins[idx] == 0 {
					win.wins[idx] = 1
					win.count++
				}
			} else {
				if win.wins[idx] == 1 {
					win.wins[idx] = 0
					win.count--
				}
			}
		}
	}
}

// totalOrderingBreakpoint is a record of height discontinuity on a chain
type totalOrderingBreakpoint struct {
	roundID uint64
	// height of last block.
	lastHeight uint64
}

// totalOrderingGlobalVector keeps global status of current pending set.
type totalOrderingGlobalVector struct {
	// blocks stores all blocks grouped by their proposers and sorted by height.
	// TODO(mission): slice used here reallocates frequently.
	blocks [][]*types.Block

	// breakpoints stores rounds for chains that blocks' height on them are
	// not consecutive, for example in chain i
	//   Round     Height
	//     0         0
	//     0         1
	//     1         2
	//     1         3
	//     1         4
	//     2         -  <- a config change of chain number occurred
	//     2         -
	//     3         -
	//     3         -
	//     4         0  <- a breakpoint with roundID 4 is cached here
	//     5         -
	//     5         -
	//     6         0  <- breakpoint again
	// breakpoints[i][0] == &totalOrderingBreakpoint{roundID: 4, lastHeight: 4}
	// breakpoints[i][1] == &totalOrderingBreakpoint{roundID: 6, lastHeight: 0}
	breakpoints [][]*totalOrderingBreakpoint

	// curRound stores the last round ID used for purging breakpoints.
	curRound uint64

	// tips records the last seen block for each chain.
	tips []*types.Block

	// Only ackedStatus in cachedCandidateInfo is used.
	cachedCandidateInfo *totalOrderingCandidateInfo
}

// newTotalOrderingGlobalVector allocates a global vector with room for
// numChains chains.
func newTotalOrderingGlobalVector(numChains uint32) *totalOrderingGlobalVector {
	return &totalOrderingGlobalVector{
		blocks:      make([][]*types.Block, numChains),
		tips:        make([]*types.Block, numChains),
		breakpoints: make([][]*totalOrderingBreakpoint, numChains),
	}
}

// resize grows blocks, breakpoints and tips to numChains entries, keeping the
// existing content. It never shrinks them.
func (global *totalOrderingGlobalVector) resize(numChains uint32) {
	if len(global.blocks) >= int(numChains) {
		return
	}
	// Resize blocks.
	newBlocks := make([][]*types.Block, numChains)
	copy(newBlocks, global.blocks)
	global.blocks = newBlocks
	// Resize breakpoints.
	newBreakPoints := make([][]*totalOrderingBreakpoint, numChains)
	copy(newBreakPoints, global.breakpoints)
	global.breakpoints = newBreakPoints
	// Resize tips.
	newTips := make([]*types.Block, numChains)
	copy(newTips, global.tips)
	global.tips = newTips
}

// switchRound moves curRound to roundID, which must be exactly curRound+1
// (it panics with ErrUnexpected otherwise), and drops the leading breakpoint
// of every chain whose roundID equals the new round.
func (global *totalOrderingGlobalVector) switchRound(roundID uint64) {
	if global.curRound+1 != roundID {
		panic(ErrUnexpected)
	}
	global.curRound = roundID
	for chainID, bs := range global.breakpoints {
		if len(bs) == 0 {
			continue
		}
		if bs[0].roundID == roundID {
			global.breakpoints[chainID] = bs[1:]
		}
	}
}

// prepareHeightRecord fills info.ackedStatus for a new candidate.
//
// The record of the candidate's own chain is replaced by a fresh one starting
// at the candidate's height. For every other chain, the first pending block
// found in acked (and located before that chain's first breakpoint) sets the
// record's minimum height and count. Nothing else is done when acked is nil.
func (global *totalOrderingGlobalVector) prepareHeightRecord(
	candidate *types.Block,
	info *totalOrderingCandidateInfo,
	acked map[common.Hash]struct{}) {
	var (
		chainID     = candidate.Position.ChainID
		breakpoints = global.breakpoints[chainID]
		breakpoint  *totalOrderingBreakpoint
		rec         *totalOrderingHeightRecord
	)
	// Setup height record for own chain.
	rec = &totalOrderingHeightRecord{
		minHeight: candidate.Position.Height,
	}
	if len(breakpoints) == 0 {
		// If no breakpoint, count is the amount of blocks.
		rec.count = uint64(len(global.blocks[chainID]))
	} else {
		// If there are breakpoints, only the first counts.
		rec.count = breakpoints[0].lastHeight - candidate.Position.Height + 1
	}
	info.ackedStatus[chainID] = rec
	if acked == nil {
		return
	}
	for idx, blocks := range global.blocks {
		if idx == int(chainID) {
			continue
		}
		breakpoint = nil
		if len(global.breakpoints[idx]) > 0 {
			breakpoint = global.breakpoints[idx][0]
		}
		for i, b := range blocks {
			// Blocks at or after the first breakpoint are not considered.
			if breakpoint != nil && b.Position.Round >= breakpoint.roundID {
				break
			}
			if _, acked := acked[b.Hash]; !acked {
				continue
			}
			// If this block acks the candidate, all newer blocks from the same chain
			// also 'indirectly' acks the candidate.
			rec = info.ackedStatus[idx]
			rec.minHeight = b.Position.Height
			if breakpoint == nil {
				rec.count = uint64(len(blocks) - i)
			} else {
				rec.count = breakpoint.lastHeight - b.Position.Height + 1
			}
			break
		}
	}
}

// addBlock appends b to its chain and makes it the chain's tip.
//
// isOldest reports whether the chain held no block before b. pending reports
// whether the chain has a breakpoint whose roundID is not greater than b's
// round. ErrNotValidDAG is returned when b does not fit after the current
// tip, and ErrBlockFromPastRound when a chain without tip receives a block
// from a round older than curRound; b is not stored in those cases.
func (global *totalOrderingGlobalVector) addBlock(
	b *types.Block) (isOldest bool, pending bool, err error) {
	// isOldest implies the block is the oldest in global vector
	chainID := b.Position.ChainID
	tip := global.tips[chainID]
	isOldest = len(global.blocks[chainID]) == 0
	if tip != nil {
		// Perform light weight sanity check based on tip.
		if tip.Position.Round > b.Position.Round {
			err = ErrNotValidDAG
			return
		}
		if DiffUint64(tip.Position.Round, b.Position.Round) > 1 {
			// Height must restart from 0 in this case.
			if b.Position.Height != 0 {
				err = ErrNotValidDAG
				return
			}
			// Add breakpoint.
			global.breakpoints[chainID] = append(
				global.breakpoints[chainID],
				&totalOrderingBreakpoint{
					roundID:    b.Position.Round,
					lastHeight: tip.Position.Height,
				})
		} else {
			if b.Position.Height != tip.Position.Height+1 {
				err = ErrNotValidDAG
				return
			}
		}
	} else {
		if b.Position.Round < global.curRound {
			err = ErrBlockFromPastRound
			return
		}
		if b.Position.Round > global.curRound {
			// Add breakpoint.
			bp := &totalOrderingBreakpoint{
				roundID:    b.Position.Round,
				lastHeight: 0,
			}
			global.breakpoints[chainID] = append(global.breakpoints[chainID], bp)
		}
	}
	bps := global.breakpoints[chainID]
	pending = len(bps) > 0 && bps[0].roundID <= b.Position.Round
	global.blocks[chainID] = append(global.blocks[chainID], b)
	global.tips[chainID] = b
	return
}

// updateCandidateInfo updates cached candidate info.
func (global *totalOrderingGlobalVector) updateCandidateInfo( dirtyChainIDs []int, objCache *totalOrderingCache) { var ( idx int blocks []*types.Block block *types.Block info *totalOrderingCandidateInfo rec *totalOrderingHeightRecord breakpoint *totalOrderingBreakpoint ) if global.cachedCandidateInfo == nil { info = newTotalOrderingCandidateInfo(common.Hash{}, objCache) for idx, blocks = range global.blocks { if len(blocks) == 0 { continue } rec = info.ackedStatus[idx] if len(global.breakpoints[idx]) > 0 { breakpoint = global.breakpoints[idx][0] block = blocks[0] if block.Position.Round >= breakpoint.roundID { continue } rec.minHeight = block.Position.Height rec.count = breakpoint.lastHeight - block.Position.Height + 1 } else { rec.minHeight = blocks[0].Position.Height rec.count = uint64(len(blocks)) } } global.cachedCandidateInfo = info } else { info = global.cachedCandidateInfo for _, idx = range dirtyChainIDs { blocks = global.blocks[idx] if len(blocks) == 0 { info.ackedStatus[idx].count = 0 continue } rec = info.ackedStatus[idx] if len(global.breakpoints[idx]) > 0 { breakpoint = global.breakpoints[idx][0] block = blocks[0] if block.Position.Round >= breakpoint.roundID { continue } rec.minHeight = block.Position.Height rec.count = breakpoint.lastHeight - block.Position.Height + 1 } else { rec.minHeight = blocks[0].Position.Height rec.count = uint64(len(blocks)) } } } return } // totalOrdering represent a process unit to handle total ordering for blocks. type totalOrdering struct { // pendings stores blocks awaiting to be ordered. pendings map[common.Hash]*types.Block // The round of config used when performing total ordering. curRound uint64 // duringFlush is a flag to switch the flush mode and normal mode. duringFlush bool // flushReadyChains checks if the last block of that chain arrived. Once // last blocks from all chains in current config are arrived, we can // perform flush. flushReadyChains map[uint32]struct{} // flushed is a map of flushed blocks. 
flushed map[uint32]struct{} // globalVector group all pending blocks by proposers and // sort them by block height. This structure is helpful when: // // - build global height vector // - picking candidates next round globalVector *totalOrderingGlobalVector // candidates caches result of potential function during generating preceding // set. candidates []*totalOrderingCandidateInfo // acked stores the 'block A acked by block B' by acked[A.Hash][B.Hash] acked map[common.Hash]map[common.Hash]struct{} // dirtyChainIDs stores chainIDs that is "dirty", i.e. needed updating all // cached statuses (win record, acking status). dirtyChainIDs []int // objCache caches allocated objects, like map. objCache *totalOrderingCache // candidateChainMapping keeps a mapping from candidate's hash to // their chain IDs. candidateChainMapping map[uint32]common.Hash // candidateChainIDs records chain ID of all candidates. candidateChainIDs []uint32 // configs keeps configuration for each round in continuous way. configs []*totalOrderingConfig } // newTotalOrdering constructs an totalOrdering instance. func newTotalOrdering(dMoment time.Time, cfg *types.Config) *totalOrdering { config := &totalOrderingConfig{} config.fromConfig(0, cfg) config.setRoundBeginTime(dMoment) candidates := make([]*totalOrderingCandidateInfo, config.numChains) to := &totalOrdering{ pendings: make(map[common.Hash]*types.Block), globalVector: newTotalOrderingGlobalVector(config.numChains), dirtyChainIDs: make([]int, 0, config.numChains), acked: make(map[common.Hash]map[common.Hash]struct{}), objCache: newTotalOrderingObjectCache(config.numChains), candidateChainMapping: make(map[uint32]common.Hash), candidates: candidates, candidateChainIDs: make([]uint32, 0, config.numChains), curRound: config.roundID, } to.configs = []*totalOrderingConfig{config} return to } // appendConfig add new configs for upcoming rounds. If you add a config for // round R, next time you can only add the config for round R+1. 
func (to *totalOrdering) appendConfig( round uint64, config *types.Config) error { if round != uint64(len(to.configs))+to.configs[0].roundID { return ErrRoundNotIncreasing } if config.PhiRatio < 0.5 || config.PhiRatio > 1.0 { return ErrTotalOrderingPhiRatio } to.configs = append( to.configs, newTotalOrderingConfig(to.configs[len(to.configs)-1], config)) // Resize internal structures. to.globalVector.resize(config.NumChains) to.objCache.resize(config.NumChains) if int(config.NumChains) > len(to.candidates) { newCandidates := make([]*totalOrderingCandidateInfo, config.NumChains) copy(newCandidates, to.candidates) to.candidates = newCandidates } return nil } func (to *totalOrdering) switchRound() { to.curRound++ to.globalVector.switchRound(to.curRound) } // buildBlockRelation update all its indirect acks recursively. func (to *totalOrdering) buildBlockRelation(b *types.Block) { var ( curBlock, nextBlock *types.Block ack common.Hash acked map[common.Hash]struct{} exists, alreadyPopulated bool toCheck = []*types.Block{b} ) for len(toCheck) != 0 { curBlock, toCheck = toCheck[len(toCheck)-1], toCheck[:len(toCheck)-1] if curBlock.Position.Round > b.Position.Round { // It's illegal for a block to ack some blocks in future round. panic(ErrForwardAck) } for _, ack = range curBlock.Acks { if acked, exists = to.acked[ack]; !exists { acked = to.objCache.getAckedVector() to.acked[ack] = acked } // Check if the block is handled. if _, alreadyPopulated = acked[b.Hash]; alreadyPopulated { continue } acked[b.Hash] = struct{}{} // See if we need to do this recursively. if nextBlock, exists = to.pendings[ack]; exists { toCheck = append(toCheck, nextBlock) } } } } // clean a block from working set. This behaviour would prevent // our memory usage growing infinity. 
func (to *totalOrdering) clean(b *types.Block) { var ( h = b.Hash chainID = b.Position.ChainID ) to.objCache.putAckedVector(to.acked[h]) delete(to.acked, h) delete(to.pendings, h) to.candidates[chainID].recycle(to.objCache) to.candidates[chainID] = nil delete(to.candidateChainMapping, chainID) // Remove this candidate from candidate IDs. to.candidateChainIDs = removeFromSortedUint32Slice(to.candidateChainIDs, chainID) // Clear records of this candidate from other candidates. for _, idx := range to.candidateChainIDs { to.candidates[idx].clean(chainID) } } // updateVectors is a helper function to update all cached vectors. func (to *totalOrdering) updateVectors(b *types.Block) (isOldest bool, err error) { var ( candidateHash common.Hash chainID uint32 acked bool pending bool ) // Update global height vector if isOldest, pending, err = to.globalVector.addBlock(b); err != nil { return } if to.duringFlush { // It makes no sense to calculate potential functions of total ordering // when flushing would be happened. return } if pending { // The chain of this block contains breakpoints, which means their // height are not continuous. This implementation of DEXON total // ordering algorithm assumes the height of blocks in working set should // be continuous. // // To workaround this issue, when block arrived after breakpoints, // their information would not be contributed to current working set. // This mechanism works because we switch rounds by flushing and // reset the whole working set. // This works because forward acking blocks are rejected. return } // Update candidates' acking status. for chainID, candidateHash = range to.candidateChainMapping { if _, acked = to.acked[candidateHash][b.Hash]; !acked { continue } if err = to.candidates[chainID].addBlock(b); err != nil { return } } return } // prepareCandidate builds totalOrderingCandidateInfo for a new candidate. 
func (to *totalOrdering) prepareCandidate(b *types.Block) {
	// b becomes the candidate of its own chain: a fresh candidate info is
	// stored in the chain's slot and the chain is mapped to b's hash.
	var (
		info    = newTotalOrderingCandidateInfo(b.Hash, to.objCache)
		chainID = b.Position.ChainID
	)
	to.candidates[chainID] = info
	to.candidateChainMapping[chainID] = b.Hash
	// Add the chain ID to the list of candidate chains and re-sort the list
	// so it stays in ascending order.
	to.candidateChainIDs = append(to.candidateChainIDs, chainID)
	sort.Slice(to.candidateChainIDs, func(i, j int) bool {
		return to.candidateChainIDs[i] < to.candidateChainIDs[j]
	})
	// Let the global vector fill in the height record of the new candidate
	// from the set of blocks currently known to ack b.
	to.globalVector.prepareHeightRecord(b, info, to.acked[b.Hash])
	return
}

// isCandidate checks if a block only contains acks to delivered blocks.
//
// It returns false as soon as one of b's acks refers to a block that is
// still in to.pendings, and true otherwise (including when b has no acks).
func (to *totalOrdering) isCandidate(b *types.Block) bool {
	for _, ack := range b.Acks {
		if _, exists := to.pendings[ack]; exists {
			return false
		}
	}
	return true
}

// output finishes the delivery of preceding set.
//
// Every block whose hash is in precedings is removed from the working set
// and returned; the returned slice is sorted with types.ByHash so the result
// does not depend on map iteration order. Afterwards the head block of each
// of the first numChains chains in the global vector is promoted to
// candidate when its chain has no candidate yet and isCandidate accepts it.
//
// Calling output with an empty precedings set only performs the candidate
// promotion step.
func (to *totalOrdering) output(
	precedings map[common.Hash]struct{},
	numChains uint32) (ret []*types.Block) {
	for p := range precedings {
		// Remove the first element from corresponding blockVector.
		// NOTE(review): this presumes the delivered block is the head of its
		// chain in the global vector — confirm against globalVector.
		b := to.pendings[p]
		chainID := b.Position.ChainID
		// TODO(mission): frequent reallocation here.
		to.globalVector.blocks[chainID] = to.globalVector.blocks[chainID][1:]
		ret = append(ret, b)
		// Remove block relations.
		to.clean(b)
		// The chain changed, so cached data about it must be refreshed by the
		// next generateDeliverSet call.
		to.dirtyChainIDs = append(to.dirtyChainIDs, int(chainID))
	}
	sort.Sort(types.ByHash(ret))
	// Find new candidates from global vector's tips.
	// The complexity here is O(N^2logN).
	// TODO(mission): only tips which acking some blocks in the delivered set
	//                should be checked. This improvement related to the
	//                latency introduced by K.
	for chainID, blocks := range to.globalVector.blocks[:numChains] {
		if len(blocks) == 0 {
			continue
		}
		if _, picked := to.candidateChainMapping[uint32(chainID)]; picked {
			continue
		}
		if !to.isCandidate(blocks[0]) {
			continue
		}
		// Build totalOrderingCandidateInfo for new candidate.
		to.prepareCandidate(blocks[0])
	}
	return
}

// generateDeliverSet generates preceding set and checks if the preceding set
// is deliverable by potential function.
//
// delivered holds the hashes of the candidates to deliver; it is nil when no
// candidate qualifies as preceding or when the stability checks fail.
// mode is TotalOrderingModeNormal unless the delivery happens before the
// global acking node set covers all chains, in which case it is
// TotalOrderingModeEarly.
//
// As a side effect the cached vectors and win records of all candidates are
// refreshed for the chains listed in to.dirtyChainIDs, and that list is
// emptied.
func (to *totalOrdering) generateDeliverSet() (
	delivered map[common.Hash]struct{}, mode uint32) {
	var (
		chainID, otherChainID uint32
		info, otherInfo       *totalOrderingCandidateInfo
		precedings            = make(map[uint32]struct{})
		// Configuration of the current round.
		cfg = to.configs[to.curRound-to.configs[0].roundID]
	)
	mode = TotalOrderingModeNormal
	// Refresh the global candidate info and every candidate's acking height
	// vector for the dirty chains.
	to.globalVector.updateCandidateInfo(to.dirtyChainIDs, to.objCache)
	globalInfo := to.globalVector.cachedCandidateInfo
	for _, chainID = range to.candidateChainIDs {
		to.candidates[chainID].updateAckingHeightVector(
			globalInfo, cfg.k, to.dirtyChainIDs, to.objCache)
	}
	// Update winning records for each candidate.
	// One goroutine is started per candidate; each one calls updateWinRecord
	// on its own candidate against every other candidate, and the WaitGroup
	// blocks until all of them are done.
	// NOTE(review): to.objCache and to.candidates are shared by these
	// goroutines — presumably safe for this concurrent use; confirm.
	// TODO(mission): It's not reasonable to request one routine for each
	//                candidate, the context switch rate would be high.
	var wg sync.WaitGroup
	wg.Add(len(to.candidateChainIDs))
	// chainID (here) and otherChainID (inside the goroutine) are declared
	// with := and are fresh variables, not the function-level ones above.
	for _, chainID := range to.candidateChainIDs {
		info = to.candidates[chainID]
		go func(can uint32, canInfo *totalOrderingCandidateInfo) {
			defer wg.Done()
			for _, otherChainID := range to.candidateChainIDs {
				if can == otherChainID {
					continue
				}
				canInfo.updateWinRecord(
					otherChainID,
					to.candidates[otherChainID],
					to.dirtyChainIDs,
					to.objCache,
					cfg.numChains)
			}
		}(chainID, info)
	}
	wg.Wait()
	// Reset dirty chains (the backing array is kept for reuse).
	to.dirtyChainIDs = to.dirtyChainIDs[:0]
	// TODO(mission): ANS should be bounded by current numChains.
	globalAnsLength := globalInfo.getAckingNodeSetLength(
		globalInfo, cfg.k, cfg.numChains)
	// A candidate joins the preceding set only if no other candidate has a
	// win record against it whose grade is non-zero.
CheckNextCandidateLoop:
	for _, chainID = range to.candidateChainIDs {
		info = to.candidates[chainID]
		for _, otherChainID = range to.candidateChainIDs {
			if chainID == otherChainID {
				continue
			}
			otherInfo = to.candidates[otherChainID]
			// TODO(mission): grade should be bounded by current numChains.
			if otherInfo.winRecords[chainID].grade(
				cfg.numChains, cfg.phi, globalAnsLength) != 0 {
				continue CheckNextCandidateLoop
			}
		}
		precedings[chainID] = struct{}{}
	}
	if len(precedings) == 0 {
		return
	}
	// internal is a helper function to verify internal stability.
	// It holds when every candidate outside the preceding set is beaten
	// (grade == 1) by at least one member of the preceding set.
	internal := func() bool {
		var (
			isPreceding, beaten bool
			p                   uint32
		)
		for _, chainID = range to.candidateChainIDs {
			if _, isPreceding = precedings[chainID]; isPreceding {
				continue
			}
			beaten = false
			for p = range precedings {
				// TODO(mission): grade should be bound by current numChains.
				if beaten = to.candidates[p].winRecords[chainID].grade(
					cfg.numChains, cfg.phi, globalAnsLength) == 1; beaten {
					break
				}
			}
			if !beaten {
				return false
			}
		}
		return true
	}
	// checkAHV is a helper function to verify external stability.
	// It would make sure some preceding block is strong enough
	// to lead the whole preceding set: it returns true as soon as one
	// preceding candidate has more than cfg.phi entries in its cached height
	// vector that are not infinity.
	checkAHV := func() bool {
		var (
			height, count uint64
			p             uint32
		)
		for p = range precedings {
			count = 0
			info = to.candidates[p]
			for _, height = range info.cachedHeightVector {
				if height != infinity {
					count++
					if count > cfg.phi {
						return true
					}
				}
			}
		}
		return false
	}
	// checkANS is a helper function to verify external stability.
	// It would make sure all preceding blocks are strong enough
	// to be delivered: it returns false if any preceding candidate's acking
	// node set length is below cfg.numChains - cfg.phi.
	checkANS := func() bool {
		var chainAnsLength uint64
		for p := range precedings {
			// TODO(mission): ANS should be bound by current numChains.
			chainAnsLength = to.candidates[p].getAckingNodeSetLength(
				globalInfo, cfg.k, cfg.numChains)
			if uint64(chainAnsLength) < uint64(cfg.numChains)-cfg.phi {
				return false
			}
		}
		return true
	}
	// If all chains propose enough blocks, we should force
	// to deliver since the whole picture of the DAG is revealed.
	if globalAnsLength != uint64(cfg.numChains) {
		// Check internal stability first.
		if !internal() {
			return
		}
		// The whole picture is not ready, we need to check if
		// external stability is met, and we can deliver earlier.
		if checkAHV() && checkANS() {
			mode = TotalOrderingModeEarly
		} else {
			return
		}
	}
	// Translate the preceding chain IDs into the hashes of their candidates.
	delivered = make(map[common.Hash]struct{})
	for p := range precedings {
		delivered[to.candidates[p].hash] = struct{}{}
	}
	return
}

// flushBlocks flushes blocks.
//
// It is the flush-mode counterpart of deliverBlocks and always reports
// TotalOrderingModeFlush. b is the block that was just processed; when it is
// the last block of its chain in the current round, that chain is marked as
// ready to flush.
//
// Nothing is flushed until all cfg.numChains chains are ready. Once they
// are, candidates belonging to the current round or earlier are output
// repeatedly, without consulting the potential functions, until the last
// block of every chain has been flushed. Then the flush state and all cached
// candidate data are reset, the round is switched, and new candidates are
// picked for the new round.
//
// err is ErrFutureRoundDelivered when more chains are marked ready than
// exist, and ErrTotalOrderingHangs when a flush iteration finds no candidate
// to output; in the latter case flushed may already hold blocks output by
// earlier iterations.
func (to *totalOrdering) flushBlocks(
	b *types.Block) (flushed []*types.Block, mode uint32, err error) {
	mode = TotalOrderingModeFlush
	cfg := to.configs[to.curRound-to.configs[0].roundID]
	if cfg.isLastBlock(b) {
		to.flushReadyChains[b.Position.ChainID] = struct{}{}
	}
	// Flush blocks until last blocks from all chains appeared.
	if len(to.flushReadyChains) < int(cfg.numChains) {
		return
	}
	if len(to.flushReadyChains) > int(cfg.numChains) {
		// This case should never occur.
		err = ErrFutureRoundDelivered
		return
	}
	// Dump all blocks in this round.
	for len(to.flushed) != int(cfg.numChains) {
		// Dump all candidates without checking potential function.
		flushedHashes := make(map[common.Hash]struct{})
		for _, chainID := range to.candidateChainIDs {
			candidateBlock := to.pendings[to.candidates[chainID].hash]
			// Candidates from later rounds stay in the working set.
			if candidateBlock.Position.Round > to.curRound {
				continue
			}
			flushedHashes[candidateBlock.Hash] = struct{}{}
		}
		if len(flushedHashes) == 0 {
			// Not every last block was flushed, yet nothing is left to
			// output: no further progress is possible.
			err = ErrTotalOrderingHangs
			return
		}
		flushedBlocks := to.output(flushedHashes, cfg.numChains)
		// Note: this loop variable shadows the parameter b.
		for _, b := range flushedBlocks {
			if cfg.isLastBlock(b) {
				to.flushed[b.Position.ChainID] = struct{}{}
			}
		}
		flushed = append(flushed, flushedBlocks...)
	}
	// Switch back to non-flushing mode.
	to.duringFlush = false
	to.flushed = make(map[uint32]struct{})
	to.flushReadyChains = make(map[uint32]struct{})
	// Clean all cached intermediate stats.
	for idx := range to.candidates {
		if to.candidates[idx] == nil {
			continue
		}
		to.candidates[idx].recycle(to.objCache)
		to.candidates[idx] = nil
	}
	to.dirtyChainIDs = nil
	to.candidateChainMapping = make(map[uint32]common.Hash)
	to.candidateChainIDs = nil
	to.globalVector.cachedCandidateInfo = nil
	to.switchRound()
	// Force picking new candidates.
	// switchRound advanced to.curRound, so the chain count is read from the
	// new round's config; output with an empty set delivers nothing and only
	// promotes chain heads to candidates.
	numChains := to.configs[to.curRound-to.configs[0].roundID].numChains
	to.output(map[common.Hash]struct{}{}, numChains)
	return
}

// deliverBlocks delivers blocks by DEXON total ordering algorithm.
//
// It computes the deliverable set with generateDeliverSet, outputs it, and
// then inspects the delivered blocks: when one of them is the last block of
// the current round, total ordering either enters flush mode (if the round's
// config requires a flush) or switches to the next round directly.
//
// err is ErrFutureRoundDelivered when a delivered block belongs to a round
// later than the current one; delivered is still populated in that case.
func (to *totalOrdering) deliverBlocks() (
	delivered []*types.Block, mode uint32, err error) {
	// mode is the named result declared above (same scope), so := only
	// declares hashes and assigns to the existing mode.
	hashes, mode := to.generateDeliverSet()
	// cfg is read before any round switch below, so the rest of this function
	// keeps using the config of the round that was current on entry.
	cfg := to.configs[to.curRound-to.configs[0].roundID]
	// Output precedings.
	delivered = to.output(hashes, cfg.numChains)
	// Check if any block in delivered set is the last block in this round, if
	// there is, perform flush or round-switch.
	for _, b := range delivered {
		if b.Position.Round > to.curRound {
			err = ErrFutureRoundDelivered
			return
		}
		if !cfg.isLastBlock(b) {
			continue
		}
		// Code reaches here if a last block is processed. This triggers
		// "duringFlush" mode if config changes.
		if cfg.isFlushRequired {
			// Switch to flush mode.
			to.duringFlush = true
			to.flushReadyChains = make(map[uint32]struct{})
			to.flushed = make(map[uint32]struct{})
		} else {
			// Switch round directly.
			to.switchRound()
		}
		// Only the first last-block found triggers the transition.
		break
	}
	if to.duringFlush {
		// Collect last blocks until all last blocks appears and function
		// flushBlocks will be called.
		for _, b := range delivered {
			if cfg.isLastBlock(b) {
				to.flushed[b.Position.ChainID] = struct{}{}
			}
		}
		// Some last blocks for the round to be flushed might not be delivered
		// yet. A chain counts as ready to flush when its tip is already in a
		// later round or is the last block of this round.
		// NOTE(review): every tip in this range is dereferenced — presumably
		// none of them is nil at this point; confirm against globalVector.
		for _, tip := range to.globalVector.tips[:cfg.numChains] {
			if tip.Position.Round > to.curRound || cfg.isLastBlock(tip) {
				to.flushReadyChains[tip.Position.ChainID] = struct{}{}
			}
		}
	}
	return
}

// processBlock is the entry point of totalOrdering.
//
// b is added to the working set, its ack relations and the cached vectors
// are updated, and b is promoted to candidate when eligible. The call then
// returns whatever flushBlocks (while flushing) or deliverBlocks (otherwise)
// produce: the delivered blocks, the delivery mode, and an error.
//
// When updating the vectors fails, the returned mode is 0, which is the
// value of TotalOrderingModeError; b has already been stored in to.pendings
// at that point.
func (to *totalOrdering) processBlock(
	b *types.Block) ([]*types.Block, uint32, error) {
	// NOTE: Block b is assumed to be in topologically sorted, i.e., all its
	// acking blocks are during or after total ordering stage.
	cfg := to.configs[to.curRound-to.configs[0].roundID]
	to.pendings[b.Hash] = b
	to.buildBlockRelation(b)
	isOldest, err := to.updateVectors(b)
	if err != nil {
		return nil, uint32(0), err
	}
	// Mark the proposer of incoming block as dirty.
	// Blocks from chains outside the current round's chain count are neither
	// marked dirty nor considered as candidates here.
	if b.Position.ChainID < cfg.numChains {
		to.dirtyChainIDs = append(to.dirtyChainIDs, int(b.Position.ChainID))
		_, exists := to.candidateChainMapping[b.Position.ChainID]
		if isOldest && !exists && to.isCandidate(b) {
			// isOldest means b is the oldest block in global vector, and isCandidate
			// is still needed here due to round change. For example:
			// o  o  o  <- genesis block for round change, isCandidate returns true
			// |  |        but isOldest is false
			// o  o
			// |  |
			// o  o  o  <- isOldest is true but isCandidate returns false
			// |  | /
			// o  o
			to.prepareCandidate(b)
		}
	}
	if to.duringFlush {
		return to.flushBlocks(b)
	}
	return to.deliverBlocks()
}