aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dex/backend.go1
-rw-r--r--params/config.go2
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go46
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go79
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go142
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go246
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go43
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go3
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go62
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go59
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go5
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go68
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go285
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go14
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go16
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go26
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go1
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go12
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go41
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go34
-rw-r--r--vendor/vendor.json50
21 files changed, 891 insertions, 344 deletions
diff --git a/dex/backend.go b/dex/backend.go
index a000d5fbc..b16bdc493 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -163,6 +163,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
engine.SetGovStateFetcher(dex.governance)
dMoment := time.Unix(int64(chainConfig.DMoment), 0)
+ log.Info("Consensus DMoment", "dMoment", dMoment)
// Force starting with full sync mode if this node is a bootstrap proposer.
if config.BlockProposerEnabled && dMoment.After(time.Now()) {
diff --git a/params/config.go b/params/config.go
index e8b2f5df0..65f22a148 100644
--- a/params/config.go
+++ b/params/config.go
@@ -90,7 +90,7 @@ var (
// TestnetChainConfig contains the chain parameters to run a node on the Taiwan test network.
TestnetChainConfig = &ChainConfig{
ChainID: big.NewInt(238),
- DMoment: 1547471400,
+ DMoment: 1547608200,
HomesteadBlock: big.NewInt(0),
DAOForkBlock: nil,
DAOForkSupport: true,
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
index e8cafbd68..e468e9c2e 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
@@ -22,6 +22,7 @@ import (
"errors"
"math"
"sync"
+ "sync/atomic"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -41,7 +42,7 @@ func genValidLeader(
if block.Timestamp.After(time.Now()) {
return false, nil
}
- if err := mgr.lattice.SanityCheck(block); err != nil {
+ if err := mgr.lattice.SanityCheck(block, true); err != nil {
if err == ErrRetrySanityCheckLater {
return false, nil
}
@@ -180,12 +181,23 @@ func (mgr *agreementMgr) appendConfig(
consensus: mgr.con,
chainID: i,
restartNotary: make(chan types.Position, 1),
+ roundValue: &atomic.Value{},
}
+ recv.roundValue.Store(uint64(0))
agrModule := newAgreement(
mgr.con.ID,
recv,
newLeaderSelector(genValidLeader(mgr), mgr.logger),
- mgr.signer)
+ mgr.signer,
+ mgr.logger)
+ // Hacky way to initialize first notarySet.
+ nodes, err := mgr.cache.GetNodeSet(round)
+ if err != nil {
+ return err
+ }
+ agrModule.notarySet = nodes.GetSubSet(
+ int(config.NotarySetSize),
+ types.NewNotarySetTarget(crs, i))
// Hacky way to make agreement module self contained.
recv.agreementModule = agrModule
mgr.baModules = append(mgr.baModules, agrModule)
@@ -266,7 +278,11 @@ func (mgr *agreementMgr) processAgreementResult(
return err
}
}
- agreement.restart(nIDs, result.Position, crs)
+ leader, err := mgr.cache.GetLeaderNode(result.Position)
+ if err != nil {
+ return err
+ }
+ agreement.restart(nIDs, result.Position, leader, crs)
}
return nil
}
@@ -332,14 +348,12 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
return
}
// Check if this node in notary set of this chain in this round.
- nodeSet, err := mgr.cache.GetNodeSet(nextRound)
+ notarySet, err := mgr.cache.GetNotarySet(nextRound, chainID)
if err != nil {
panic(err)
}
setting.crs = config.crs
- setting.notarySet = nodeSet.GetSubSet(
- int(config.notarySetSize),
- types.NewNotarySetTarget(config.crs, chainID))
+ setting.notarySet = notarySet
_, isNotary = setting.notarySet[mgr.ID]
if isNotary {
mgr.logger.Info("selected as notary set",
@@ -396,7 +410,7 @@ Loop:
<-setting.ticker.Tick()
}
// Run BA for this round.
- recv.round = currentRound
+ recv.roundValue.Store(currentRound)
recv.changeNotaryTime = roundEndTime
recv.restartNotary <- types.Position{ChainID: math.MaxUint32}
if err := mgr.baRoutineForOneRound(&setting); err != nil {
@@ -435,12 +449,14 @@ Loop:
}
}
var nextHeight uint64
+ var nextTime time.Time
for {
- nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID)
+ nextHeight, nextTime, err =
+ mgr.lattice.NextBlock(recv.round(), setting.chainID)
if err != nil {
mgr.logger.Debug("Error getting next height",
"error", err,
- "round", recv.round,
+ "round", recv.round(),
"chainID", setting.chainID)
err = nil
nextHeight = oldPos.Height
@@ -456,12 +472,18 @@ Loop:
time.Sleep(100 * time.Millisecond)
}
nextPos := types.Position{
- Round: recv.round,
+ Round: recv.round(),
ChainID: setting.chainID,
Height: nextHeight,
}
oldPos = nextPos
- agr.restart(setting.notarySet, nextPos, setting.crs)
+ leader, err := mgr.cache.GetLeaderNode(nextPos)
+ if err != nil {
+ return err
+ }
+ time.Sleep(nextTime.Sub(time.Now()))
+ setting.ticker.Restart()
+ agr.restart(setting.notarySet, nextPos, leader, setting.crs)
default:
}
if agr.pullVotes() {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go
index 77f293376..266e44294 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-state.go
@@ -35,7 +35,10 @@ type agreementStateType int
// agreementStateType enum.
const (
- stateInitial agreementStateType = iota
+ stateFast agreementStateType = iota
+ stateFastVote
+ stateFastRollback
+ stateInitial
statePreCommit
stateCommit
stateForward
@@ -43,7 +46,7 @@ const (
stateSleep
)
-var nullBlockHash = common.Hash{}
+var nullBlockHash common.Hash
var skipBlockHash common.Hash
func init() {
@@ -58,6 +61,63 @@ type agreementState interface {
clocks() int
}
+//----- FastState -----
+type fastState struct {
+ a *agreementData
+}
+
+func newFastState(a *agreementData) *fastState {
+ return &fastState{a: a}
+}
+
+func (s *fastState) state() agreementStateType { return stateFast }
+func (s *fastState) clocks() int { return 0 }
+func (s *fastState) nextState() (agreementState, error) {
+ if func() bool {
+ s.a.lock.Lock()
+ defer s.a.lock.Unlock()
+ return s.a.isLeader
+ }() {
+ hash := s.a.recv.ProposeBlock()
+ if hash != nullBlockHash {
+ s.a.lock.Lock()
+ defer s.a.lock.Unlock()
+ s.a.recv.ProposeVote(types.NewVote(types.VoteFast, hash, s.a.period))
+ }
+ }
+ return newFastVoteState(s.a), nil
+}
+
+//----- FastVoteState -----
+type fastVoteState struct {
+ a *agreementData
+}
+
+func newFastVoteState(a *agreementData) *fastVoteState {
+ return &fastVoteState{a: a}
+}
+
+func (s *fastVoteState) state() agreementStateType { return stateFastVote }
+func (s *fastVoteState) clocks() int { return 2 }
+func (s *fastVoteState) nextState() (agreementState, error) {
+ return newFastRollbackState(s.a), nil
+}
+
+//----- FastRollbackState -----
+type fastRollbackState struct {
+ a *agreementData
+}
+
+func newFastRollbackState(a *agreementData) *fastRollbackState {
+ return &fastRollbackState{a: a}
+}
+
+func (s *fastRollbackState) state() agreementStateType { return stateFastRollback }
+func (s *fastRollbackState) clocks() int { return 1 }
+func (s *fastRollbackState) nextState() (agreementState, error) {
+ return newInitialState(s.a), nil
+}
+
//----- InitialState -----
type initialState struct {
a *agreementData
@@ -70,10 +130,17 @@ func newInitialState(a *agreementData) *initialState {
func (s *initialState) state() agreementStateType { return stateInitial }
func (s *initialState) clocks() int { return 0 }
func (s *initialState) nextState() (agreementState, error) {
- hash := s.a.recv.ProposeBlock()
- s.a.lock.Lock()
- defer s.a.lock.Unlock()
- s.a.recv.ProposeVote(types.NewVote(types.VoteInit, hash, s.a.period))
+ if func() bool {
+ s.a.lock.Lock()
+ defer s.a.lock.Unlock()
+ return !s.a.isLeader
+ }() {
+ // Leader already proposed block in fastState.
+ hash := s.a.recv.ProposeBlock()
+ s.a.lock.Lock()
+ defer s.a.lock.Unlock()
+ s.a.recv.ProposeVote(types.NewVote(types.VoteInit, hash, s.a.period))
+ }
return newPreCommitState(s.a), nil
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
index eead4628a..62bbe250f 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
@@ -31,6 +31,7 @@ import (
// Errors for agreement module.
var (
+ ErrInvalidVote = fmt.Errorf("invalid vote")
ErrNotInNotarySet = fmt.Errorf("not in notary set")
ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature")
)
@@ -73,6 +74,8 @@ type agreementReceiver interface {
// agreement module.
ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote)
PullBlocks(common.Hashes)
+ ReportForkVote(v1, v2 *types.Vote)
+ ReportForkBlock(b1, b2 *types.Block)
}
type pendingBlock struct {
@@ -90,6 +93,7 @@ type agreementData struct {
recv agreementReceiver
ID types.NodeID
+ isLeader bool
leader *leaderSelector
lockValue common.Hash
lockRound uint64
@@ -114,6 +118,7 @@ type agreement struct {
candidateBlock map[common.Hash]*types.Block
fastForward chan uint64
signer *utils.Signer
+ logger common.Logger
}
// newAgreement creates a agreement instance.
@@ -121,7 +126,8 @@ func newAgreement(
ID types.NodeID,
recv agreementReceiver,
leader *leaderSelector,
- signer *utils.Signer) *agreement {
+ signer *utils.Signer,
+ logger common.Logger) *agreement {
agreement := &agreement{
data: &agreementData{
recv: recv,
@@ -132,6 +138,7 @@ func newAgreement(
candidateBlock: make(map[common.Hash]*types.Block),
fastForward: make(chan uint64, 1),
signer: signer,
+ logger: logger,
}
agreement.stop()
return agreement
@@ -139,8 +146,8 @@ func newAgreement(
// restart the agreement
func (a *agreement) restart(
- notarySet map[types.NodeID]struct{}, aID types.Position, crs common.Hash) {
-
+ notarySet map[types.NodeID]struct{}, aID types.Position, leader types.NodeID,
+ crs common.Hash) {
if !func() bool {
a.lock.Lock()
defer a.lock.Unlock()
@@ -162,12 +169,16 @@ func (a *agreement) restart(
a.data.leader.restart(crs)
a.data.lockValue = nullBlockHash
a.data.lockRound = 0
+ a.data.isLeader = a.data.ID == leader
a.fastForward = make(chan uint64, 1)
a.hasOutput = false
- a.state = newInitialState(a.data)
+ a.state = newFastState(a.data)
a.notarySet = notarySet
a.candidateBlock = make(map[common.Hash]*types.Block)
- a.aID.Store(aID)
+ a.aID.Store(struct {
+ pos types.Position
+ leader types.NodeID
+ }{aID, leader})
return true
}() {
return
@@ -213,18 +224,24 @@ func (a *agreement) restart(
}()
for _, block := range replayBlock {
- a.processBlock(block)
+ if err := a.processBlock(block); err != nil {
+ a.logger.Error("failed to process block when restarting agreement",
+ "block", block)
+ }
}
for _, vote := range replayVote {
- a.processVote(vote)
+ if err := a.processVote(vote); err != nil {
+ a.logger.Error("failed to process vote when restarting agreement",
+ "vote", vote)
+ }
}
}
func (a *agreement) stop() {
a.restart(make(map[types.NodeID]struct{}), types.Position{
ChainID: math.MaxUint32,
- }, common.Hash{})
+ }, types.NodeID{}, common.Hash{})
}
func isStop(aID types.Position) bool {
@@ -235,7 +252,12 @@ func isStop(aID types.Position) bool {
func (a *agreement) clocks() int {
a.data.lock.RLock()
defer a.data.lock.RUnlock()
- return a.state.clocks()
+ scale := int(a.data.period)
+ // 10 is a magic number derived from many years of experience.
+ if scale > 10 {
+ scale = 10
+ }
+ return a.state.clocks() * scale
}
// pullVotes returns if current agreement requires more votes to continue.
@@ -243,21 +265,31 @@ func (a *agreement) pullVotes() bool {
a.data.lock.RLock()
defer a.data.lock.RUnlock()
return a.state.state() == statePullVote ||
+ a.state.state() == stateFastRollback ||
(a.state.state() == statePreCommit && (a.data.period%3) == 0)
}
// agreementID returns the current agreementID.
func (a *agreement) agreementID() types.Position {
- return a.aID.Load().(types.Position)
+ return a.aID.Load().(struct {
+ pos types.Position
+ leader types.NodeID
+ }).pos
+}
+
+// leader returns the current leader.
+func (a *agreement) leader() types.NodeID {
+ return a.aID.Load().(struct {
+ pos types.Position
+ leader types.NodeID
+ }).leader
}
// nextState is called at the specific clock time.
func (a *agreement) nextState() (err error) {
- if func() bool {
- a.lock.RLock()
- defer a.lock.RUnlock()
- return a.hasOutput
- }() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ if a.hasOutput {
a.state = newSleepState(a.data)
return
}
@@ -266,6 +298,9 @@ func (a *agreement) nextState() (err error) {
}
func (a *agreement) sanityCheck(vote *types.Vote) error {
+ if vote.Type >= types.MaxVoteType {
+ return ErrInvalidVote
+ }
if _, exist := a.notarySet[vote.ProposerID]; !exist {
return ErrNotInNotarySet
}
@@ -279,22 +314,21 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
return nil
}
-func (a *agreement) checkForkVote(vote *types.Vote) error {
- if err := func() error {
- a.data.lock.RLock()
- defer a.data.lock.RUnlock()
- if votes, exist := a.data.votes[vote.Period]; exist {
- if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist {
- if vote.BlockHash != oldVote.BlockHash {
- return &ErrForkVote{vote.ProposerID, oldVote, vote}
- }
+func (a *agreement) checkForkVote(vote *types.Vote) (
+ alreadyExist bool, err error) {
+ a.data.lock.RLock()
+ defer a.data.lock.RUnlock()
+ if votes, exist := a.data.votes[vote.Period]; exist {
+ if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist {
+ alreadyExist = true
+ if vote.BlockHash != oldVote.BlockHash {
+ a.data.recv.ReportForkVote(oldVote, vote)
+ err = &ErrForkVote{vote.ProposerID, oldVote, vote}
+ return
}
}
- return nil
- }(); err != nil {
- return err
}
- return nil
+ return
}
// prepareVote prepares a vote.
@@ -314,6 +348,13 @@ func (a *agreement) processVote(vote *types.Vote) error {
aID := a.agreementID()
// Agreement module has stopped.
if isStop(aID) {
+ // Hacky way to not drop first votes for height 0.
+ if vote.Position.Height == uint64(0) {
+ a.pendingVote = append(a.pendingVote, pendingVote{
+ vote: vote,
+ receivedTime: time.Now().UTC(),
+ })
+ }
return nil
}
if vote.Position != aID {
@@ -326,9 +367,13 @@ func (a *agreement) processVote(vote *types.Vote) error {
})
return nil
}
- if err := a.checkForkVote(vote); err != nil {
+ exist, err := a.checkForkVote(vote)
+ if err != nil {
return err
}
+ if exist {
+ return nil
+ }
a.data.lock.Lock()
defer a.data.lock.Unlock()
@@ -336,12 +381,13 @@ func (a *agreement) processVote(vote *types.Vote) error {
a.data.votes[vote.Period] = newVoteListMap()
}
a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote
- if !a.hasOutput && vote.Type == types.VoteCom {
+ if !a.hasOutput &&
+ (vote.Type == types.VoteCom || vote.Type == types.VoteFast) {
if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok &&
hash != skipBlockHash {
a.hasOutput = true
a.data.recv.ConfirmBlock(hash,
- a.data.votes[vote.Period][types.VoteCom])
+ a.data.votes[vote.Period][vote.Type])
return nil
}
} else if a.hasOutput {
@@ -443,6 +489,7 @@ func (a *agreement) processBlock(block *types.Block) error {
}
if b, exist := a.data.blocks[block.ProposerID]; exist {
if b.Hash != block.Hash {
+ a.data.recv.ReportForkBlock(b, block)
return &ErrFork{block.ProposerID, b.Hash, block.Hash}
}
return nil
@@ -452,6 +499,39 @@ func (a *agreement) processBlock(block *types.Block) error {
}
a.data.blocks[block.ProposerID] = block
a.addCandidateBlockNoLock(block)
+ if (a.state.state() == stateFast || a.state.state() == stateFastVote) &&
+ block.ProposerID == a.leader() {
+ go func() {
+ for func() bool {
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+ if a.state.state() != stateFast && a.state.state() != stateFastVote {
+ return false
+ }
+ block, exist := a.data.blocks[a.leader()]
+ if !exist {
+ return true
+ }
+ a.data.lock.RLock()
+ defer a.data.lock.RUnlock()
+ ok, err := a.data.leader.validLeader(block)
+ if err != nil {
+ fmt.Println("Error checking validLeader for Fast BA",
+ "error", err, "block", block)
+ return false
+ }
+ if ok {
+ a.data.recv.ProposeVote(
+ types.NewVote(types.VoteFast, block.Hash, a.data.period))
+ return false
+ }
+ return true
+ }() {
+ // TODO(jimmy): retry interval should be related to configurations.
+ time.Sleep(250 * time.Millisecond)
+ }
+ }()
+ }
return nil
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index 0e5a1fbdb..0754e800f 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"sync"
+ "sync/atomic"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -69,11 +70,15 @@ type consensusBAReceiver struct {
agreementModule *agreement
chainID uint32
changeNotaryTime time.Time
- round uint64
+ roundValue *atomic.Value
isNotary bool
restartNotary chan types.Position
}
+func (recv *consensusBAReceiver) round() uint64 {
+ return recv.roundValue.Load().(uint64)
+}
+
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
@@ -99,17 +104,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
if !recv.isNotary {
return common.Hash{}
}
- block := recv.consensus.proposeBlock(recv.chainID, recv.round)
+ block := recv.consensus.proposeBlock(recv.chainID, recv.round())
if block == nil {
recv.consensus.logger.Error("unable to propose block")
return nullBlockHash
}
- if err := recv.consensus.preProcessBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to pre-process block", "error", err)
- return common.Hash{}
- }
- recv.consensus.logger.Debug("Calling Network.BroadcastBlock", "block", block)
- recv.consensus.network.BroadcastBlock(block)
+ go func() {
+ if err := recv.consensus.preProcessBlock(block); err != nil {
+ recv.consensus.logger.Error("Failed to pre-process block", "error", err)
+ return
+ }
+ recv.consensus.logger.Debug("Calling Network.BroadcastBlock",
+ "block", block)
+ recv.consensus.network.BroadcastBlock(block)
+ }()
return block.Hash
}
@@ -122,7 +130,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
recv.consensus.logger.Info("Empty block is confirmed",
"position", &aID)
var err error
- block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID)
+ block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID)
if err != nil {
recv.consensus.logger.Error("Propose empty block failed", "error", err)
return
@@ -140,9 +148,19 @@ func (recv *consensusBAReceiver) ConfirmBlock(
defer recv.consensus.lock.Unlock()
recv.consensus.baConfirmedBlock[hash] = ch
}()
- recv.consensus.network.PullBlocks(common.Hashes{hash})
go func() {
- block = <-ch
+ hashes := common.Hashes{hash}
+ PullBlockLoop:
+ for {
+ recv.consensus.logger.Debug("Calling Network.PullBlock for BA block",
+ "hash", hash)
+ recv.consensus.network.PullBlocks(hashes)
+ select {
+ case block = <-ch:
+ break PullBlockLoop
+ case <-time.After(1 * time.Second):
+ }
+ }
recv.consensus.logger.Info("Receive unknown block",
"hash", hash.String()[:6],
"position", &block.Position,
@@ -242,7 +260,7 @@ CleanChannelLoop:
}
newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
- recv.round++
+ recv.roundValue.Store(recv.round() + 1)
newPos.Round++
}
recv.restartNotary <- newPos
@@ -256,6 +274,14 @@ func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
recv.consensus.network.PullBlocks(hashes)
}
+func (recv *consensusBAReceiver) ReportForkVote(v1, v2 *types.Vote) {
+ recv.consensus.gov.ReportForkVote(v1, v2)
+}
+
+func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) {
+ recv.consensus.gov.ReportForkBlock(b1, b2)
+}
+
// consensusDKGReceiver implements dkgReceiver.
type consensusDKGReceiver struct {
ID types.NodeID
@@ -388,6 +414,14 @@ type Consensus struct {
logger common.Logger
nonFinalizedBlockDelivered bool
resetRandomnessTicker chan struct{}
+ resetDeliveryGuardTicker chan struct{}
+ msgChan chan interface{}
+ waitGroup sync.WaitGroup
+
+ // Context of Dummy receiver during switching from syncer.
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus construct an Consensus instance.
@@ -423,6 +457,9 @@ func NewConsensusForSimulation(
// You need to provide the initial block for this newly created Consensus
// instance to bootstrap with. A proper choice is the last finalized block you
// delivered to syncer.
+//
+// NOTE: those confirmed blocks should be organized by chainID and sorted by
+// their positions, in ascending order.
func NewConsensusFromSyncer(
initBlock *types.Block,
initRoundBeginTime time.Time,
@@ -432,17 +469,43 @@ func NewConsensusFromSyncer(
networkModule Network,
prv crypto.PrivateKey,
latticeModule *Lattice,
- blocks []*types.Block,
+ confirmedBlocks [][]*types.Block,
randomnessResults []*types.BlockRandomnessResult,
+ cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
networkModule, prv, logger, latticeModule, true)
- // Dump all BA-confirmed blocks to the consensus instance.
- for _, b := range blocks {
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
+ // Launch a dummy receiver before we start receiving from network module.
+ con.dummyMsgBuffer = cachedMessages
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ // Dump all BA-confirmed blocks to the consensus instance, make sure these
+ // added blocks forming a DAG.
+ for {
+ updated := false
+ for idx, bs := range confirmedBlocks {
+ for bIdx, b := range bs {
+ // Only when its parent block is already added to lattice, we can
+ // then add this block. If not, our pulling mechanism would stop at
+ // the block we added, and lost its parent block forever.
+ if !latticeModule.Exist(b.ParentHash) {
+ logger.Debug("Skip discontinuous confirmed block",
+ "from", b,
+ "until", bs[len(bs)-1])
+ confirmedBlocks[idx] = bs[bIdx:]
+ break
+ }
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ }
+ if !updated {
+ break
}
}
// Dump all randomness result to the consensus instance.
@@ -504,22 +567,25 @@ func newConsensusForRound(
}
// Construct Consensus instance.
con := &Consensus{
- ID: ID,
- ccModule: newCompactionChain(gov),
- lattice: latticeModule,
- app: appModule,
- debugApp: debugApp,
- gov: gov,
- db: db,
- network: network,
- baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
- dMoment: initRoundBeginTime,
- nodeSetCache: nodeSetCache,
- signer: signer,
- event: common.NewEvent(),
- logger: logger,
+ ID: ID,
+ ccModule: newCompactionChain(gov),
+ lattice: latticeModule,
+ app: appModule,
+ debugApp: debugApp,
+ gov: gov,
+ db: db,
+ network: network,
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ dMoment: initRoundBeginTime,
+ nodeSetCache: nodeSetCache,
+ signer: signer,
+ event: common.NewEvent(),
+ logger: logger,
+ resetRandomnessTicker: make(chan struct{}),
+ resetDeliveryGuardTicker: make(chan struct{}),
+ msgChan: make(chan interface{}, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -585,12 +651,40 @@ func (con *Consensus) Run() {
con.baMgr.run()
// Launch network handler.
con.logger.Debug("Calling Network.ReceiveChan")
- go con.processMsg(con.network.ReceiveChan())
+ con.waitGroup.Add(1)
+ go con.deliverNetworkMsg()
+ con.waitGroup.Add(1)
+ go con.processMsg()
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Take some time to bootstrap.
time.Sleep(3 * time.Second)
+ con.waitGroup.Add(1)
go con.pullRandomness()
+ // Stop dummy receiver if launched.
+ if con.dummyCancel != nil {
+ con.logger.Trace("Stop dummy receiver")
+ con.dummyCancel()
+ <-con.dummyFinished
+ // Replay those cached messages.
+ con.logger.Trace("Dummy receiver stoped, start dumping cached messages",
+ "count", len(con.dummyMsgBuffer))
+ for _, msg := range con.dummyMsgBuffer {
+ loop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break loop
+ case <-time.After(50 * time.Millisecond):
+ con.logger.Debug(
+ "internal message channel is full when syncing")
+ }
+ }
+ }
+ con.logger.Trace("Finish dumping cached messages")
+ }
+ con.waitGroup.Add(1)
+ go con.deliveryGuard()
// Block until done.
select {
case <-con.ctx.Done():
@@ -785,18 +879,51 @@ func (con *Consensus) Stop() {
con.ctxCancel()
con.baMgr.stop()
con.event.Reset()
+ con.waitGroup.Wait()
}
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) deliverNetworkMsg() {
+ defer con.waitGroup.Done()
+ recv := con.network.ReceiveChan()
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case msg := <-recv:
+ innerLoop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break innerLoop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug("internal message channel is full",
+ "pending", msg)
+ }
+ }
+ case <-con.ctx.Done():
+ return
+ }
+ }
+}
+
+func (con *Consensus) processMsg() {
+ defer con.waitGroup.Done()
MessageLoop:
for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
var msg interface{}
select {
- case msg = <-msgChan:
+ case msg = <-con.msgChan:
case <-con.ctx.Done():
return
}
-
switch val := msg.(type) {
case *types.Block:
if ch, exist := func() (chan<- *types.Block, bool) {
@@ -805,7 +932,7 @@ MessageLoop:
ch, e := con.baConfirmedBlock[val.Hash]
return ch, e
}(); exist {
- if err := con.lattice.SanityCheck(val); err != nil {
+ if err := con.lattice.SanityCheck(val, false); err != nil {
if err == ErrRetrySanityCheckLater {
err = nil
} else {
@@ -913,6 +1040,7 @@ func (con *Consensus) ProcessAgreementResult(
if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
return err
}
+ con.lattice.AddShallowBlock(rand.BlockHash, rand.Position)
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
return err
@@ -1007,10 +1135,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
}
func (con *Consensus) pullRandomness() {
+ defer con.waitGroup.Done()
for {
select {
case <-con.ctx.Done():
return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
case <-con.resetRandomnessTicker:
case <-time.After(1500 * time.Millisecond):
// TODO(jimmy): pulling period should be related to lambdaBA.
@@ -1024,12 +1158,38 @@ func (con *Consensus) pullRandomness() {
}
}
+func (con *Consensus) deliveryGuard() {
+ defer con.waitGroup.Done()
+ time.Sleep(con.dMoment.Sub(time.Now()))
+ // Node takes time to start.
+ time.Sleep(60 * time.Second)
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-con.resetDeliveryGuardTicker:
+ case <-time.After(60 * time.Second):
+ con.logger.Error("no blocks delivered for too long", "ID", con.ID)
+ panic(fmt.Errorf("no blocks delivered for too long"))
+ }
+ }
+}
+
// deliverBlock deliver a block to application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
select {
case con.resetRandomnessTicker <- struct{}{}:
default:
}
+ select {
+ case con.resetDeliveryGuardTicker <- struct{}{}:
+ default:
+ }
if err := con.db.UpdateBlock(*b); err != nil {
panic(err)
}
@@ -1086,9 +1246,7 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
for _, b := range deliveredBlocks {
con.deliverBlock(b)
}
- if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil {
- return
- }
+ err = con.lattice.PurgeBlocks(deliveredBlocks)
return
}
@@ -1150,9 +1308,7 @@ func (con *Consensus) prepareBlock(b *types.Block,
err = ErrCRSNotReady
return
}
- if err = con.signer.SignCRS(b, crs); err != nil {
- return
- }
+ err = con.signer.SignCRS(b, crs)
return
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go
index 75c30372f..1fe29fa10 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go
@@ -72,11 +72,7 @@ func (lvl *LevelDBBackedDB) HasBlock(hash common.Hash) bool {
}
func (lvl *LevelDBBackedDB) internalHasBlock(key []byte) (bool, error) {
- exists, err := lvl.db.Has(key, nil)
- if err != nil {
- return false, err
- }
- return exists, nil
+ return lvl.db.Has(key, nil)
}
// GetBlock implements the Reader.GetBlock method.
@@ -90,9 +86,6 @@ func (lvl *LevelDBBackedDB) GetBlock(
return
}
err = rlp.DecodeBytes(queried, &block)
- if err != nil {
- return
- }
return
}
@@ -113,9 +106,7 @@ func (lvl *LevelDBBackedDB) UpdateBlock(block types.Block) (err error) {
err = ErrBlockDoesNotExist
return
}
- if err = lvl.db.Put(blockKey, marshaled, nil); err != nil {
- return
- }
+ err = lvl.db.Put(blockKey, marshaled, nil)
return
}
@@ -134,9 +125,7 @@ func (lvl *LevelDBBackedDB) PutBlock(block types.Block) (err error) {
err = ErrBlockExists
return
}
- if err = lvl.db.Put(blockKey, marshaled, nil); err != nil {
- return
- }
+ err = lvl.db.Put(blockKey, marshaled, nil)
return
}
@@ -166,10 +155,7 @@ func (lvl *LevelDBBackedDB) PutCompactionChainTipInfo(
if info.Height+1 != height {
return ErrInvalidCompactionChainTipHeight
}
- if err = lvl.db.Put(compactionChainTipInfoKey, marshaled, nil); err != nil {
- return err
- }
- return nil
+ return lvl.db.Put(compactionChainTipInfoKey, marshaled, nil)
}
func (lvl *LevelDBBackedDB) internalGetCompactionChainTipInfo() (
@@ -181,9 +167,7 @@ func (lvl *LevelDBBackedDB) internalGetCompactionChainTipInfo() (
}
return
}
- if err = rlp.DecodeBytes(queried, &info); err != nil {
- return
- }
+ err = rlp.DecodeBytes(queried, &info)
return
}
@@ -201,11 +185,7 @@ func (lvl *LevelDBBackedDB) GetCompactionChainTipInfo() (
// HasDKGPrivateKey check existence of DKG private key of one round.
func (lvl *LevelDBBackedDB) HasDKGPrivateKey(round uint64) (bool, error) {
- exists, err := lvl.db.Has(lvl.getDKGPrivateKeyKey(round), nil)
- if err != nil {
- return false, err
- }
- return exists, nil
+ return lvl.db.Has(lvl.getDKGPrivateKeyKey(round), nil)
}
// GetDKGPrivateKey get DKG private key of one round.
@@ -218,9 +198,7 @@ func (lvl *LevelDBBackedDB) GetDKGPrivateKey(round uint64) (
}
return
}
- if err = rlp.DecodeBytes(queried, &prv); err != nil {
- return
- }
+ err = rlp.DecodeBytes(queried, &prv)
return
}
@@ -239,11 +217,8 @@ func (lvl *LevelDBBackedDB) PutDKGPrivateKey(
if err != nil {
return err
}
- if err := lvl.db.Put(
- lvl.getDKGPrivateKeyKey(round), marshaled, nil); err != nil {
- return err
- }
- return nil
+ return lvl.db.Put(
+ lvl.getDKGPrivateKeyKey(round), marshaled, nil)
}
func (lvl *LevelDBBackedDB) getBlockKey(hash common.Hash) (ret []byte) {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
index aa87e38f7..a77ec9385 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
@@ -159,4 +159,7 @@ type Ticker interface {
// Stop the ticker.
Stop()
+
+ // Retart the ticker and clear all internal data.
+ Restart()
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go
index e55c0dbfc..cf81a1161 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go
@@ -55,7 +55,7 @@ type ErrAckingBlockNotExists struct {
}
func (e ErrAckingBlockNotExists) Error() string {
- return fmt.Sprintf("acking block %s not exists", e.hash)
+ return fmt.Sprintf("acking block %s not exists", e.hash.String()[:6])
}
// Errors for method usage
@@ -113,6 +113,8 @@ type latticeData struct {
blockByHash map[common.Hash]*types.Block
// This stores configuration for each round.
configs []*latticeDataConfig
+ // shallowBlocks stores the hash of blocks that their body is not receive yet.
+ shallowBlocks map[common.Hash]types.Position
}
// newLatticeData creates a new latticeData instance.
@@ -126,10 +128,11 @@ func newLatticeData(
genesisConfig.fromConfig(round, config)
genesisConfig.setRoundBeginTime(dMoment)
data = &latticeData{
- db: db,
- chains: make([]*chainStatus, genesisConfig.numChains),
- blockByHash: make(map[common.Hash]*types.Block),
- configs: []*latticeDataConfig{genesisConfig},
+ db: db,
+ chains: make([]*chainStatus, genesisConfig.numChains),
+ blockByHash: make(map[common.Hash]*types.Block),
+ configs: []*latticeDataConfig{genesisConfig},
+ shallowBlocks: make(map[common.Hash]types.Position),
}
for i := range data.chains {
data.chains[i] = &chainStatus{
@@ -141,15 +144,35 @@ func newLatticeData(
return
}
-func (data *latticeData) checkAckingRelations(b *types.Block) error {
+func (data *latticeData) addShallowBlock(hash common.Hash, pos types.Position) {
+ // We don't care other errors here. This `if` is to prevent being spammed by
+ // very old blocks.
+ if _, err := data.findBlock(hash); err != db.ErrBlockDoesNotExist {
+ return
+ }
+ data.shallowBlocks[hash] = pos
+}
+
+func (data *latticeData) checkAckingRelations(
+ b *types.Block, allowShallow bool) error {
acksByChainID := make(map[uint32]struct{}, len(data.chains))
for _, hash := range b.Acks {
bAck, err := data.findBlock(hash)
if err != nil {
if err == db.ErrBlockDoesNotExist {
- return &ErrAckingBlockNotExists{hash}
+ err = &ErrAckingBlockNotExists{hash}
+ if allowShallow {
+ if pos, exist := data.shallowBlocks[hash]; exist {
+ bAck = &types.Block{
+ Position: pos,
+ }
+ err = nil
+ }
+ }
+ }
+ if err != nil {
+ return err
}
- return err
}
// Check if it acks blocks from old rounds, the allowed round difference
// is 1.
@@ -172,7 +195,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error {
return nil
}
-func (data *latticeData) sanityCheck(b *types.Block) error {
+func (data *latticeData) sanityCheck(b *types.Block, allowShallow bool) error {
// TODO(mission): Check if its proposer is in validator set, lattice has no
// knowledge about node set.
config := data.getConfig(b.Position.Round)
@@ -196,7 +219,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
if !config.isValidGenesisBlockTime(b) {
return ErrIncorrectBlockTime
}
- return data.checkAckingRelations(b)
+ return data.checkAckingRelations(b, allowShallow)
}
// Check parent block if parent hash is specified.
if !b.ParentHash.Equal(common.Hash{}) {
@@ -257,10 +280,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
return ErrNotAckParent
}
}
- if err := data.checkAckingRelations(b); err != nil {
- return err
- }
- return nil
+ return data.checkAckingRelations(b, allowShallow)
}
// addBlock processes blocks. It does sanity check, inserts block into lattice
@@ -504,19 +524,21 @@ func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) {
}
// TODO(mission): make more abstraction for this method.
-// nextHeight returns the next height of a chain.
-func (data *latticeData) nextHeight(
- round uint64, chainID uint32) (uint64, error) {
+// nextBlock returns the next height and timestamp of a chain.
+func (data *latticeData) nextBlock(
+ round uint64, chainID uint32) (uint64, time.Time, error) {
chainTip := data.chains[chainID].tip
bindTip, err := data.isBindTip(
types.Position{Round: round, ChainID: chainID}, chainTip)
if err != nil {
- return 0, err
+ return 0, time.Time{}, err
}
+ config := data.getConfig(round)
if bindTip {
- return chainTip.Position.Height + 1, nil
+ return chainTip.Position.Height + 1,
+ chainTip.Timestamp.Add(config.minBlockTimeInterval), nil
}
- return 0, nil
+ return 0, config.roundBeginTime, nil
}
// findBlock seeks blocks in memory or db.
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go
index b30306aef..d531639b9 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go
@@ -90,9 +90,7 @@ func (l *Lattice) PrepareBlock(
if b.Witness, err = l.app.PrepareWitness(b.Witness.Height); err != nil {
return
}
- if err = l.signer.SignBlock(b); err != nil {
- return
- }
+ err = l.signer.SignBlock(b)
return
}
@@ -103,17 +101,23 @@ func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) {
if err = l.data.prepareEmptyBlock(b); err != nil {
return
}
- if b.Hash, err = utils.HashBlock(b); err != nil {
- return
- }
+ b.Hash, err = utils.HashBlock(b)
return
}
+// AddShallowBlock adds a hash of a block that is confirmed by other nodes but
+// the content is not arrived yet.
+func (l *Lattice) AddShallowBlock(hash common.Hash, pos types.Position) {
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ l.data.addShallowBlock(hash, pos)
+}
+
// SanityCheck checks the validity of a block.
//
// If any acking block of this block does not exist, Lattice caches this block
// and retries when Lattice.ProcessBlock is called.
-func (l *Lattice) SanityCheck(b *types.Block) (err error) {
+func (l *Lattice) SanityCheck(b *types.Block, allowShallow bool) (err error) {
if b.IsEmpty() {
// Only need to verify block's hash.
var hash common.Hash
@@ -139,18 +143,13 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) {
return
}
}
- if err = func() (err error) {
- l.lock.RLock()
- defer l.lock.RUnlock()
- if err = l.data.sanityCheck(b); err != nil {
- if _, ok := err.(*ErrAckingBlockNotExists); ok {
- err = ErrRetrySanityCheckLater
- }
- return
+ l.lock.RLock()
+ defer l.lock.RUnlock()
+ if err = l.data.sanityCheck(b, allowShallow); err != nil {
+ if _, ok := err.(*ErrAckingBlockNotExists); ok {
+ err = ErrRetrySanityCheckLater
}
return
- }(); err != nil {
- return
}
return
}
@@ -159,10 +158,8 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) {
func (l *Lattice) Exist(hash common.Hash) bool {
l.lock.RLock()
defer l.lock.RUnlock()
- if _, err := l.data.findBlock(hash); err != nil {
- return false
- }
- return true
+ _, err := l.data.findBlock(hash)
+ return err == nil
}
// addBlockToLattice adds a block into lattice, and delivers blocks with the
@@ -189,7 +186,7 @@ func (l *Lattice) addBlockToLattice(
if tip = l.pool.tip(i); tip == nil {
continue
}
- err = l.data.sanityCheck(tip)
+ err = l.data.sanityCheck(tip, false)
if err == nil {
var output []*types.Block
if output, err = l.data.addBlock(tip); err != nil {
@@ -199,6 +196,7 @@ func (l *Lattice) addBlockToLattice(
"block", tip, "error", err)
panic(err)
}
+ delete(l.data.shallowBlocks, tip.Hash)
hasOutput = true
outputBlocks = append(outputBlocks, output...)
l.pool.removeTip(i)
@@ -267,11 +265,11 @@ func (l *Lattice) ProcessBlock(
if len(toDelivered) == 0 {
break
}
- hashes := make(common.Hashes, len(toDelivered))
- for idx := range toDelivered {
- hashes[idx] = toDelivered[idx].Hash
- }
if l.debug != nil {
+ hashes := make(common.Hashes, len(toDelivered))
+ for idx := range toDelivered {
+ hashes[idx] = toDelivered[idx].Hash
+ }
l.debug.TotalOrderingDelivered(hashes, deliveredMode)
}
// Perform consensus timestamp module.
@@ -283,12 +281,13 @@ func (l *Lattice) ProcessBlock(
return
}
-// NextHeight returns expected height of incoming block for specified chain and
-// given round.
-func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) {
+// NextBlock returns expected height and timestamp of incoming block for
+// specified chain and given round.
+func (l *Lattice) NextBlock(round uint64, chainID uint32) (
+ uint64, time.Time, error) {
l.lock.RLock()
defer l.lock.RUnlock()
- return l.data.nextHeight(round, chainID)
+ return l.data.nextBlock(round, chainID)
}
// PurgeBlocks purges blocks' cache in memory, this is called when the caller
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go
index 94d28faeb..bcfa57fe6 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go
@@ -150,10 +150,7 @@ func (l *leaderSelector) processBlock(block *types.Block) error {
func (l *leaderSelector) potentialLeader(block *types.Block) (bool, *big.Int) {
dist := l.distance(block.CRSSignature)
cmp := l.minCRSBlock.Cmp(dist)
- if cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash)) {
- return true, dist
- }
- return false, dist
+ return (cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash))), dist
}
func (l *leaderSelector) updateLeader(block *types.Block, dist *big.Int) {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
index 9b351eabc..acc4f1c6c 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The dexon-consensus-core Authors
+// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
@@ -18,6 +18,9 @@
package syncer
import (
+ "context"
+ "time"
+
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/types"
@@ -37,16 +40,14 @@ type agreement struct {
pendings map[uint64]map[common.Hash]*types.AgreementResult
logger common.Logger
confirmedBlocks map[common.Hash]struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc
}
// newAgreement creates a new agreement instance.
-func newAgreement(
- ch chan<- *types.Block,
- pullChan chan<- common.Hash,
- cache *utils.NodeSetCache,
- logger common.Logger) *agreement {
-
- return &agreement{
+func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache, logger common.Logger) *agreement {
+ a := &agreement{
cache: cache,
inputChan: make(chan interface{}, 1000),
outputChan: ch,
@@ -58,11 +59,14 @@ func newAgreement(
map[uint64]map[common.Hash]*types.AgreementResult),
confirmedBlocks: make(map[common.Hash]struct{}),
}
+ a.ctx, a.ctxCancel = context.WithCancel(context.Background())
+ return a
}
// run starts the agreement, this does not start a new routine, go a new
// routine explicitly in the caller.
func (a *agreement) run() {
+ defer a.ctxCancel()
for {
select {
case val, ok := <-a.inputChan:
@@ -119,22 +123,35 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
return
}
if r.IsEmptyBlock {
- // Empty block is also confirmed.
b := &types.Block{
Position: r.Position,
}
+ // Empty blocks should be confirmed directly, they won't be sent over
+ // the wire.
a.confirm(b)
- } else {
- needPull := true
- if bs, exist := a.blocks[r.Position]; exist {
- if b, exist := bs[r.BlockHash]; exist {
- a.confirm(b)
- needPull = false
- }
+ return
+ }
+ if bs, exist := a.blocks[r.Position]; exist {
+ if b, exist := bs[r.BlockHash]; exist {
+ a.confirm(b)
+ return
}
- if needPull {
- a.agreementResults[r.BlockHash] = struct{}{}
- a.pullChan <- r.BlockHash
+ }
+ a.agreementResults[r.BlockHash] = struct{}{}
+loop:
+ for {
+ select {
+ case a.pullChan <- r.BlockHash:
+ break loop
+ case <-a.ctx.Done():
+ a.logger.Error("pull request is not sent",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
+ return
+ case <-time.After(500 * time.Millisecond):
+ a.logger.Debug("pull request is unable to send",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
}
}
}
@@ -168,7 +185,18 @@ func (a *agreement) confirm(b *types.Block) {
if _, exist := a.confirmedBlocks[b.Hash]; !exist {
delete(a.blocks, b.Position)
delete(a.agreementResults, b.Hash)
- a.outputChan <- b
+ loop:
+ for {
+ select {
+ case a.outputChan <- b:
+ break loop
+ case <-a.ctx.Done():
+ a.logger.Error("confirmed block is not sent", "block", b)
+ return
+ case <-time.After(500 * time.Millisecond):
+ a.logger.Debug("agreement output channel is full", "block", b)
+ }
+ }
a.confirmedBlocks[b.Hash] = struct{}{}
}
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
index 92f8fd8d0..75c106793 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
@@ -18,7 +18,6 @@
package syncer
import (
- "bytes"
"context"
"fmt"
"sort"
@@ -40,7 +39,8 @@ var (
ErrNotSynced = fmt.Errorf("not synced yet")
// ErrGenesisBlockReached is reported when genesis block reached.
ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
- // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
+ // blocks.
ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
// ErrMismatchBlockHashSequence means the delivering sequence is not
// correct, compared to finalized blocks.
@@ -61,6 +61,7 @@ type Consensus struct {
prv crypto.PrivateKey
network core.Network
nodeSetCache *utils.NodeSetCache
+ tsigVerifier *core.TSigVerifierCache
lattice *core.Lattice
validatedChains map[uint32]struct{}
@@ -83,6 +84,9 @@ type Consensus struct {
ctxCancel context.CancelFunc
syncedLastBlock *types.Block
syncedConsensus *core.Consensus
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus creates an instance for Consensus (syncer consensus).
@@ -102,6 +106,7 @@ func NewConsensus(
db: db,
network: network,
nodeSetCache: utils.NewNodeSetCache(gov),
+ tsigVerifier: core.NewTSigVerifierCache(gov, 7),
prv: prv,
logger: logger,
validatedChains: make(map[uint32]struct{}),
@@ -118,17 +123,15 @@ func NewConsensus(
}
func (con *Consensus) initConsensusObj(initBlock *types.Block) {
- var cfg *types.Config
func() {
con.lock.Lock()
defer con.lock.Unlock()
con.latticeLastRound = initBlock.Position.Round
- cfg = con.configs[con.latticeLastRound]
debugApp, _ := con.app.(core.Debug)
con.lattice = core.NewLattice(
con.roundBeginTimes[con.latticeLastRound],
con.latticeLastRound,
- cfg,
+ con.configs[con.latticeLastRound],
utils.NewSigner(con.prv),
con.app,
debugApp,
@@ -136,37 +139,49 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) {
con.logger,
)
}()
- con.startAgreement(cfg.NumChains)
+ con.startAgreement()
con.startNetwork()
con.startCRSMonitor()
}
-func (con *Consensus) checkIfValidated() bool {
+func (con *Consensus) checkIfValidated() (validated bool) {
con.lock.RLock()
defer con.lock.RUnlock()
- var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
- var validatedChainCount uint32
+ var (
+ round = con.blocks[0][0].Position.Round
+ numChains = con.configs[round].NumChains
+ validatedChainCount uint32
+ )
// Make sure we validate some block in all chains.
for chainID := range con.validatedChains {
if chainID < numChains {
validatedChainCount++
}
}
- if validatedChainCount == numChains {
- return true
- }
- con.logger.Debug("not validated yet", "validated-chain", validatedChainCount)
- return false
+ validated = validatedChainCount == numChains
+ con.logger.Debug("syncer chain-validation status",
+ "validated-chain", validatedChainCount,
+ "round", round,
+ "valid", validated)
+ return
}
-func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
+func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
con.lock.RLock()
defer con.lock.RUnlock()
var (
- numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ round = con.blocks[0][0].Position.Round
+ numChains = con.configs[round].NumChains
compactionTips = make([]*types.Block, numChains)
overlapCount = uint32(0)
)
+ defer func() {
+ con.logger.Debug("syncer synced status",
+ "overlap-count", overlapCount,
+ "num-chain", numChains,
+ "last-block", blocks[len(blocks)-1],
+ "synced", synced)
+ }()
// Find tips (newset blocks) of each chain in compaction chain.
b := blocks[len(blocks)-1]
for tipCount := uint32(0); tipCount < numChains; {
@@ -178,7 +193,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
}
}
if (b.Finalization.ParentHash == common.Hash{}) {
- return false
+ return
}
b1, err := con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
@@ -196,14 +211,8 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
}
}
}
- if overlapCount == numChains {
- return true
- }
- con.logger.Debug("not synced yet",
- "overlap-count", overlapCount,
- "num-chain", numChains,
- "last-block", blocks[len(blocks)-1])
- return false
+ synced = overlapCount == numChains
+ return
}
// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
@@ -212,6 +221,10 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
func (con *Consensus) ensureAgreementOverlapRound() bool {
con.lock.Lock()
defer con.lock.Unlock()
+ defer func() {
+ con.logger.Debug("ensureAgreementOverlapRound returned",
+ "round", con.agreementRoundCut)
+ }()
if con.agreementRoundCut > 0 {
return true
}
@@ -267,7 +280,6 @@ func (con *Consensus) ensureAgreementOverlapRound() bool {
"configs", len(con.configs))
if tipRoundMap[r] == con.configs[r].NumChains {
con.agreementRoundCut = r
- con.logger.Info("agreement round cut found, round", r)
return true
}
}
@@ -278,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock(
blocks []*types.Block) (*types.Block, error) {
lastBlock := blocks[len(blocks)-1]
round := lastBlock.Position.Round
+ isConfigChanged := func(prev, cur *types.Config) bool {
+ return prev.K != cur.K ||
+ prev.NumChains != cur.NumChains ||
+ prev.PhiRatio != cur.PhiRatio
+ }
for {
// Find round r which r-1, r, r+1 are all in same total ordering config.
for {
- sameAsPrevRound := round == 0 || !con.isConfigChanged(
+ sameAsPrevRound := round == 0 || !isConfigChanged(
con.configs[round-1], con.configs[round])
- sameAsNextRound := !con.isConfigChanged(
+ sameAsNextRound := !isConfigChanged(
con.configs[round], con.configs[round+1])
if sameAsPrevRound && sameAsNextRound {
break
@@ -306,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock(
lastBlock = &b
}
// Find the deliver set by hash for two times. Blocks in a deliver set
- // returned by total ordering is sorted by hash. If a block's parent hash
- // is greater than its hash means there is a cut between deliver sets.
+ // returned by total ordering is sorted by hash. If a block's parent
+ // hash is greater than its hash means there is a cut between deliver
+ // sets.
var curBlock, prevBlock *types.Block
var deliverSetFirstBlock, deliverSetLastBlock *types.Block
curBlock = lastBlock
@@ -374,11 +392,13 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
if con.lattice == nil {
return nil
}
- con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
delivered, err := con.lattice.ProcessFinalizedBlock(block)
if err != nil {
return err
}
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
for idx, b := range delivered {
if con.finalizedBlockHashes[idx] != b.Hash {
return ErrMismatchBlockHashSequence
@@ -393,18 +413,27 @@ func (con *Consensus) processFinalizedBlock(block *types.Block) error {
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
// NOTICE: parameter "blocks" should be consecutive in compaction height.
+// NOTICE: this method is not expected to be called concurrently.
func (con *Consensus) SyncBlocks(
- blocks []*types.Block, latest bool) (bool, error) {
+ blocks []*types.Block, latest bool) (synced bool, err error) {
+ defer func() {
+ con.logger.Debug("SyncBlocks returned",
+ "synced", synced,
+ "error", err,
+ "last-block", con.syncedLastBlock)
+ }()
if con.syncedLastBlock != nil {
- return true, ErrAlreadySynced
+ synced, err = true, ErrAlreadySynced
+ return
}
if len(blocks) == 0 {
- return false, nil
+ return
}
// Check if blocks are consecutive.
for i := 1; i < len(blocks); i++ {
if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
- return false, ErrInvalidBlockOrder
+ err = ErrInvalidBlockOrder
+ return
}
}
// Make sure the first block is the next block of current compaction chain
@@ -414,7 +443,8 @@ func (con *Consensus) SyncBlocks(
con.logger.Error("mismatched finalization height",
"now", blocks[0].Finalization.Height,
"expected", tipHeight+1)
- return false, ErrInvalidSyncingFinalizationHeight
+ err = ErrInvalidSyncingFinalizationHeight
+ return
}
con.logger.Trace("syncBlocks",
"position", &blocks[0].Position,
@@ -425,30 +455,35 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
for _, b := range blocks {
// TODO(haoping) remove this if lattice puts blocks into db.
- if err := con.db.PutBlock(*b); err != nil {
+ if err = con.db.PutBlock(*b); err != nil {
// A block might be put into db when confirmed by BA, but not
// finalized yet.
if err == db.ErrBlockExists {
err = con.db.UpdateBlock(*b)
}
if err != nil {
- return false, err
+ return
}
}
- if err := con.db.PutCompactionChainTipInfo(
+ if err = con.db.PutCompactionChainTipInfo(
b.Hash, b.Finalization.Height); err != nil {
- return false, err
+ return
}
- if err := con.processFinalizedBlock(b); err != nil {
- return false, err
+ if err = con.processFinalizedBlock(b); err != nil {
+ return
}
}
if latest && con.lattice == nil {
- // New Lattice and find the deliver set of total ordering when "latest" is
- // true for first time. Deliver set is found by block hashes.
- syncBlock, err := con.findLatticeSyncBlock(blocks)
+ // New Lattice and find the deliver set of total ordering when "latest"
+ // is true for first time. Deliver set is found by block hashes.
+ var syncBlock *types.Block
+ syncBlock, err = con.findLatticeSyncBlock(blocks)
if err != nil {
- return false, err
+ if err == ErrGenesisBlockReached {
+ con.logger.Debug("SyncBlocks skip error", "error", err)
+ err = nil
+ }
+ return
}
if syncBlock != nil {
con.logger.Debug("deliver set found", "block", syncBlock)
@@ -457,7 +492,8 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
// Process blocks from syncBlock to blocks' last block.
b := blocks[len(blocks)-1]
- blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+ blocksCount :=
+ b.Finalization.Height - syncBlock.Finalization.Height + 1
blocksToProcess := make([]*types.Block, blocksCount)
for {
blocksToProcess[blocksCount-1] = b
@@ -465,15 +501,16 @@ func (con *Consensus) SyncBlocks(
if b.Hash == syncBlock.Hash {
break
}
- b1, err := con.db.GetBlock(b.Finalization.ParentHash)
+ var b1 types.Block
+ b1, err = con.db.GetBlock(b.Finalization.ParentHash)
if err != nil {
- return false, err
+ return
}
b = &b1
}
for _, b := range blocksToProcess {
- if err := con.processFinalizedBlock(b); err != nil {
- return false, err
+ if err = con.processFinalizedBlock(b); err != nil {
+ return
}
}
}
@@ -483,19 +520,25 @@ func (con *Consensus) SyncBlocks(
// overlapping of compaction chain and BA's oldest blocks means the
// syncing is done.
if con.checkIfValidated() && con.checkIfSynced(blocks) {
- if err := con.Stop(); err != nil {
- return false, err
+ if err = con.Stop(); err != nil {
+ return
}
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
con.syncedLastBlock = blocks[len(blocks)-1]
- con.logger.Info("syncer.Consensus synced",
- "last-block", con.syncedLastBlock)
+ synced = true
}
}
- return con.syncedLastBlock != nil, nil
+ return
}
// GetSyncedConsensus returns the core.Consensus instance after synced.
func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
if con.syncedConsensus != nil {
return con.syncedConsensus, nil
}
@@ -504,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
}
// flush all blocks in con.blocks into core.Consensus, and build
// core.Consensus from syncer.
- confirmedBlocks := []*types.Block{}
+ confirmedBlocks := make([][]*types.Block, len(con.blocks))
+ for i, bs := range con.blocks {
+ confirmedBlocks[i] = []*types.Block(bs)
+ }
randomnessResults := []*types.BlockRandomnessResult{}
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- for _, bs := range con.blocks {
- confirmedBlocks = append(confirmedBlocks, bs...)
- }
- for _, r := range con.randomnessResults {
- randomnessResults = append(randomnessResults, r)
- }
- }()
+ for _, r := range con.randomnessResults {
+ randomnessResults = append(randomnessResults, r)
+ }
+ con.dummyCancel()
+ <-con.dummyFinished
var err error
con.syncedConsensus, err = core.NewConsensusFromSyncer(
con.syncedLastBlock,
@@ -528,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.lattice,
confirmedBlocks,
randomnessResults,
+ con.dummyMsgBuffer,
con.logger)
return con.syncedConsensus, err
}
@@ -535,13 +577,17 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
// Stop the syncer.
//
// This method is mainly for caller to stop the syncer before synced, the syncer
-// would call this method automatically after synced.
+// would call this method automatically after being synced.
func (con *Consensus) Stop() error {
+ con.logger.Trace("syncer is about to stop")
// Stop network and CRS routines, wait until they are all stoped.
con.ctxCancel()
+ con.logger.Trace("stop syncer modules")
con.moduleWaitGroup.Wait()
// Stop agreements.
+ con.logger.Trace("stop syncer agreement modules")
con.stopAgreement()
+ con.logger.Trace("syncer stopped")
return nil
}
@@ -566,6 +612,10 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) {
func() {
con.lock.Lock()
defer con.lock.Unlock()
+ con.logger.Debug("syncer setupConfigs",
+ "until-round", round,
+ "length", len(con.configs),
+ "lattice", con.latticeLastRound)
for r := uint64(len(con.configs)); r <= round; r++ {
cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
con.configs = append(con.configs, cfg)
@@ -589,6 +639,7 @@ func (con *Consensus) setupConfigsUntilRound(round uint64) {
}
}()
con.resizeByNumChains(curMaxNumChains)
+ con.logger.Trace("setupConfgis finished", "round", round)
}
// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
@@ -601,9 +652,6 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) {
maxRound = b.Position.Round
}
}
- con.logger.Debug("syncer setupConfigs",
- "max", maxRound,
- "lattice", con.latticeLastRound)
// Get configs from governance.
//
// In fullnode, the notification of new round is yet another TX, which
@@ -623,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
// Resize the pool of blocks.
con.blocks = append(con.blocks, types.ByPosition{})
// Resize agreement modules.
- a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ a := newAgreement(
+ con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
con.agreements = append(con.agreements, a)
con.agreementWaitGroup.Add(1)
go func() {
@@ -635,7 +684,7 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
}
// startAgreement starts agreements for receiving votes and agreements.
-func (con *Consensus) startAgreement(numChains uint32) {
+func (con *Consensus) startAgreement() {
// Start a routine for listening receive channel and pull block channel.
go func() {
for {
@@ -648,8 +697,8 @@ func (con *Consensus) startAgreement(numChains uint32) {
func() {
con.lock.Lock()
defer con.lock.Unlock()
- // If round is cut in agreements, do not add blocks with round less
- // then cut round.
+ // If round is cut in agreements, do not add blocks with
+ // round less then cut round.
if b.Position.Round < con.agreementRoundCut {
return
}
@@ -667,6 +716,10 @@ func (con *Consensus) startAgreement(numChains uint32) {
}
func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
+ // There is no block randomness at round-0.
+ if r.Position.Round == 0 {
+ return
+ }
// We only have to cache randomness result after cutting round.
if r.Position.Round < func() uint64 {
con.lock.RLock()
@@ -675,23 +728,43 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
}() {
return
}
- con.lock.Lock()
- defer con.lock.Unlock()
- if old, exists := con.randomnessResults[r.BlockHash]; exists {
- if bytes.Compare(old.Randomness, r.Randomness) != 0 {
- panic(fmt.Errorf("receive different randomness result: %s, %s",
- r.BlockHash.String()[:6], &r.Position))
- }
- // We don't have to assign the map again.
+ if func() (exists bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ _, exists = con.randomnessResults[r.BlockHash]
+ return
+ }() {
+ return
+ }
+ v, ok, err := con.tsigVerifier.UpdateAndGet(r.Position.Round)
+ if err != nil {
+ con.logger.Error("Unable to get tsig verifier",
+ "hash", r.BlockHash.String()[:6],
+ "position", &r.Position,
+ "error", err)
+ return
+ }
+ if !ok {
+ con.logger.Error("Tsig is not ready", "position", &r.Position)
return
}
+ if !v.VerifySignature(r.BlockHash, crypto.Signature{
+ Type: "bls",
+ Signature: r.Randomness}) {
+ con.logger.Info("Block randomness is not valid",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
+ return
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
con.randomnessResults[r.BlockHash] = r
}
// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
+ con.moduleWaitGroup.Add(1)
go func() {
- con.moduleWaitGroup.Add(1)
defer con.moduleWaitGroup.Done()
Loop:
for {
@@ -709,15 +782,22 @@ func (con *Consensus) startNetwork() {
default:
continue Loop
}
- func() {
+ if func() bool {
con.lock.RLock()
defer con.lock.RUnlock()
if pos.ChainID >= uint32(len(con.agreements)) {
- con.logger.Error("Unknown chainID message received (syncer)",
+ // This error might be easily encountered when the
+ // "latest" parameter of SyncBlocks is turned on too
+ // early.
+ con.logger.Error(
+ "Unknown chainID message received (syncer)",
"position", &pos)
+ return false
}
- }()
- con.agreements[pos.ChainID].inputChan <- val
+ return true
+ }() {
+ con.agreements[pos.ChainID].inputChan <- val
+ }
case <-con.ctx.Done():
return
}
@@ -732,19 +812,32 @@ func (con *Consensus) startCRSMonitor() {
// Notify all agreements for new CRS.
notifyNewCRS := func(round uint64) {
con.setupConfigsUntilRound(round)
- con.lock.Lock()
- defer con.lock.Unlock()
if round == lastNotifiedRound {
return
}
con.logger.Debug("CRS is ready", "round", round)
lastNotifiedRound = round
- for _, a := range con.agreements {
- a.inputChan <- round
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for idx, a := range con.agreements {
+ loop:
+ for {
+ select {
+ case <-con.ctx.Done():
+ break loop
+ case a.inputChan <- round:
+ break loop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug(
+ "agreement input channel is full when putting CRS",
+ "chainID", idx,
+ "round", round)
+ }
+ }
}
}
+ con.moduleWaitGroup.Add(1)
go func() {
- con.moduleWaitGroup.Add(1)
defer con.moduleWaitGroup.Done()
for {
select {
@@ -781,9 +874,3 @@ func (con *Consensus) stopAgreement() {
close(con.receiveChan)
close(con.pullChan)
}
-
-func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
- return prev.K != cur.K ||
- prev.NumChains != cur.NumChains ||
- prev.PhiRatio != cur.PhiRatio
-}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go
index f8d0c67d9..ffd5ab45d 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/ticker.go
@@ -36,12 +36,16 @@ const (
// defaultTicker is a wrapper to implement ticker interface based on
// time.Ticker.
type defaultTicker struct {
- ticker *time.Ticker
+ ticker *time.Ticker
+ duration time.Duration
}
// newDefaultTicker constructs an defaultTicker instance by giving an interval.
func newDefaultTicker(lambda time.Duration) *defaultTicker {
- return &defaultTicker{ticker: time.NewTicker(lambda)}
+ return &defaultTicker{
+ ticker: time.NewTicker(lambda),
+ duration: lambda,
+ }
}
// Tick implements Tick method of ticker interface.
@@ -54,6 +58,12 @@ func (t *defaultTicker) Stop() {
t.ticker.Stop()
}
+// Restart implements Stop method of ticker interface.
+func (t *defaultTicker) Restart() {
+ t.ticker.Stop()
+ t.ticker = time.NewTicker(t.duration)
+}
+
// newTicker is a helper to setup a ticker by giving an Governance. If
// the governace object implements a ticker generator, a ticker from that
// generator would be returned, else constructs a default one.
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go
index de8dd0bb7..2e2158e7c 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go
@@ -652,12 +652,14 @@ func (global *totalOrderingGlobalVector) addBlock(
return
}
// Add breakpoint.
- global.breakpoints[chainID] = append(
- global.breakpoints[chainID],
- &totalOrderingBreakpoint{
- roundID: b.Position.Round,
- lastHeight: tip.Position.Height,
- })
+ if b.Position.Round > global.curRound {
+ global.breakpoints[chainID] = append(
+ global.breakpoints[chainID],
+ &totalOrderingBreakpoint{
+ roundID: b.Position.Round,
+ lastHeight: tip.Position.Height,
+ })
+ }
} else {
if b.Position.Height != tip.Position.Height+1 {
err = ErrInvalidDAG
@@ -1052,7 +1054,6 @@ func (to *totalOrdering) generateDeliverSet() (
wg.Wait()
// Reset dirty chains.
to.dirtyChainIDs = to.dirtyChainIDs[:0]
- // TODO(mission): ANS should be bounded by current numChains.
globalAnsLength := globalInfo.getAckingNodeSetLength(
globalInfo, cfg.k, cfg.numChains)
CheckNextCandidateLoop:
@@ -1126,7 +1127,6 @@ CheckNextCandidateLoop:
checkANS := func() bool {
var chainAnsLength uint64
for p := range precedings {
- // TODO(mission): ANS should be bound by current numChains.
chainAnsLength = to.candidates[p].getAckingNodeSetLength(
globalInfo, cfg.k, cfg.numChains)
if uint64(chainAnsLength) < uint64(cfg.numChains)-cfg.phi {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go
index da615e1f0..21a1e528e 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/nodeset.go
@@ -41,6 +41,7 @@ type subSetTargetType byte
const (
targetNotarySet subSetTargetType = iota
targetDKGSet
+ targetNodeLeader
)
type nodeRank struct {
@@ -72,6 +73,17 @@ func NewNodeSet() *NodeSet {
}
}
+// NewNodeSetFromMap creates a new NodeSet from NodeID map.
+func NewNodeSetFromMap(nodes map[NodeID]struct{}) *NodeSet {
+ nIDs := make(map[NodeID]struct{}, len(nodes))
+ for nID := range nodes {
+ nIDs[nID] = struct{}{}
+ }
+ return &NodeSet{
+ IDs: nIDs,
+ }
+}
+
// NewNotarySetTarget is the target for getting Notary Set.
func NewNotarySetTarget(crs common.Hash, chainID uint32) *SubSetTarget {
binaryChainID := make([]byte, 4)
@@ -80,11 +92,23 @@ func NewNotarySetTarget(crs common.Hash, chainID uint32) *SubSetTarget {
return newTarget(targetNotarySet, crs[:], binaryChainID)
}
-// NewDKGSetTarget is the target for getting DKG Set.
+// NewDKGSetTarget is the target for getting DKG Set.
func NewDKGSetTarget(crs common.Hash) *SubSetTarget {
return newTarget(targetDKGSet, crs[:])
}
+// NewNodeLeaderTarget is the target for getting leader of fast BA.
+func NewNodeLeaderTarget(crs common.Hash, pos Position) *SubSetTarget {
+ binaryRoundID := make([]byte, 8)
+ binary.LittleEndian.PutUint64(binaryRoundID, pos.Round)
+ binaryChainID := make([]byte, 4)
+ binary.LittleEndian.PutUint32(binaryChainID, pos.ChainID)
+ binaryHeight := make([]byte, 8)
+ binary.LittleEndian.PutUint64(binaryHeight, pos.Height)
+ return newTarget(targetNodeLeader, crs[:],
+ binaryRoundID, binaryChainID, binaryHeight)
+}
+
// Add a NodeID to the set.
func (ns *NodeSet) Add(ID NodeID) {
ns.IDs[ID] = struct{}{}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go
index 7601542ae..97044f5aa 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/vote.go
@@ -32,6 +32,7 @@ const (
VoteInit VoteType = iota
VotePreCom
VoteCom
+ VoteFast
// Do not add any type below MaxVoteType.
MaxVoteType
)
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go
index 0c2d15588..3b1069eb8 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go
@@ -159,8 +159,10 @@ func VerifyAgreementResult(
if len(res.Votes) < len(notarySet)/3*2+1 {
return ErrNotEnoughVotes
}
- if len(res.Votes) > len(notarySet) {
- return ErrIncorrectVoteProposer
+ voted := make(map[types.NodeID]struct{}, len(notarySet))
+ voteType := res.Votes[0].Type
+ if voteType != types.VoteFast && voteType != types.VoteCom {
+ return ErrIncorrectVoteType
}
for _, vote := range res.Votes {
if res.IsEmptyBlock {
@@ -172,7 +174,7 @@ func VerifyAgreementResult(
return ErrIncorrectVoteBlockHash
}
}
- if vote.Type != types.VoteCom {
+ if vote.Type != voteType {
return ErrIncorrectVoteType
}
if vote.Position != res.Position {
@@ -188,6 +190,10 @@ func VerifyAgreementResult(
if !ok {
return ErrIncorrectVoteSignature
}
+ voted[vote.ProposerID] = struct{}{}
+ }
+ if len(voted) < len(notarySet)/3*2+1 {
+ return ErrNotEnoughVotes
}
return nil
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go
index 6d4f7b0ba..8a07c9d2b 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go
@@ -38,9 +38,11 @@ var (
)
type sets struct {
- nodeSet *types.NodeSet
- notarySet []map[types.NodeID]struct{}
- dkgSet map[types.NodeID]struct{}
+ crs common.Hash
+ nodeSet *types.NodeSet
+ notarySet []map[types.NodeID]struct{}
+ dkgSet map[types.NodeID]struct{}
+ leaderNode []map[uint64]types.NodeID
}
// NodeSetCacheInterface interface specifies interface used by NodeSetCache.
@@ -146,6 +148,33 @@ func (cache *NodeSetCache) GetDKGSet(
return cache.cloneMap(IDs.dkgSet), nil
}
+// GetLeaderNode returns the BA leader of the position.
+func (cache *NodeSetCache) GetLeaderNode(pos types.Position) (
+ types.NodeID, error) {
+ IDs, err := cache.getOrUpdate(pos.Round)
+ if err != nil {
+ return types.NodeID{}, err
+ }
+ if pos.ChainID >= uint32(len(IDs.leaderNode)) {
+ return types.NodeID{}, ErrInvalidChainID
+ }
+ cache.lock.Lock()
+ defer cache.lock.Unlock()
+ if _, exist := IDs.leaderNode[pos.ChainID][pos.Height]; !exist {
+ notarySet := types.NewNodeSetFromMap(IDs.notarySet[pos.ChainID])
+ leader :=
+ notarySet.GetSubSet(1, types.NewNodeLeaderTarget(IDs.crs, pos))
+ if len(leader) != 1 {
+ panic(errors.New("length of leader is not one"))
+ }
+ for nID := range leader {
+ IDs.leaderNode[pos.ChainID][pos.Height] = nID
+ break
+ }
+ }
+ return IDs.leaderNode[pos.ChainID][pos.Height], nil
+}
+
func (cache *NodeSetCache) cloneMap(
nIDs map[types.NodeID]struct{}) map[types.NodeID]struct{} {
nIDsCopy := make(map[types.NodeID]struct{}, len(nIDs))
@@ -207,15 +236,21 @@ func (cache *NodeSetCache) update(
return
}
nIDs = &sets{
+ crs: crs,
nodeSet: nodeSet,
notarySet: make([]map[types.NodeID]struct{}, cfg.NumChains),
dkgSet: nodeSet.GetSubSet(
int(cfg.DKGSetSize), types.NewDKGSetTarget(crs)),
+ leaderNode: make([]map[uint64]types.NodeID, cfg.NumChains),
}
for i := range nIDs.notarySet {
nIDs.notarySet[i] = nodeSet.GetSubSet(
int(cfg.NotarySetSize), types.NewNotarySetTarget(crs, uint32(i)))
}
+ nodesPerChain := cfg.RoundInterval / cfg.MinBlockInterval
+ for i := range nIDs.leaderNode {
+ nIDs.leaderNode[i] = make(map[uint64]types.NodeID, nodesPerChain)
+ }
cache.rounds[round] = nIDs
// Purge older rounds.
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go
index 8c9f77a69..8486d2854 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go
@@ -18,6 +18,7 @@
package utils
import (
+ "context"
"fmt"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -92,3 +93,36 @@ func VerifyDKGComplaint(
}
return ok, nil
}
+
+// LaunchDummyReceiver launches a go routine to receive from the receive
+// channel of a network module. An context is required to stop the go routine
+// automatically. An optinal message handler could be provided.
+func LaunchDummyReceiver(
+ ctx context.Context, recv <-chan interface{}, handler func(interface{})) (
+ context.CancelFunc, <-chan struct{}) {
+ var (
+ dummyCtx, dummyCancel = context.WithCancel(ctx)
+ finishedChan = make(chan struct{}, 1)
+ )
+ go func() {
+ defer func() {
+ finishedChan <- struct{}{}
+ }()
+ loop:
+ for {
+ select {
+ case <-dummyCtx.Done():
+ break loop
+ case v, ok := <-recv:
+ if !ok {
+ panic(fmt.Errorf(
+ "receive channel is closed before dummy receiver"))
+ }
+ if handler != nil {
+ handler(v)
+ }
+ }
+ }
+ }()
+ return dummyCancel, finishedChan
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index f2da6e117..6d367a737 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -141,16 +141,16 @@
{
"checksumSHA1": "ZUuiRqS6PnoNIvBmLStVQiyhkOM=",
"path": "github.com/dexon-foundation/dexon-consensus/common",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
- "checksumSHA1": "aqhVp5CBDq52ytHUH3HatpWhTDQ=",
+ "checksumSHA1": "TKAPWiezlt9t0oZca1Cq9S388lI=",
"path": "github.com/dexon-foundation/dexon-consensus/core",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
@@ -165,64 +165,64 @@
{
"checksumSHA1": "tQSbYCu5P00lUhKsx3IbBZCuSLY=",
"path": "github.com/dexon-foundation/dexon-consensus/core/crypto",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
"checksumSHA1": "W2P7pkuJ+26BpJg03K4Y0nB5obI=",
"path": "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
"checksumSHA1": "6Pf6caC8LTNCI7IflFmglKYnxYo=",
"path": "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
- "checksumSHA1": "PJXR1OuWwVVYrdJMK3skPr1/8ls=",
+ "checksumSHA1": "zpuCdMT8MGsy4pLgHKpg/Wd4izU=",
"path": "github.com/dexon-foundation/dexon-consensus/core/db",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
- "checksumSHA1": "h674l/hugVujbZUy/NSeDmio3/U=",
+ "checksumSHA1": "eq19vhMpc90UUJ7I91ti5P2CkQ0=",
"path": "github.com/dexon-foundation/dexon-consensus/core/syncer",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
- "checksumSHA1": "GRiBmU5T1LAoGHs5g1owGE1tNNo=",
+ "checksumSHA1": "zkrt3MOtqHPB3BmZtppZ9uesw3s=",
"path": "github.com/dexon-foundation/dexon-consensus/core/types",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
"checksumSHA1": "rmv8uxwrqMhJAeA3RPvwYP8mFro=",
"path": "github.com/dexon-foundation/dexon-consensus/core/types/dkg",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},
{
- "checksumSHA1": "NCAEGRVPfM0jCKdrBN2yvEXkeIo=",
+ "checksumSHA1": "FUHa68Hif8F8YHmx4h0sQIUNp40=",
"path": "github.com/dexon-foundation/dexon-consensus/core/utils",
- "revision": "af8c182a07f9bf3a7a17c938c87f4eef489bb903",
- "revisionTime": "2019-01-05T09:58:34Z",
+ "revision": "f4f572e4638309e523bbbde751547a9ea4d489bc",
+ "revisionTime": "2019-01-16T02:21:07Z",
"version": "master",
"versionExact": "master"
},