From 5bbe5e67d375d67f658dc6b3f2b2e82d1914c91b Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 5 Apr 2019 13:50:06 +0800 Subject: core: add timeout to leader selector --- core/agreement-mgr.go | 1 + core/leader-selector.go | 98 ++++++++++++++++++++++++++++++++------------ core/leader-selector_test.go | 42 +++++++++++++++++++ 3 files changed, 114 insertions(+), 27 deletions(-) diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 4e6f230..f79ab4b 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -466,6 +466,7 @@ Loop: Round: currentRound, Height: math.MaxUint64, } + mgr.baModule.data.leader.setTimeout(curConfig.lambdaBA / 2) if err := mgr.baRoutineForOneRound(setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, diff --git a/core/leader-selector.go b/core/leader-selector.go index 8c06328..25b2a2a 100644 --- a/core/leader-selector.go +++ b/core/leader-selector.go @@ -18,9 +18,12 @@ package core import ( + "context" "fmt" "math/big" "sync" + "sync/atomic" + "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" @@ -50,12 +53,17 @@ func init() { one = big.NewRat(1, 1) } +type leader struct { + minCRSBlock *big.Int + minBlockHash common.Hash +} + type leaderSelector struct { hashCRS common.Hash numCRS *big.Int - minCRSBlock *big.Int - minBlockHash common.Hash + leaderValue atomic.Value pendingBlocks map[common.Hash]*types.Block + timeout atomic.Value validLeader validLeaderFn lock sync.Mutex logger common.Logger @@ -63,11 +71,15 @@ type leaderSelector struct { func newLeaderSelector( validLeader validLeaderFn, logger common.Logger) *leaderSelector { - return &leaderSelector{ - minCRSBlock: maxHash, + ls := &leaderSelector{ validLeader: validLeader, logger: logger, } + ls.leaderValue.Store(&leader{ + minCRSBlock: maxHash, + }) + ls.timeout.Store(100 * time.Millisecond) + return ls } func (l *leaderSelector) distance(sig crypto.Signature) *big.Int { @@ -85,6 +97,10 @@ func (l *leaderSelector) probability(sig crypto.Signature) float64 { return p } +func (l *leaderSelector) setTimeout(timeout time.Duration) { + l.timeout.Store(timeout) +} + func (l *leaderSelector) restart(crs common.Hash) { numCRS := big.NewInt(0) numCRS.SetBytes(crs[:]) @@ -92,31 +108,51 @@ func (l *leaderSelector) restart(crs common.Hash) { defer l.lock.Unlock() l.numCRS = numCRS l.hashCRS = crs - l.minCRSBlock = maxHash - l.minBlockHash = types.NullBlockHash + l.leaderValue.Store(&leader{ + minCRSBlock: maxHash, + minBlockHash: types.NullBlockHash, + }) l.pendingBlocks = make(map[common.Hash]*types.Block) } func (l *leaderSelector) leaderBlockHash() common.Hash { - l.lock.Lock() - defer l.lock.Unlock() - for _, b := range l.pendingBlocks { - ok, dist := l.potentialLeader(b) - if !ok { - continue - } - ok, err := l.validLeader(b) - if err != nil { - l.logger.Error("Error checking validLeader", "error", err, "block", b) - delete(l.pendingBlocks, b.Hash) - continue - } - if ok { - l.updateLeader(b, dist) - delete(l.pendingBlocks, b.Hash) + ch := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + l.lock.Lock() + defer l.lock.Unlock() + Loop: + for _, b := range l.pendingBlocks { + select { + case <-ctx.Done(): + break Loop + default: + } + ok, dist := l.potentialLeader(b) + if !ok { + delete(l.pendingBlocks, b.Hash) + continue + } + ok, err := l.validLeader(b) + if err != nil { + l.logger.Error("Error checking validLeader", + "error", err, "block", b) + delete(l.pendingBlocks, b.Hash) + continue + } + if ok { + l.updateLeader(b, dist) + delete(l.pendingBlocks, b.Hash) + } } + ch <- struct{}{} + }() + select { + case <-ch: + case <-time.After(l.timeout.Load().(time.Duration)): } - return l.minBlockHash + return l.leaderValue.Load().(*leader).minBlockHash } func (l *leaderSelector) processBlock(block *types.Block) error { @@ -129,6 +165,9 @@ func (l *leaderSelector) processBlock(block *types.Block) error { } l.lock.Lock() defer l.lock.Unlock() + if _, exist := l.pendingBlocks[block.Hash]; exist { + return nil + } ok, dist := l.potentialLeader(block) if !ok { return nil @@ -147,17 +186,22 @@ func (l *leaderSelector) processBlock(block *types.Block) error { func (l *leaderSelector) potentialLeader(block *types.Block) (bool, *big.Int) { dist := l.distance(block.CRSSignature) - cmp := l.minCRSBlock.Cmp(dist) - return (cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash))), dist + leader := l.leaderValue.Load().(*leader) + cmp := leader.minCRSBlock.Cmp(dist) + return (cmp > 0 || (cmp == 0 && block.Hash.Less(leader.minBlockHash))), dist } func (l *leaderSelector) updateLeader(block *types.Block, dist *big.Int) { - l.minCRSBlock = dist - l.minBlockHash = block.Hash + l.leaderValue.Store(&leader{ + minCRSBlock: dist, + minBlockHash: block.Hash, + }) } func (l *leaderSelector) findPendingBlock( hash common.Hash) (*types.Block, bool) { + l.lock.Lock() + defer l.lock.Unlock() b, e := l.pendingBlocks[hash] return b, e } diff --git a/core/leader-selector_test.go b/core/leader-selector_test.go index 9e09279..f781f84 100644 --- a/core/leader-selector_test.go +++ b/core/leader-selector_test.go @@ -123,6 +123,44 @@ func (s *LeaderSelectorTestSuite) TestLeaderBlockHash() { } } +func (s *LeaderSelectorTestSuite) TestTimeout() { + leader := s.newLeader() + blocks := make(map[common.Hash]*types.Block) + for i := 0; i < 10; i++ { + prv, err := ecdsa.NewPrivateKey() + s.Require().NoError(err) + block := &types.Block{ + ProposerID: types.NewNodeID(prv.PublicKey()), + Hash: common.NewRandomHash(), + } + s.Require().NoError( + utils.NewSigner(prv).SignCRS(block, leader.hashCRS)) + s.Require().NoError(leader.processBlock(block)) + blocks[block.Hash] = block + } + blockHash := leader.leaderBlockHash() + ch := make(chan struct{}) + defer close(ch) + + leader.validLeader = func(block *types.Block) (bool, error) { + return false, nil + } + + leader.restart(leader.hashCRS) + for _, b := range blocks { + s.Require().NoError(leader.processBlock(b)) + } + + leader.validLeader = func(block *types.Block) (bool, error) { + if block.Hash == blockHash { + // Block forever + <-ch + } + return true, nil + } + s.NotEqual(blockHash, leader.leaderBlockHash()) +} + func (s *LeaderSelectorTestSuite) TestValidLeaderFn() { leader := s.newLeader() blocks := make(map[common.Hash]*types.Block) @@ -156,7 +194,9 @@ func (s *LeaderSelectorTestSuite) TestPotentialLeader() { blocks := make(map[common.Hash]*types.Block) for i := 0; i < 10; i++ { if i > 0 { + leader.lock.Lock() s.mockValidLeaderDefault = false + leader.lock.Unlock() } prv, err := ecdsa.NewPrivateKey() s.Require().NoError(err) @@ -168,6 +208,7 @@ func (s *LeaderSelectorTestSuite) TestPotentialLeader() { utils.NewSigner(prv).SignCRS(block, leader.hashCRS)) ok, _ := leader.potentialLeader(block) s.Require().NoError(leader.processBlock(block)) + leader.lock.Lock() if i > 0 { if ok { s.Contains(leader.pendingBlocks, block.Hash) @@ -176,6 +217,7 @@ func (s *LeaderSelectorTestSuite) TestPotentialLeader() { } blocks[block.Hash] = block } + leader.lock.Unlock() } } -- cgit v1.2.3