diff options
Diffstat (limited to 'core/total-ordering.go')
-rw-r--r-- | core/total-ordering.go | 499 |
1 files changed, 300 insertions, 199 deletions
diff --git a/core/total-ordering.go b/core/total-ordering.go index c7270c5..1edccdf 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -31,22 +31,45 @@ const ( infinity uint64 = math.MaxUint64 ) -// ErrNotValidDAG would be reported when block subbmitted to totalOrdering -// didn't form a DAG. -var ErrNotValidDAG = fmt.Errorf("not a valid dag") +var ( + // ErrNotValidDAG would be reported when block subbmitted to totalOrdering + // didn't form a DAG. + ErrNotValidDAG = fmt.Errorf("not a valid dag") + // ErrValidatorNotRecognized means the validator is unknown to this module. + ErrValidatorNotRecognized = fmt.Errorf("validator not recognized") +) // totalOrderinWinRecord caches which validators this candidate // wins another one based on their height vector. -type totalOrderingWinRecord map[types.ValidatorID]struct{} +type totalOrderingWinRecord struct { + wins []int8 + count uint +} + +func (rec *totalOrderingWinRecord) reset() { + rec.count = 0 + for idx := range rec.wins { + rec.wins[idx] = 0 + } +} + +func newTotalOrderingWinRecord(validatorCount int) ( + rec *totalOrderingWinRecord) { + + rec = &totalOrderingWinRecord{} + rec.reset() + rec.wins = make([]int8, validatorCount) + return +} // grade implements the 'grade' potential function described in white paper. -func (rec totalOrderingWinRecord) grade( +func (rec *totalOrderingWinRecord) grade( validatorCount, phi uint64, globalAnsLength uint64) int { - if uint64(len(rec)) >= phi { + if uint64(rec.count) >= phi { return 1 - } else if uint64(len(rec)) < phi-validatorCount+globalAnsLength { + } else if uint64(rec.count) < phi-validatorCount+globalAnsLength { return 0 } else { return -1 @@ -66,38 +89,44 @@ type totalOrderingHeightRecord struct{ minHeight, count uint64 } // However, to reuse a map, we have no easy way to erase its content but // iterating its keys and delete corresponding values. type totalOrderingObjectCache struct { - ackedStatus []map[types.ValidatorID]*totalOrderingHeightRecord - winRecordContainers []map[common.Hash]totalOrderingWinRecord - heightVectors []map[types.ValidatorID]uint64 + ackedStatus [][]*totalOrderingHeightRecord + heightVectors [][]uint64 + winRecordContainers [][]*totalOrderingWinRecord ackedVectors []map[common.Hash]struct{} winRecordPool sync.Pool + validatorCount int } // newTotalOrderingObjectCache constructs an totalOrderingObjectCache // instance. -func newTotalOrderingObjectCache() *totalOrderingObjectCache { +func newTotalOrderingObjectCache(validatorCount int) *totalOrderingObjectCache { return &totalOrderingObjectCache{ winRecordPool: sync.Pool{ New: func() interface{} { - return make(totalOrderingWinRecord) + return newTotalOrderingWinRecord(validatorCount) }, }, + validatorCount: validatorCount, } } // requestAckedStatus requests a structure to record acking status of one // candidate (or a global view of acking status of pending set). func (cache *totalOrderingObjectCache) requestAckedStatus() ( - acked map[types.ValidatorID]*totalOrderingHeightRecord) { + acked []*totalOrderingHeightRecord) { if len(cache.ackedStatus) == 0 { - acked = make(map[types.ValidatorID]*totalOrderingHeightRecord) + acked = make([]*totalOrderingHeightRecord, cache.validatorCount) + for idx := range acked { + acked[idx] = &totalOrderingHeightRecord{count: 0} + } } else { acked, cache.ackedStatus = cache.ackedStatus[len(cache.ackedStatus)-1], cache.ackedStatus[:len(cache.ackedStatus)-1] - for k := range acked { - delete(acked, k) + // Reset acked status. + for idx := range acked { + acked[idx].count = 0 } } return @@ -105,43 +134,44 @@ func (cache *totalOrderingObjectCache) requestAckedStatus() ( // recycleAckedStatys recycles the structure to record acking status. func (cache *totalOrderingObjectCache) recycleAckedStatus( - acked map[types.ValidatorID]*totalOrderingHeightRecord) { + acked []*totalOrderingHeightRecord) { cache.ackedStatus = append(cache.ackedStatus, acked) } // requestWinRecord requests an totalOrderingWinRecord instance. func (cache *totalOrderingObjectCache) requestWinRecord() ( - win totalOrderingWinRecord) { + win *totalOrderingWinRecord) { - win = cache.winRecordPool.Get().(totalOrderingWinRecord) - for k := range win { - delete(win, k) - } + win = cache.winRecordPool.Get().(*totalOrderingWinRecord) + win.reset() return } // recycleWinRecord recycles an totalOrderingWinRecord instance. func (cache *totalOrderingObjectCache) recycleWinRecord( - win totalOrderingWinRecord) { + win *totalOrderingWinRecord) { + if win == nil { + return + } cache.winRecordPool.Put(win) } // requestHeightVector requests a structure to record acking heights // of one candidate. func (cache *totalOrderingObjectCache) requestHeightVector() ( - hv map[types.ValidatorID]uint64) { + hv []uint64) { if len(cache.heightVectors) == 0 { - hv = make(map[types.ValidatorID]uint64) + hv = make([]uint64, cache.validatorCount) } else { hv, cache.heightVectors = cache.heightVectors[len(cache.heightVectors)-1], cache.heightVectors[:len(cache.heightVectors)-1] - for k := range hv { - delete(hv, k) - } + } + for idx := range hv { + hv[idx] = infinity } return } @@ -149,23 +179,23 @@ func (cache *totalOrderingObjectCache) requestHeightVector() ( // recycleHeightVector recycles an instance to record acking heights // of one candidate. func (cache *totalOrderingObjectCache) recycleHeightVector( - hv map[types.ValidatorID]uint64) { + hv []uint64) { cache.heightVectors = append(cache.heightVectors, hv) } // requestWinRecordContainer requests a map of totalOrderingWinRecord. func (cache *totalOrderingObjectCache) requestWinRecordContainer() ( - con map[common.Hash]totalOrderingWinRecord) { + con []*totalOrderingWinRecord) { if len(cache.winRecordContainers) == 0 { - con = make(map[common.Hash]totalOrderingWinRecord) + con = make([]*totalOrderingWinRecord, cache.validatorCount) } else { con, cache.winRecordContainers = cache.winRecordContainers[len(cache.winRecordContainers)-1], cache.winRecordContainers[:len(cache.winRecordContainers)-1] - for k := range con { - delete(con, k) + for idx := range con { + con[idx] = nil } } return @@ -173,7 +203,7 @@ func (cache *totalOrderingObjectCache) requestWinRecordContainer() ( // recycleWinRecordContainer recycles a map of totalOrderingWinRecord. func (cache *totalOrderingObjectCache) recycleWinRecordContainer( - con map[common.Hash]totalOrderingWinRecord) { + con []*totalOrderingWinRecord) { cache.winRecordContainers = append(cache.winRecordContainers, con) } @@ -221,26 +251,29 @@ func (cache *totalOrderingObjectCache) recycleAckedVector( // - count of acking blocks from that proposer // to repsent the acking status for block A. type totalOrderingCandidateInfo struct { - ackedStatus map[types.ValidatorID]*totalOrderingHeightRecord - cachedHeightVector map[types.ValidatorID]uint64 - winRecords map[common.Hash]totalOrderingWinRecord + ackedStatus []*totalOrderingHeightRecord + cachedHeightVector []uint64 + winRecords []*totalOrderingWinRecord + hash common.Hash } // newTotalOrderingCandidateInfo constructs an totalOrderingCandidateInfo // instance. func newTotalOrderingCandidateInfo( + candidateHash common.Hash, objCache *totalOrderingObjectCache) *totalOrderingCandidateInfo { return &totalOrderingCandidateInfo{ ackedStatus: objCache.requestAckedStatus(), winRecords: objCache.requestWinRecordContainer(), + hash: candidateHash, } } // clean clear information related to another candidate, which should be called // when that candidate is selected as deliver set. -func (v *totalOrderingCandidateInfo) clean(otherCandidate common.Hash) { - delete(v.winRecords, otherCandidate) +func (v *totalOrderingCandidateInfo) clean(otherCandidateIndex int) { + v.winRecords[otherCandidateIndex] = nil } // recycle objects for later usage, this eases the loading of @@ -262,13 +295,13 @@ func (v *totalOrderingCandidateInfo) recycle( // addBlock would update totalOrderingCandidateInfo, it's caller's duty // to make sure the input block acutally acking the target block. -func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) { - rec, exists := v.ackedStatus[b.ProposerID] - if !exists { - v.ackedStatus[b.ProposerID] = &totalOrderingHeightRecord{ - minHeight: b.Height, - count: 1, - } +func (v *totalOrderingCandidateInfo) addBlock( + b *types.Block, proposerIndex int) (err error) { + + rec := v.ackedStatus[proposerIndex] + if rec.count == 0 { + rec.minHeight = b.Height + rec.count = 1 } else { if b.Height < rec.minHeight { err = ErrNotValidDAG @@ -294,17 +327,15 @@ func (v *totalOrderingCandidateInfo) getAckingNodeSetLength( global *totalOrderingCandidateInfo, k uint64) (count uint64) { - var ( - rec *totalOrderingHeightRecord - exists bool - ) - - for vID, gRec := range global.ackedStatus { - rec, exists = v.ackedStatus[vID] - if !exists { + var rec *totalOrderingHeightRecord + for idx, gRec := range global.ackedStatus { + if gRec.count == 0 { + continue + } + rec = v.ackedStatus[idx] + if rec.count == 0 { continue } - // This line would check if these two ranges would overlap: // - (global minimum height + k, infinity) // - (local minimum height, local minimum height + count - 1) @@ -322,13 +353,12 @@ func (v *totalOrderingCandidateInfo) getAckingNodeSetLength( func (v *totalOrderingCandidateInfo) updateAckingHeightVector( global *totalOrderingCandidateInfo, k uint64, - dirtyValidators types.ValidatorIDs, + dirtyValidatorIndexes []int, objCache *totalOrderingObjectCache) { var ( - vID types.ValidatorID + idx int gRec, rec *totalOrderingHeightRecord - exists bool ) // The reason not to merge the two loops is the iteration over map @@ -339,42 +369,39 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector( if v.cachedHeightVector == nil { // Generate height vector from scratch. v.cachedHeightVector = objCache.requestHeightVector() - for vID, gRec = range global.ackedStatus { + for idx, gRec = range global.ackedStatus { if gRec.count <= k { - delete(v.cachedHeightVector, vID) continue } - rec, exists = v.ackedStatus[vID] - if !exists { - v.cachedHeightVector[vID] = infinity + rec = v.ackedStatus[idx] + if rec.count == 0 { + v.cachedHeightVector[idx] = infinity } else if rec.minHeight <= gRec.minHeight+k { // This check is sufficient to make sure the block height: // // gRec.minHeight + k // // would be included in this totalOrderingCandidateInfo. - v.cachedHeightVector[vID] = gRec.minHeight + k + v.cachedHeightVector[idx] = gRec.minHeight + k } else { - v.cachedHeightVector[vID] = infinity + v.cachedHeightVector[idx] = infinity } } } else { // Return the cached one, only update dirty fields. - for _, vID = range dirtyValidators { - gRec, exists = global.ackedStatus[vID] - if !exists { + for _, idx = range dirtyValidatorIndexes { + gRec = global.ackedStatus[idx] + if gRec.count == 0 || gRec.count <= k { + v.cachedHeightVector[idx] = infinity continue } - rec, exists = v.ackedStatus[vID] - if gRec.count <= k { - delete(v.cachedHeightVector, vID) - continue - } else if !exists { - v.cachedHeightVector[vID] = infinity + rec = v.ackedStatus[idx] + if rec.count == 0 { + v.cachedHeightVector[idx] = infinity } else if rec.minHeight <= gRec.minHeight+k { - v.cachedHeightVector[vID] = gRec.minHeight + k + v.cachedHeightVector[idx] = gRec.minHeight + k } else { - v.cachedHeightVector[vID] = infinity + v.cachedHeightVector[idx] = infinity } } } @@ -383,15 +410,14 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector( // updateWinRecord setup win records between two candidates. func (v *totalOrderingCandidateInfo) updateWinRecord( - otherCandidate common.Hash, + otherCandidateIndex int, other *totalOrderingCandidateInfo, - dirtyValidators types.ValidatorIDs, + dirtyValidatorIndexes []int, objCache *totalOrderingObjectCache) { var ( - vID types.ValidatorID - hTo, hFrom uint64 - exists bool + idx int + height uint64 ) // The reason not to merge the two loops is the iteration over map @@ -399,35 +425,38 @@ func (v *totalOrderingCandidateInfo) updateWinRecord( // validators is cheaper. // TODO(mission): merge the code in this if/else if add a function won't // affect the performance. - win, exists := v.winRecords[otherCandidate] - if !exists { + win := v.winRecords[otherCandidateIndex] + if win == nil { win = objCache.requestWinRecord() - v.winRecords[otherCandidate] = win - for vID, hFrom = range v.cachedHeightVector { - hTo, exists = other.cachedHeightVector[vID] - if !exists { + v.winRecords[otherCandidateIndex] = win + for idx, height = range v.cachedHeightVector { + if height == infinity { continue } - if hFrom != infinity && hTo == infinity { - win[vID] = struct{}{} + if other.cachedHeightVector[idx] == infinity { + win.wins[idx] = 1 + win.count++ } } } else { - for _, vID = range dirtyValidators { - hFrom, exists = v.cachedHeightVector[vID] - if !exists { - delete(win, vID) - return - } - hTo, exists = other.cachedHeightVector[vID] - if !exists { - delete(win, vID) - return + for _, idx = range dirtyValidatorIndexes { + if v.cachedHeightVector[idx] == infinity { + if win.wins[idx] == 1 { + win.wins[idx] = 0 + win.count-- + } + continue } - if hFrom != infinity && hTo == infinity { - win[vID] = struct{}{} + if other.cachedHeightVector[idx] == infinity { + if win.wins[idx] == 0 { + win.wins[idx] = 1 + win.count++ + } } else { - delete(win, vID) + if win.wins[idx] == 1 { + win.wins[idx] = 0 + win.count-- + } } } } @@ -438,22 +467,26 @@ type totalOrderingGlobalVector struct { // blocks stores all blocks grouped by their proposers and // sorted by their block height. // - // TODO: the way we use this slice would make it reallocate frequently. - blocks map[types.ValidatorID][]*types.Block + // TODO(mission): the way we use this slice would make it reallocate frequently. + blocks [][]*types.Block // cachedCandidateInfo is an totalOrderingCandidateInfo instance, // which is just used for actual candidates to calculate height vector. cachedCandidateInfo *totalOrderingCandidateInfo } -func newTotalOrderingGlobalVector() *totalOrderingGlobalVector { +func newTotalOrderingGlobalVector( + validatorCount int) *totalOrderingGlobalVector { + return &totalOrderingGlobalVector{ - blocks: make(map[types.ValidatorID][]*types.Block), + blocks: make([][]*types.Block, validatorCount), } } -func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) { - blocksFromProposer := global.blocks[b.ProposerID] +func (global *totalOrderingGlobalVector) addBlock( + b *types.Block, proposerIndex int) (err error) { + + blocksFromProposer := global.blocks[proposerIndex] if len(blocksFromProposer) > 0 { lastBlock := blocksFromProposer[len(blocksFromProposer)-1] if b.Height-lastBlock.Height != 1 { @@ -461,44 +494,43 @@ func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) { return } } - global.blocks[b.ProposerID] = append(blocksFromProposer, b) + global.blocks[proposerIndex] = append(blocksFromProposer, b) return } // updateCandidateInfo udpate cached candidate info. func (global *totalOrderingGlobalVector) updateCandidateInfo( - dirtyValidators types.ValidatorIDs, objCache *totalOrderingObjectCache) { + dirtyValidatorIndexes []int, objCache *totalOrderingObjectCache) { var ( - vID types.ValidatorID + idx int blocks []*types.Block info *totalOrderingCandidateInfo + rec *totalOrderingHeightRecord ) if global.cachedCandidateInfo == nil { - info = newTotalOrderingCandidateInfo(objCache) - for vID, blocks = range global.blocks { + info = newTotalOrderingCandidateInfo(common.Hash{}, objCache) + for idx, blocks = range global.blocks { if len(blocks) == 0 { continue } - info.ackedStatus[vID] = &totalOrderingHeightRecord{ - minHeight: blocks[0].Height, - count: uint64(len(blocks)), - } + rec = info.ackedStatus[idx] + rec.minHeight = blocks[0].Height + rec.count = uint64(len(blocks)) } global.cachedCandidateInfo = info } else { info = global.cachedCandidateInfo - for _, vID = range dirtyValidators { - blocks = global.blocks[vID] + for _, idx = range dirtyValidatorIndexes { + blocks = global.blocks[idx] if len(blocks) == 0 { - delete(info.ackedStatus, vID) + info.ackedStatus[idx].count = 0 continue } - info.ackedStatus[vID] = &totalOrderingHeightRecord{ - minHeight: blocks[0].Height, - count: uint64(len(blocks)), - } + rec = info.ackedStatus[idx] + rec.minHeight = blocks[0].Height + rec.count = uint64(len(blocks)) } } return @@ -531,31 +563,55 @@ type totalOrdering struct { // candidates caches result of potential function during generating // preceding sets. - candidates map[common.Hash]*totalOrderingCandidateInfo + candidates []*totalOrderingCandidateInfo // acked cache the 'block A acked by block B' relation by // keeping a record in acked[A.Hash][B.Hash] acked map[common.Hash]map[common.Hash]struct{} - // dirtyValidators records which validatorID that should be updated for - // all cached status (win record, acking status). - dirtyValidators types.ValidatorIDs + // dirtyValidatorIndexes records which validatorID that should be updated + // for all cached status (win record, acking status). + dirtyValidatorIndexes []int // objCache caches allocated objects, like map. objCache *totalOrderingObjectCache + + // validatorIndexMapping maps validatorID to an unique integer, which + // could be used as slice index. + validatorIndexMapping map[types.ValidatorID]int + + // candidateIndexMapping maps block hashes of candidates to an unique + // integer, which could be used as slice index. + candidateIndexMapping map[common.Hash]int + + // allocatedCandidateSlotIndexes records all used slot indexes in + // candidates slice. + allocatedCandidateSlotIndexes []int } -func newTotalOrdering(k, phi, validatorCount uint64) *totalOrdering { +func newTotalOrdering( + k, phi uint64, validators types.ValidatorIDs) *totalOrdering { + + // Setup validatorID to index mapping. + validatorIndexMapping := make(map[types.ValidatorID]int) + for _, vID := range validators { + validatorIndexMapping[vID] = len(validatorIndexMapping) + } + validatorCount := len(validators) return &totalOrdering{ - candidates: make(map[common.Hash]*totalOrderingCandidateInfo), - pendings: make(map[common.Hash]*types.Block), - k: k, - phi: phi, - validatorCount: validatorCount, - globalVector: newTotalOrderingGlobalVector(), - dirtyValidators: make(types.ValidatorIDs, 0, int(validatorCount)), - acked: make(map[common.Hash]map[common.Hash]struct{}), - objCache: newTotalOrderingObjectCache(), + pendings: make(map[common.Hash]*types.Block), + k: k, + phi: phi, + validatorCount: uint64(validatorCount), + globalVector: newTotalOrderingGlobalVector(validatorCount), + dirtyValidatorIndexes: make([]int, 0, validatorCount), + acked: make(map[common.Hash]map[common.Hash]struct{}), + objCache: newTotalOrderingObjectCache(validatorCount), + validatorIndexMapping: validatorIndexMapping, + candidateIndexMapping: make(map[common.Hash]int), + candidates: make( + []*totalOrderingCandidateInfo, validatorCount), + allocatedCandidateSlotIndexes: make([]int, 0, validatorCount), } } @@ -601,32 +657,41 @@ func (to *totalOrdering) clean(h common.Hash) { to.objCache.recycleAckedVector(to.acked[h]) delete(to.acked, h) delete(to.pendings, h) - to.candidates[h].recycle(to.objCache) - delete(to.candidates, h) - for _, info := range to.candidates { - info.clean(h) + slotIndex := to.candidateIndexMapping[h] + to.candidates[slotIndex].recycle(to.objCache) + to.candidates[slotIndex] = nil + delete(to.candidateIndexMapping, h) + // Remove this candidate from allocated slot indexes. + to.allocatedCandidateSlotIndexes = removeFromSortedIntSlice( + to.allocatedCandidateSlotIndexes, slotIndex) + // Clear records of this candidate from other candidates. + for _, idx := range to.allocatedCandidateSlotIndexes { + to.candidates[idx].clean(slotIndex) } } // updateVectors is a helper function to update all cached vectors. -func (to *totalOrdering) updateVectors(b *types.Block) (err error) { +func (to *totalOrdering) updateVectors( + b *types.Block, proposerIndex int) (err error) { var ( - candidate common.Hash - info *totalOrderingCandidateInfo - acked bool + candidateHash common.Hash + candidateIndex int + acked bool ) // Update global height vector - err = to.globalVector.addBlock(b) + err = to.globalVector.addBlock(b, proposerIndex) if err != nil { return } // Update acking status of candidates. - for candidate, info = range to.candidates { - if _, acked = to.acked[candidate][b.Hash]; !acked { + for candidateHash, candidateIndex = range to.candidateIndexMapping { + if _, acked = to.acked[candidateHash][b.Hash]; !acked { continue } - if err = info.addBlock(b); err != nil { + if err = to.candidates[candidateIndex].addBlock( + b, proposerIndex); err != nil { + return } } @@ -636,20 +701,32 @@ func (to *totalOrdering) updateVectors(b *types.Block) (err error) { // prepareCandidate is a helper function to // build totalOrderingCandidateInfo for new candidate. func (to *totalOrdering) prepareCandidate( - candidate *types.Block) (info *totalOrderingCandidateInfo) { + candidate *types.Block, proposerIndex int) { + + var ( + info = newTotalOrderingCandidateInfo( + candidate.Hash, to.objCache) + ) + + to.candidates[proposerIndex] = info + to.candidateIndexMapping[candidate.Hash] = proposerIndex + // Add index to slot to allocated list, make sure the modified list sorted. + to.allocatedCandidateSlotIndexes = append( + to.allocatedCandidateSlotIndexes, proposerIndex) + sort.Ints(to.allocatedCandidateSlotIndexes) - info = newTotalOrderingCandidateInfo(to.objCache) - info.ackedStatus[candidate.ProposerID] = &totalOrderingHeightRecord{ + info.ackedStatus[proposerIndex] = &totalOrderingHeightRecord{ minHeight: candidate.Height, - count: uint64(len(to.globalVector.blocks[candidate.ProposerID])), + count: uint64(len(to.globalVector.blocks[proposerIndex])), } ackedsForCandidate, exists := to.acked[candidate.Hash] if !exists { // This candidate is acked by nobody. return } - for vID, blocks := range to.globalVector.blocks { - if vID == candidate.ProposerID { + var rec *totalOrderingHeightRecord + for idx, blocks := range to.globalVector.blocks { + if idx == proposerIndex { continue } for i, b := range blocks { @@ -658,10 +735,9 @@ func (to *totalOrdering) prepareCandidate( } // If this block acks this candidate, all newer blocks // from the same validator also 'indirect' acks it. - info.ackedStatus[vID] = &totalOrderingHeightRecord{ - minHeight: b.Height, - count: uint64(len(blocks) - i), - } + rec = info.ackedStatus[idx] + rec.minHeight = b.Height + rec.count = uint64(len(blocks) - i) break } } @@ -685,32 +761,40 @@ func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*typ for p := range precedings { // Remove the first element from corresponding blockVector. b := to.pendings[p] - to.globalVector.blocks[b.ProposerID] = - to.globalVector.blocks[b.ProposerID][1:] + // TODO(mission): This way to use slice makes it reallocate frequently. + blockProposerIndex := to.validatorIndexMapping[b.ProposerID] + to.globalVector.blocks[blockProposerIndex] = + to.globalVector.blocks[blockProposerIndex][1:] ret = append(ret, b) // Remove block relations. to.clean(p) - to.dirtyValidators = append(to.dirtyValidators, b.ProposerID) + to.dirtyValidatorIndexes = append( + to.dirtyValidatorIndexes, blockProposerIndex) } sort.Sort(types.ByHash(ret)) // Find new candidates from tip of globalVector of each validator. // The complexity here is O(N^2logN). + // TODO(mission): only those tips that acking some blocks in + // the devliered set should be checked. This + // improvment related to the latency introduced by K. for _, blocks := range to.globalVector.blocks { if len(blocks) == 0 { continue } tip := blocks[0] if _, alreadyCandidate := - to.candidates[tip.Hash]; alreadyCandidate { + to.candidateIndexMapping[tip.Hash]; alreadyCandidate { continue } if !to.isAckOnlyPrecedings(tip) { continue } // Build totalOrderingCandidateInfo for new candidate. - to.candidates[tip.Hash] = to.prepareCandidate(tip) + to.prepareCandidate( + tip, + to.validatorIndexMapping[tip.ProposerID]) } return ret } @@ -722,52 +806,61 @@ func (to *totalOrdering) generateDeliverSet() ( delivered map[common.Hash]struct{}, early bool) { var ( - candidate, otherCandidate common.Hash - info, otherInfo *totalOrderingCandidateInfo - precedings = make(map[common.Hash]struct{}) + candidateIndex, otherCandidateIndex int + info, otherInfo *totalOrderingCandidateInfo + precedings = make(map[int]struct{}) ) - to.globalVector.updateCandidateInfo(to.dirtyValidators, to.objCache) + to.globalVector.updateCandidateInfo(to.dirtyValidatorIndexes, to.objCache) globalInfo := to.globalVector.cachedCandidateInfo - for _, info = range to.candidates { - info.updateAckingHeightVector( - globalInfo, to.k, to.dirtyValidators, to.objCache) + for _, candidateIndex = range to.allocatedCandidateSlotIndexes { + to.candidates[candidateIndex].updateAckingHeightVector( + globalInfo, to.k, to.dirtyValidatorIndexes, to.objCache) } // Update winning records for each candidate. + // TODO(mission): It's not reasonable to + // request one routine for each candidate, the context + // switch rate would be high. var wg sync.WaitGroup - wg.Add(len(to.candidates)) - for candidate, info := range to.candidates { - go func(can common.Hash, canInfo *totalOrderingCandidateInfo) { - for otherCandidate, otherInfo := range to.candidates { - if can == otherCandidate { + wg.Add(len(to.allocatedCandidateSlotIndexes)) + for _, candidateIndex := range to.allocatedCandidateSlotIndexes { + info = to.candidates[candidateIndex] + go func(can int, canInfo *totalOrderingCandidateInfo) { + for _, otherCandidateIndex := range to.allocatedCandidateSlotIndexes { + if can == otherCandidateIndex { continue } canInfo.updateWinRecord( - otherCandidate, otherInfo, to.dirtyValidators, to.objCache) + otherCandidateIndex, + to.candidates[otherCandidateIndex], + to.dirtyValidatorIndexes, + to.objCache) } wg.Done() - }(candidate, info) + }(candidateIndex, info) } wg.Wait() // Reset dirty validators. - to.dirtyValidators = to.dirtyValidators[:0] + to.dirtyValidatorIndexes = to.dirtyValidatorIndexes[:0] globalAnsLength := globalInfo.getAckingNodeSetLength(globalInfo, to.k) CheckNextCandidateLoop: - for candidate = range to.candidates { - for otherCandidate, otherInfo = range to.candidates { - if candidate == otherCandidate { + for _, candidateIndex = range to.allocatedCandidateSlotIndexes { + info = to.candidates[candidateIndex] + for _, otherCandidateIndex = range to.allocatedCandidateSlotIndexes { + if candidateIndex == otherCandidateIndex { continue } - if otherInfo.winRecords[candidate].grade( + otherInfo = to.candidates[otherCandidateIndex] + if otherInfo.winRecords[candidateIndex].grade( to.validatorCount, to.phi, globalAnsLength) != 0 { continue CheckNextCandidateLoop } } - precedings[candidate] = struct{}{} + precedings[candidateIndex] = struct{}{} } if len(precedings) == 0 { return @@ -777,15 +870,15 @@ CheckNextCandidateLoop: internal := func() bool { var ( isPreceding, beaten bool - p common.Hash + p int ) - for candidate = range to.candidates { - if _, isPreceding = precedings[candidate]; isPreceding { + for _, candidateIndex = range to.allocatedCandidateSlotIndexes { + if _, isPreceding = precedings[candidateIndex]; isPreceding { continue } beaten = false for p = range precedings { - if beaten = to.candidates[p].winRecords[candidate].grade( + if beaten = to.candidates[p].winRecords[candidateIndex].grade( to.validatorCount, to.phi, globalAnsLength) == 1; beaten { break } @@ -802,15 +895,13 @@ CheckNextCandidateLoop: // to lead the whole preceding set. checkAHV := func() bool { var ( - height uint64 - p common.Hash - count uint64 - status *totalOrderingCandidateInfo + height, count uint64 + p int ) for p = range precedings { count = 0 - status = to.candidates[p] - for _, height = range status.cachedHeightVector { + info = to.candidates[p] + for _, height = range info.cachedHeightVector { if height != infinity { count++ if count > to.phi { @@ -853,7 +944,10 @@ CheckNextCandidateLoop: return } } - delivered = precedings + delivered = make(map[common.Hash]struct{}) + for p := range precedings { + delivered[to.candidates[p].hash] = struct{}{} + } return } @@ -864,16 +958,23 @@ func (to *totalOrdering) processBlock(b *types.Block) ( // That means, all its acking blocks are during/after // total ordering stage. + blockProposerIndex, exists := to.validatorIndexMapping[b.ProposerID] + if !exists { + err = ErrValidatorNotRecognized + return + } + to.pendings[b.Hash] = b to.buildBlockRelation(b) - if err = to.updateVectors(b); err != nil { + if err = to.updateVectors(b, blockProposerIndex); err != nil { return } if to.isAckOnlyPrecedings(b) { - to.candidates[b.Hash] = to.prepareCandidate(b) + to.prepareCandidate(b, blockProposerIndex) } // Mark the proposer of incoming block as dirty. - to.dirtyValidators = append(to.dirtyValidators, b.ProposerID) + to.dirtyValidatorIndexes = append( + to.dirtyValidatorIndexes, blockProposerIndex) hashes, early := to.generateDeliverSet() // output precedings |