aboutsummaryrefslogtreecommitdiffstats
path: root/core/total-ordering.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-08-28 13:13:21 +0800
committerGitHub <noreply@github.com>2018-08-28 13:13:21 +0800
commit7e9d2db5576d697b578669c935b2e7bbf9422ec7 (patch)
treee4fb9f4b95b23934a142a88ee05fbd49dff50b3c /core/total-ordering.go
parent9c8f9a447bfd768a7b29db904bd604410ec66a09 (diff)
downloaddexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.tar
dexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.tar.gz
dexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.tar.bz2
dexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.tar.lz
dexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.tar.xz
dexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.tar.zst
dexon-consensus-7e9d2db5576d697b578669c935b2e7bbf9422ec7.zip
core: tune performance (#73)
- Avoid using recursive function in critical path. - Do not write through when using levelDB. Things put to levelDB would be safe from panic even we didn't force to write through every time. - Dump count of confirmed blocks proposed by self. - Avoid allocating variables in loop. - Return length of acking node set, we only need that when total ordering. - Fix potential bug: make sure win records updated when acking height vectors of candidates are changed. - Keep dirty validators in slice. - Add cache for objects to ease the pressure to garbage collector. - Cache global acking status when total ordering. - Add method to recycle blocks. - Marshal JSON should be called once for each broadcast. - Make updateWinRecord called in parallel. - Log average / deviation of latencies when simulation finished.
Diffstat (limited to 'core/total-ordering.go')
-rw-r--r--core/total-ordering.go490
1 files changed, 375 insertions, 115 deletions
diff --git a/core/total-ordering.go b/core/total-ordering.go
index a9ec5b7..c7270c5 100644
--- a/core/total-ordering.go
+++ b/core/total-ordering.go
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sort"
+ "sync"
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
@@ -41,11 +42,11 @@ type totalOrderingWinRecord map[types.ValidatorID]struct{}
// grade implements the 'grade' potential function described in white paper.
func (rec totalOrderingWinRecord) grade(
validatorCount, phi uint64,
- globalAns map[types.ValidatorID]struct{}) int {
+ globalAnsLength uint64) int {
if uint64(len(rec)) >= phi {
return 1
- } else if uint64(len(rec)) < phi-validatorCount+uint64(len(globalAns)) {
+ } else if uint64(len(rec)) < phi-validatorCount+globalAnsLength {
return 0
} else {
return -1
@@ -57,6 +58,153 @@ func (rec totalOrderingWinRecord) grade(
// - the count of blocks from that validator acking this block.
type totalOrderingHeightRecord struct{ minHeight, count uint64 }
+// totalOrderingObjectCache caches objects for reuse.
+// The target object is map because:
+// - reuse map would prevent it grows during usage, when map grows,
+// hashes of key would be recaculated, bucket reallocated, and values
+// are copied.
+// However, to reuse a map, we have no easy way to erase its content but
+// iterating its keys and delete corresponding values.
+type totalOrderingObjectCache struct {
+ ackedStatus []map[types.ValidatorID]*totalOrderingHeightRecord
+ winRecordContainers []map[common.Hash]totalOrderingWinRecord
+ heightVectors []map[types.ValidatorID]uint64
+ ackedVectors []map[common.Hash]struct{}
+ winRecordPool sync.Pool
+}
+
+// newTotalOrderingObjectCache constructs an totalOrderingObjectCache
+// instance.
+func newTotalOrderingObjectCache() *totalOrderingObjectCache {
+ return &totalOrderingObjectCache{
+ winRecordPool: sync.Pool{
+ New: func() interface{} {
+ return make(totalOrderingWinRecord)
+ },
+ },
+ }
+}
+
+// requestAckedStatus requests a structure to record acking status of one
+// candidate (or a global view of acking status of pending set).
+func (cache *totalOrderingObjectCache) requestAckedStatus() (
+ acked map[types.ValidatorID]*totalOrderingHeightRecord) {
+
+ if len(cache.ackedStatus) == 0 {
+ acked = make(map[types.ValidatorID]*totalOrderingHeightRecord)
+ } else {
+ acked, cache.ackedStatus =
+ cache.ackedStatus[len(cache.ackedStatus)-1],
+ cache.ackedStatus[:len(cache.ackedStatus)-1]
+ for k := range acked {
+ delete(acked, k)
+ }
+ }
+ return
+}
+
+// recycleAckedStatys recycles the structure to record acking status.
+func (cache *totalOrderingObjectCache) recycleAckedStatus(
+ acked map[types.ValidatorID]*totalOrderingHeightRecord) {
+
+ cache.ackedStatus = append(cache.ackedStatus, acked)
+}
+
+// requestWinRecord requests an totalOrderingWinRecord instance.
+func (cache *totalOrderingObjectCache) requestWinRecord() (
+ win totalOrderingWinRecord) {
+
+ win = cache.winRecordPool.Get().(totalOrderingWinRecord)
+ for k := range win {
+ delete(win, k)
+ }
+ return
+}
+
+// recycleWinRecord recycles an totalOrderingWinRecord instance.
+func (cache *totalOrderingObjectCache) recycleWinRecord(
+ win totalOrderingWinRecord) {
+
+ cache.winRecordPool.Put(win)
+}
+
+// requestHeightVector requests a structure to record acking heights
+// of one candidate.
+func (cache *totalOrderingObjectCache) requestHeightVector() (
+ hv map[types.ValidatorID]uint64) {
+
+ if len(cache.heightVectors) == 0 {
+ hv = make(map[types.ValidatorID]uint64)
+ } else {
+ hv, cache.heightVectors =
+ cache.heightVectors[len(cache.heightVectors)-1],
+ cache.heightVectors[:len(cache.heightVectors)-1]
+ for k := range hv {
+ delete(hv, k)
+ }
+ }
+ return
+}
+
+// recycleHeightVector recycles an instance to record acking heights
+// of one candidate.
+func (cache *totalOrderingObjectCache) recycleHeightVector(
+ hv map[types.ValidatorID]uint64) {
+
+ cache.heightVectors = append(cache.heightVectors, hv)
+}
+
+// requestWinRecordContainer requests a map of totalOrderingWinRecord.
+func (cache *totalOrderingObjectCache) requestWinRecordContainer() (
+ con map[common.Hash]totalOrderingWinRecord) {
+
+ if len(cache.winRecordContainers) == 0 {
+ con = make(map[common.Hash]totalOrderingWinRecord)
+ } else {
+ con, cache.winRecordContainers =
+ cache.winRecordContainers[len(cache.winRecordContainers)-1],
+ cache.winRecordContainers[:len(cache.winRecordContainers)-1]
+ for k := range con {
+ delete(con, k)
+ }
+ }
+ return
+}
+
+// recycleWinRecordContainer recycles a map of totalOrderingWinRecord.
+func (cache *totalOrderingObjectCache) recycleWinRecordContainer(
+ con map[common.Hash]totalOrderingWinRecord) {
+
+ cache.winRecordContainers = append(cache.winRecordContainers, con)
+}
+
+// requestAckedVector requests an acked vector instance.
+func (cache *totalOrderingObjectCache) requestAckedVector() (
+ acked map[common.Hash]struct{}) {
+
+ if len(cache.ackedVectors) == 0 {
+ acked = make(map[common.Hash]struct{})
+ } else {
+ acked, cache.ackedVectors =
+ cache.ackedVectors[len(cache.ackedVectors)-1],
+ cache.ackedVectors[:len(cache.ackedVectors)-1]
+ for k := range acked {
+ delete(acked, k)
+ }
+ }
+ return
+}
+
+// recycleAckedVector recycles an acked vector instance.
+func (cache *totalOrderingObjectCache) recycleAckedVector(
+ acked map[common.Hash]struct{}) {
+
+ if acked == nil {
+ return
+ }
+ cache.ackedVectors = append(cache.ackedVectors, acked)
+}
+
// totalOrderingCandidateInfo describes proceeding status for one candidate,
// including:
// - acked status as height records, which could keep 'how many blocks from
@@ -80,17 +228,38 @@ type totalOrderingCandidateInfo struct {
// newTotalOrderingCandidateInfo constructs an totalOrderingCandidateInfo
// instance.
-func newTotalOrderingCandidateInfo() *totalOrderingCandidateInfo {
+func newTotalOrderingCandidateInfo(
+ objCache *totalOrderingObjectCache) *totalOrderingCandidateInfo {
+
return &totalOrderingCandidateInfo{
- ackedStatus: make(map[types.ValidatorID]*totalOrderingHeightRecord),
- winRecords: make(map[common.Hash]totalOrderingWinRecord),
+ ackedStatus: objCache.requestAckedStatus(),
+ winRecords: objCache.requestWinRecordContainer(),
}
}
+// clean clear information related to another candidate, which should be called
+// when that candidate is selected as deliver set.
func (v *totalOrderingCandidateInfo) clean(otherCandidate common.Hash) {
delete(v.winRecords, otherCandidate)
}
+// recycle objects for later usage, this eases the loading of
+// golangs' GC.
+func (v *totalOrderingCandidateInfo) recycle(
+ objCache *totalOrderingObjectCache) {
+
+ if v.winRecords != nil {
+ for _, win := range v.winRecords {
+ objCache.recycleWinRecord(win)
+ }
+ objCache.recycleWinRecordContainer(v.winRecords)
+ }
+ if v.cachedHeightVector != nil {
+ objCache.recycleHeightVector(v.cachedHeightVector)
+ }
+ objCache.recycleAckedStatus(v.ackedStatus)
+}
+
// addBlock would update totalOrderingCandidateInfo, it's caller's duty
// to make sure the input block acutally acking the target block.
func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) {
@@ -110,8 +279,8 @@ func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) {
return
}
-// getAckingNodeSet would generate the Acking Node Set.
-// Only block height larger than
+// getAckingNodeSetLength would generate the Acking Node Set and return its
+// length. Only block height larger than
//
// global minimum height + k
//
@@ -121,13 +290,17 @@ func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) (err error) {
// - the global minimum acking height = 1,
// - k = 1
// then only block height >= 2 would be added to acking node set.
-func (v *totalOrderingCandidateInfo) getAckingNodeSet(
+func (v *totalOrderingCandidateInfo) getAckingNodeSetLength(
global *totalOrderingCandidateInfo,
- k uint64) map[types.ValidatorID]struct{} {
+ k uint64) (count uint64) {
+
+ var (
+ rec *totalOrderingHeightRecord
+ exists bool
+ )
- ret := make(map[types.ValidatorID]struct{})
for vID, gRec := range global.ackedStatus {
- rec, exists := v.ackedStatus[vID]
+ rec, exists = v.ackedStatus[vID]
if !exists {
continue
}
@@ -136,10 +309,10 @@ func (v *totalOrderingCandidateInfo) getAckingNodeSet(
// - (global minimum height + k, infinity)
// - (local minimum height, local minimum height + count - 1)
if rec.minHeight+rec.count-1 >= gRec.minHeight+k {
- ret[vID] = struct{}{}
+ count++
}
}
- return ret
+ return
}
// updateAckingHeightVector would cached acking height vector.
@@ -149,7 +322,14 @@ func (v *totalOrderingCandidateInfo) getAckingNodeSet(
func (v *totalOrderingCandidateInfo) updateAckingHeightVector(
global *totalOrderingCandidateInfo,
k uint64,
- dirtyValidators map[types.ValidatorID]struct{}) {
+ dirtyValidators types.ValidatorIDs,
+ objCache *totalOrderingObjectCache) {
+
+ var (
+ vID types.ValidatorID
+ gRec, rec *totalOrderingHeightRecord
+ exists bool
+ )
// The reason not to merge the two loops is the iteration over map
// is expensive when validator count is large, iterating over dirty
@@ -158,13 +338,14 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector(
// downgraded when adding a function for the shared part.
if v.cachedHeightVector == nil {
// Generate height vector from scratch.
- v.cachedHeightVector = make(map[types.ValidatorID]uint64)
- for vID, gRec := range global.ackedStatus {
- rec, exists := v.ackedStatus[vID]
+ v.cachedHeightVector = objCache.requestHeightVector()
+ for vID, gRec = range global.ackedStatus {
if gRec.count <= k {
delete(v.cachedHeightVector, vID)
continue
- } else if !exists {
+ }
+ rec, exists = v.ackedStatus[vID]
+ if !exists {
v.cachedHeightVector[vID] = infinity
} else if rec.minHeight <= gRec.minHeight+k {
// This check is sufficient to make sure the block height:
@@ -179,12 +360,12 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector(
}
} else {
// Return the cached one, only update dirty fields.
- for vID := range dirtyValidators {
- gRec, exists := global.ackedStatus[vID]
+ for _, vID = range dirtyValidators {
+ gRec, exists = global.ackedStatus[vID]
if !exists {
continue
}
- rec, exists := v.ackedStatus[vID]
+ rec, exists = v.ackedStatus[vID]
if gRec.count <= k {
delete(v.cachedHeightVector, vID)
continue
@@ -204,7 +385,14 @@ func (v *totalOrderingCandidateInfo) updateAckingHeightVector(
func (v *totalOrderingCandidateInfo) updateWinRecord(
otherCandidate common.Hash,
other *totalOrderingCandidateInfo,
- dirtyValidators map[types.ValidatorID]struct{}) {
+ dirtyValidators types.ValidatorIDs,
+ objCache *totalOrderingObjectCache) {
+
+ var (
+ vID types.ValidatorID
+ hTo, hFrom uint64
+ exists bool
+ )
// The reason not to merge the two loops is the iteration over map
// is expensive when validator count is large, iterating over dirty
@@ -213,10 +401,10 @@ func (v *totalOrderingCandidateInfo) updateWinRecord(
// affect the performance.
win, exists := v.winRecords[otherCandidate]
if !exists {
- win = make(map[types.ValidatorID]struct{})
+ win = objCache.requestWinRecord()
v.winRecords[otherCandidate] = win
- for vID, hFrom := range v.cachedHeightVector {
- hTo, exists := other.cachedHeightVector[vID]
+ for vID, hFrom = range v.cachedHeightVector {
+ hTo, exists = other.cachedHeightVector[vID]
if !exists {
continue
}
@@ -225,13 +413,15 @@ func (v *totalOrderingCandidateInfo) updateWinRecord(
}
}
} else {
- for vID := range dirtyValidators {
- hFrom, exists := v.cachedHeightVector[vID]
+ for _, vID = range dirtyValidators {
+ hFrom, exists = v.cachedHeightVector[vID]
if !exists {
+ delete(win, vID)
return
}
- hTo, exists := other.cachedHeightVector[vID]
+ hTo, exists = other.cachedHeightVector[vID]
if !exists {
+ delete(win, vID)
return
}
if hFrom != infinity && hTo == infinity {
@@ -243,12 +433,27 @@ func (v *totalOrderingCandidateInfo) updateWinRecord(
}
}
-// blockVector stores all blocks grouped by their proposers and
-// sorted by their block height.
-type blockVector map[types.ValidatorID][]*types.Block
+// totalOrderingGroupVector keeps global status of current pending set.
+type totalOrderingGlobalVector struct {
+ // blocks stores all blocks grouped by their proposers and
+ // sorted by their block height.
+ //
+ // TODO: the way we use this slice would make it reallocate frequently.
+ blocks map[types.ValidatorID][]*types.Block
+
+ // cachedCandidateInfo is an totalOrderingCandidateInfo instance,
+ // which is just used for actual candidates to calculate height vector.
+ cachedCandidateInfo *totalOrderingCandidateInfo
+}
+
+func newTotalOrderingGlobalVector() *totalOrderingGlobalVector {
+ return &totalOrderingGlobalVector{
+ blocks: make(map[types.ValidatorID][]*types.Block),
+ }
+}
-func (v blockVector) addBlock(b *types.Block) (err error) {
- blocksFromProposer := v[b.ProposerID]
+func (global *totalOrderingGlobalVector) addBlock(b *types.Block) (err error) {
+ blocksFromProposer := global.blocks[b.ProposerID]
if len(blocksFromProposer) > 0 {
lastBlock := blocksFromProposer[len(blocksFromProposer)-1]
if b.Height-lastBlock.Height != 1 {
@@ -256,21 +461,44 @@ func (v blockVector) addBlock(b *types.Block) (err error) {
return
}
}
- v[b.ProposerID] = append(blocksFromProposer, b)
+ global.blocks[b.ProposerID] = append(blocksFromProposer, b)
return
}
-// getCandidateInfo would convert a blockVector to
-// totalOrderingCandidateInfo.
-func (v blockVector) getCandidateInfo() (info *totalOrderingCandidateInfo) {
- info = newTotalOrderingCandidateInfo()
- for vID, vec := range v {
- if len(vec) == 0 {
- continue
+// updateCandidateInfo udpate cached candidate info.
+func (global *totalOrderingGlobalVector) updateCandidateInfo(
+ dirtyValidators types.ValidatorIDs, objCache *totalOrderingObjectCache) {
+
+ var (
+ vID types.ValidatorID
+ blocks []*types.Block
+ info *totalOrderingCandidateInfo
+ )
+
+ if global.cachedCandidateInfo == nil {
+ info = newTotalOrderingCandidateInfo(objCache)
+ for vID, blocks = range global.blocks {
+ if len(blocks) == 0 {
+ continue
+ }
+ info.ackedStatus[vID] = &totalOrderingHeightRecord{
+ minHeight: blocks[0].Height,
+ count: uint64(len(blocks)),
+ }
}
- info.ackedStatus[vID] = &totalOrderingHeightRecord{
- minHeight: vec[0].Height,
- count: uint64(len(vec)),
+ global.cachedCandidateInfo = info
+ } else {
+ info = global.cachedCandidateInfo
+ for _, vID = range dirtyValidators {
+ blocks = global.blocks[vID]
+ if len(blocks) == 0 {
+ delete(info.ackedStatus, vID)
+ continue
+ }
+ info.ackedStatus[vID] = &totalOrderingHeightRecord{
+ minHeight: blocks[0].Height,
+ count: uint64(len(blocks)),
+ }
}
}
return
@@ -299,7 +527,7 @@ type totalOrdering struct {
//
// - build global height vector
// - picking candidates next round
- globalVector blockVector
+ globalVector *totalOrderingGlobalVector
// candidates caches result of potential function during generating
// preceding sets.
@@ -311,7 +539,10 @@ type totalOrdering struct {
// dirtyValidators records which validatorID that should be updated for
// all cached status (win record, acking status).
- dirtyValidators map[types.ValidatorID]struct{}
+ dirtyValidators types.ValidatorIDs
+
+ // objCache caches allocated objects, like map.
+ objCache *totalOrderingObjectCache
}
func newTotalOrdering(k, phi, validatorCount uint64) *totalOrdering {
@@ -321,47 +552,56 @@ func newTotalOrdering(k, phi, validatorCount uint64) *totalOrdering {
k: k,
phi: phi,
validatorCount: validatorCount,
- globalVector: blockVector{},
- dirtyValidators: make(map[types.ValidatorID]struct{}),
+ globalVector: newTotalOrderingGlobalVector(),
+ dirtyValidators: make(types.ValidatorIDs, 0, int(validatorCount)),
acked: make(map[common.Hash]map[common.Hash]struct{}),
+ objCache: newTotalOrderingObjectCache(),
}
}
// buildBlockRelation populates the acked according their acking relationships.
+// This function would update all blocks implcitly acked by input block
+// recursively.
func (to *totalOrdering) buildBlockRelation(b *types.Block) {
- // populateAcked would update all blocks implcitly acked
- // by input block recursively.
- var populateAcked func(bx, target *types.Block)
- populateAcked = func(bx, target *types.Block) {
- for ack := range bx.Acks {
- acked, exists := to.acked[ack]
- if !exists {
- acked = make(map[common.Hash]struct{})
+ var (
+ curBlock, nextBlock *types.Block
+ ack common.Hash
+ acked map[common.Hash]struct{}
+ exists, alreadyPopulated bool
+ toCheck = []*types.Block{b}
+ )
+ for {
+ if len(toCheck) == 0 {
+ break
+ }
+ curBlock, toCheck = toCheck[len(toCheck)-1], toCheck[:len(toCheck)-1]
+ for ack = range curBlock.Acks {
+ if acked, exists = to.acked[ack]; !exists {
+ acked = to.objCache.requestAckedVector()
to.acked[ack] = acked
}
-
// This means we've walked this block already.
- if _, alreadyPopulated := acked[target.Hash]; alreadyPopulated {
+ if _, alreadyPopulated = acked[b.Hash]; alreadyPopulated {
continue
}
- acked[target.Hash] = struct{}{}
-
+ acked[b.Hash] = struct{}{}
// See if we need to go forward.
- if nextBlock, exists := to.pendings[ack]; !exists {
+ if nextBlock, exists = to.pendings[ack]; !exists {
continue
} else {
- populateAcked(nextBlock, target)
+ toCheck = append(toCheck, nextBlock)
}
}
}
- populateAcked(b, b)
}
// clean would remove a block from working set. This behaviour
// would prevent our memory usage growing infinity.
func (to *totalOrdering) clean(h common.Hash) {
+ to.objCache.recycleAckedVector(to.acked[h])
delete(to.acked, h)
delete(to.pendings, h)
+ to.candidates[h].recycle(to.objCache)
delete(to.candidates, h)
for _, info := range to.candidates {
info.clean(h)
@@ -370,6 +610,11 @@ func (to *totalOrdering) clean(h common.Hash) {
// updateVectors is a helper function to update all cached vectors.
func (to *totalOrdering) updateVectors(b *types.Block) (err error) {
+ var (
+ candidate common.Hash
+ info *totalOrderingCandidateInfo
+ acked bool
+ )
// Update global height vector
err = to.globalVector.addBlock(b)
if err != nil {
@@ -377,8 +622,8 @@ func (to *totalOrdering) updateVectors(b *types.Block) (err error) {
}
// Update acking status of candidates.
- for candidate, info := range to.candidates {
- if _, acked := to.acked[candidate][b.Hash]; !acked {
+ for candidate, info = range to.candidates {
+ if _, acked = to.acked[candidate][b.Hash]; !acked {
continue
}
if err = info.addBlock(b); err != nil {
@@ -393,18 +638,17 @@ func (to *totalOrdering) updateVectors(b *types.Block) (err error) {
func (to *totalOrdering) prepareCandidate(
candidate *types.Block) (info *totalOrderingCandidateInfo) {
- blocks := to.globalVector[candidate.ProposerID]
- info = newTotalOrderingCandidateInfo()
+ info = newTotalOrderingCandidateInfo(to.objCache)
info.ackedStatus[candidate.ProposerID] = &totalOrderingHeightRecord{
minHeight: candidate.Height,
- count: uint64(len(blocks)),
+ count: uint64(len(to.globalVector.blocks[candidate.ProposerID])),
}
ackedsForCandidate, exists := to.acked[candidate.Hash]
if !exists {
// This candidate is acked by nobody.
return
}
- for vID, blocks := range to.globalVector {
+ for vID, blocks := range to.globalVector.blocks {
if vID == candidate.ProposerID {
continue
}
@@ -441,18 +685,19 @@ func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*typ
for p := range precedings {
// Remove the first element from corresponding blockVector.
b := to.pendings[p]
- to.globalVector[b.ProposerID] = to.globalVector[b.ProposerID][1:]
+ to.globalVector.blocks[b.ProposerID] =
+ to.globalVector.blocks[b.ProposerID][1:]
ret = append(ret, b)
// Remove block relations.
to.clean(p)
- to.dirtyValidators[b.ProposerID] = struct{}{}
+ to.dirtyValidators = append(to.dirtyValidators, b.ProposerID)
}
sort.Sort(types.ByHash(ret))
// Find new candidates from tip of globalVector of each validator.
// The complexity here is O(N^2logN).
- for _, blocks := range to.globalVector {
+ for _, blocks := range to.globalVector.blocks {
if len(blocks) == 0 {
continue
}
@@ -476,35 +721,48 @@ func (to *totalOrdering) output(precedings map[common.Hash]struct{}) (ret []*typ
func (to *totalOrdering) generateDeliverSet() (
delivered map[common.Hash]struct{}, early bool) {
- globalCandidatesInfo := to.globalVector.getCandidateInfo()
- for _, info := range to.candidates {
+ var (
+ candidate, otherCandidate common.Hash
+ info, otherInfo *totalOrderingCandidateInfo
+ precedings = make(map[common.Hash]struct{})
+ )
+
+ to.globalVector.updateCandidateInfo(to.dirtyValidators, to.objCache)
+ globalInfo := to.globalVector.cachedCandidateInfo
+ for _, info = range to.candidates {
info.updateAckingHeightVector(
- globalCandidatesInfo, to.k, to.dirtyValidators)
+ globalInfo, to.k, to.dirtyValidators, to.objCache)
}
+
// Update winning records for each candidate.
+ var wg sync.WaitGroup
+ wg.Add(len(to.candidates))
for candidate, info := range to.candidates {
- for otherCandidate, otherInfo := range to.candidates {
- if candidate == otherCandidate {
- continue
+ go func(can common.Hash, canInfo *totalOrderingCandidateInfo) {
+ for otherCandidate, otherInfo := range to.candidates {
+ if can == otherCandidate {
+ continue
+ }
+ canInfo.updateWinRecord(
+ otherCandidate, otherInfo, to.dirtyValidators, to.objCache)
}
- info.updateWinRecord(otherCandidate, otherInfo, to.dirtyValidators)
- }
+ wg.Done()
+ }(candidate, info)
}
- // Reset dirty validators.
- to.dirtyValidators = make(map[types.ValidatorID]struct{})
+ wg.Wait()
- globalAns := globalCandidatesInfo.getAckingNodeSet(
- globalCandidatesInfo, to.k)
- precedings := make(map[common.Hash]struct{})
+ // Reset dirty validators.
+ to.dirtyValidators = to.dirtyValidators[:0]
+ globalAnsLength := globalInfo.getAckingNodeSetLength(globalInfo, to.k)
CheckNextCandidateLoop:
- for candidate := range to.candidates {
- for otherCandidate, otherInfo := range to.candidates {
+ for candidate = range to.candidates {
+ for otherCandidate, otherInfo = range to.candidates {
if candidate == otherCandidate {
continue
}
if otherInfo.winRecords[candidate].grade(
- to.validatorCount, to.phi, globalAns) != 0 {
+ to.validatorCount, to.phi, globalAnsLength) != 0 {
continue CheckNextCandidateLoop
}
@@ -517,15 +775,18 @@ CheckNextCandidateLoop:
// internal is a helper function to verify internal stability.
internal := func() bool {
- for candidate := range to.candidates {
- if _, isPreceding := precedings[candidate]; isPreceding {
+ var (
+ isPreceding, beaten bool
+ p common.Hash
+ )
+ for candidate = range to.candidates {
+ if _, isPreceding = precedings[candidate]; isPreceding {
continue
}
-
- beaten := false
- for p := range precedings {
+ beaten = false
+ for p = range precedings {
if beaten = to.candidates[p].winRecords[candidate].grade(
- to.validatorCount, to.phi, globalAns) == 1; beaten {
+ to.validatorCount, to.phi, globalAnsLength) == 1; beaten {
break
}
}
@@ -540,17 +801,23 @@ CheckNextCandidateLoop:
// It would make sure some preceding block is strong enough
// to lead the whole preceding set.
checkAHV := func() bool {
- for p := range precedings {
- count := uint64(0)
- status := to.candidates[p]
- for _, v := range status.cachedHeightVector {
- if v != infinity {
+ var (
+ height uint64
+ p common.Hash
+ count uint64
+ status *totalOrderingCandidateInfo
+ )
+ for p = range precedings {
+ count = 0
+ status = to.candidates[p]
+ for _, height = range status.cachedHeightVector {
+ if height != infinity {
count++
+ if count > to.phi {
+ return true
+ }
}
}
- if count > to.phi {
- return true
- }
}
return false
}
@@ -559,10 +826,11 @@ CheckNextCandidateLoop:
// It would make sure all preceding blocks are strong enough
// to be delivered.
checkANS := func() bool {
+ var validatorAnsLength uint64
for p := range precedings {
- validatorAns := to.candidates[p].getAckingNodeSet(
- globalCandidatesInfo, to.k)
- if uint64(len(validatorAns)) < to.validatorCount-to.phi {
+ validatorAnsLength = to.candidates[p].getAckingNodeSetLength(
+ globalInfo, to.k)
+ if uint64(validatorAnsLength) < to.validatorCount-to.phi {
return false
}
}
@@ -571,7 +839,7 @@ CheckNextCandidateLoop:
// If all validators propose enough blocks, we should force
// to deliver since the whole picture of the DAG is revealed.
- if uint64(len(globalAns)) != to.validatorCount {
+ if globalAnsLength != to.validatorCount {
// Check internal stability first.
if !internal() {
return
@@ -585,7 +853,6 @@ CheckNextCandidateLoop:
return
}
}
-
delivered = precedings
return
}
@@ -597,7 +864,6 @@ func (to *totalOrdering) processBlock(b *types.Block) (
// That means, all its acking blocks are during/after
// total ordering stage.
- // Incremental part.
to.pendings[b.Hash] = b
to.buildBlockRelation(b)
if err = to.updateVectors(b); err != nil {
@@ -607,13 +873,7 @@ func (to *totalOrdering) processBlock(b *types.Block) (
to.candidates[b.Hash] = to.prepareCandidate(b)
}
// Mark the proposer of incoming block as dirty.
- to.dirtyValidators[b.ProposerID] = struct{}{}
-
- // Not-Incremental part (yet).
- // - generate ahv for each candidate
- // - generate ans for each candidate
- // - generate global ans
- // - find preceding set
+ to.dirtyValidators = append(to.dirtyValidators, b.ProposerID)
hashes, early := to.generateDeliverSet()
// output precedings