aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-04-05 13:50:06 +0800
committerJimmy Hu <jimmy.hu@dexon.org>2019-04-05 16:58:07 +0800
commit5bbe5e67d375d67f658dc6b3f2b2e82d1914c91b (patch)
tree786e598b484e1fcaca90909d45378c8959d72053
parentd433231363d622acb26f8fa9b072470c179dbc62 (diff)
downloaddexon-consensus-jimmy-verify-timeout.tar
dexon-consensus-jimmy-verify-timeout.tar.gz
dexon-consensus-jimmy-verify-timeout.tar.bz2
dexon-consensus-jimmy-verify-timeout.tar.lz
dexon-consensus-jimmy-verify-timeout.tar.xz
dexon-consensus-jimmy-verify-timeout.tar.zst
dexon-consensus-jimmy-verify-timeout.zip
core: add timeout to leader selectorjimmy-verify-timeout
-rw-r--r--core/agreement-mgr.go1
-rw-r--r--core/leader-selector.go98
-rw-r--r--core/leader-selector_test.go42
3 files changed, 114 insertions, 27 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 4e6f230..f79ab4b 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -466,6 +466,7 @@ Loop:
Round: currentRound,
Height: math.MaxUint64,
}
+ mgr.baModule.data.leader.setTimeout(curConfig.lambdaBA / 2)
if err := mgr.baRoutineForOneRound(setting); err != nil {
mgr.logger.Error("BA routine failed",
"error", err,
diff --git a/core/leader-selector.go b/core/leader-selector.go
index 8c06328..25b2a2a 100644
--- a/core/leader-selector.go
+++ b/core/leader-selector.go
@@ -18,9 +18,12 @@
package core
import (
+ "context"
"fmt"
"math/big"
"sync"
+ "sync/atomic"
+ "time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
@@ -50,12 +53,17 @@ func init() {
one = big.NewRat(1, 1)
}
+type leader struct {
+ minCRSBlock *big.Int
+ minBlockHash common.Hash
+}
+
type leaderSelector struct {
hashCRS common.Hash
numCRS *big.Int
- minCRSBlock *big.Int
- minBlockHash common.Hash
+ leaderValue atomic.Value
pendingBlocks map[common.Hash]*types.Block
+ timeout atomic.Value
validLeader validLeaderFn
lock sync.Mutex
logger common.Logger
@@ -63,11 +71,15 @@ type leaderSelector struct {
func newLeaderSelector(
validLeader validLeaderFn, logger common.Logger) *leaderSelector {
- return &leaderSelector{
- minCRSBlock: maxHash,
+ ls := &leaderSelector{
validLeader: validLeader,
logger: logger,
}
+ ls.leaderValue.Store(&leader{
+ minCRSBlock: maxHash,
+ })
+ ls.timeout.Store(100 * time.Millisecond)
+ return ls
}
func (l *leaderSelector) distance(sig crypto.Signature) *big.Int {
@@ -85,6 +97,10 @@ func (l *leaderSelector) probability(sig crypto.Signature) float64 {
return p
}
+func (l *leaderSelector) setTimeout(timeout time.Duration) {
+ l.timeout.Store(timeout)
+}
+
func (l *leaderSelector) restart(crs common.Hash) {
numCRS := big.NewInt(0)
numCRS.SetBytes(crs[:])
@@ -92,31 +108,51 @@ func (l *leaderSelector) restart(crs common.Hash) {
defer l.lock.Unlock()
l.numCRS = numCRS
l.hashCRS = crs
- l.minCRSBlock = maxHash
- l.minBlockHash = types.NullBlockHash
+ l.leaderValue.Store(&leader{
+ minCRSBlock: maxHash,
+ minBlockHash: types.NullBlockHash,
+ })
l.pendingBlocks = make(map[common.Hash]*types.Block)
}
func (l *leaderSelector) leaderBlockHash() common.Hash {
- l.lock.Lock()
- defer l.lock.Unlock()
- for _, b := range l.pendingBlocks {
- ok, dist := l.potentialLeader(b)
- if !ok {
- continue
- }
- ok, err := l.validLeader(b)
- if err != nil {
- l.logger.Error("Error checking validLeader", "error", err, "block", b)
- delete(l.pendingBlocks, b.Hash)
- continue
- }
- if ok {
- l.updateLeader(b, dist)
- delete(l.pendingBlocks, b.Hash)
+ ch := make(chan struct{})
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ Loop:
+ for _, b := range l.pendingBlocks {
+ select {
+ case <-ctx.Done():
+ break Loop
+ default:
+ }
+ ok, dist := l.potentialLeader(b)
+ if !ok {
+ delete(l.pendingBlocks, b.Hash)
+ continue
+ }
+ ok, err := l.validLeader(b)
+ if err != nil {
+ l.logger.Error("Error checking validLeader",
+ "error", err, "block", b)
+ delete(l.pendingBlocks, b.Hash)
+ continue
+ }
+ if ok {
+ l.updateLeader(b, dist)
+ delete(l.pendingBlocks, b.Hash)
+ }
}
+ ch <- struct{}{}
+ }()
+ select {
+ case <-ch:
+ case <-time.After(l.timeout.Load().(time.Duration)):
}
- return l.minBlockHash
+ return l.leaderValue.Load().(*leader).minBlockHash
}
func (l *leaderSelector) processBlock(block *types.Block) error {
@@ -129,6 +165,9 @@ func (l *leaderSelector) processBlock(block *types.Block) error {
}
l.lock.Lock()
defer l.lock.Unlock()
+ if _, exist := l.pendingBlocks[block.Hash]; exist {
+ return nil
+ }
ok, dist := l.potentialLeader(block)
if !ok {
return nil
@@ -147,17 +186,22 @@ func (l *leaderSelector) processBlock(block *types.Block) error {
func (l *leaderSelector) potentialLeader(block *types.Block) (bool, *big.Int) {
dist := l.distance(block.CRSSignature)
- cmp := l.minCRSBlock.Cmp(dist)
- return (cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash))), dist
+ leader := l.leaderValue.Load().(*leader)
+ cmp := leader.minCRSBlock.Cmp(dist)
+ return (cmp > 0 || (cmp == 0 && block.Hash.Less(leader.minBlockHash))), dist
}
func (l *leaderSelector) updateLeader(block *types.Block, dist *big.Int) {
- l.minCRSBlock = dist
- l.minBlockHash = block.Hash
+ l.leaderValue.Store(&leader{
+ minCRSBlock: dist,
+ minBlockHash: block.Hash,
+ })
}
func (l *leaderSelector) findPendingBlock(
hash common.Hash) (*types.Block, bool) {
+ l.lock.Lock()
+ defer l.lock.Unlock()
b, e := l.pendingBlocks[hash]
return b, e
}
diff --git a/core/leader-selector_test.go b/core/leader-selector_test.go
index 9e09279..f781f84 100644
--- a/core/leader-selector_test.go
+++ b/core/leader-selector_test.go
@@ -123,6 +123,44 @@ func (s *LeaderSelectorTestSuite) TestLeaderBlockHash() {
}
}
+func (s *LeaderSelectorTestSuite) TestTimeout() {
+ leader := s.newLeader()
+ blocks := make(map[common.Hash]*types.Block)
+ for i := 0; i < 10; i++ {
+ prv, err := ecdsa.NewPrivateKey()
+ s.Require().NoError(err)
+ block := &types.Block{
+ ProposerID: types.NewNodeID(prv.PublicKey()),
+ Hash: common.NewRandomHash(),
+ }
+ s.Require().NoError(
+ utils.NewSigner(prv).SignCRS(block, leader.hashCRS))
+ s.Require().NoError(leader.processBlock(block))
+ blocks[block.Hash] = block
+ }
+ blockHash := leader.leaderBlockHash()
+ ch := make(chan struct{})
+ defer close(ch)
+
+ leader.validLeader = func(block *types.Block) (bool, error) {
+ return false, nil
+ }
+
+ leader.restart(leader.hashCRS)
+ for _, b := range blocks {
+ s.Require().NoError(leader.processBlock(b))
+ }
+
+ leader.validLeader = func(block *types.Block) (bool, error) {
+ if block.Hash == blockHash {
+ // Block forever
+ <-ch
+ }
+ return true, nil
+ }
+ s.NotEqual(blockHash, leader.leaderBlockHash())
+}
+
func (s *LeaderSelectorTestSuite) TestValidLeaderFn() {
leader := s.newLeader()
blocks := make(map[common.Hash]*types.Block)
@@ -156,7 +194,9 @@ func (s *LeaderSelectorTestSuite) TestPotentialLeader() {
blocks := make(map[common.Hash]*types.Block)
for i := 0; i < 10; i++ {
if i > 0 {
+ leader.lock.Lock()
s.mockValidLeaderDefault = false
+ leader.lock.Unlock()
}
prv, err := ecdsa.NewPrivateKey()
s.Require().NoError(err)
@@ -168,6 +208,7 @@ func (s *LeaderSelectorTestSuite) TestPotentialLeader() {
utils.NewSigner(prv).SignCRS(block, leader.hashCRS))
ok, _ := leader.potentialLeader(block)
s.Require().NoError(leader.processBlock(block))
+ leader.lock.Lock()
if i > 0 {
if ok {
s.Contains(leader.pendingBlocks, block.Hash)
@@ -176,6 +217,7 @@ func (s *LeaderSelectorTestSuite) TestPotentialLeader() {
}
blocks[block.Hash] = block
}
+ leader.lock.Unlock()
}
}