aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go159
1 files changed, 122 insertions, 37 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
index 16f36bccd..d4f1bbd0c 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go
@@ -38,9 +38,11 @@ func init() {
// Errors for agreement module.
var (
- ErrInvalidVote = fmt.Errorf("invalid vote")
- ErrNotInNotarySet = fmt.Errorf("not in notary set")
- ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature")
+ ErrInvalidVote = fmt.Errorf("invalid vote")
+ ErrNotInNotarySet = fmt.Errorf("not in notary set")
+ ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature")
+ ErrIncorrectVotePartialSignature = fmt.Errorf("incorrect vote psig")
+ ErrMismatchBlockPosition = fmt.Errorf("mismatch block position")
)
// ErrFork for fork error in agreement.
@@ -83,6 +85,7 @@ type agreementReceiver interface {
PullBlocks(common.Hashes)
ReportForkVote(v1, v2 *types.Vote)
ReportForkBlock(b1, b2 *types.Block)
+ VerifyPartialSignature(vote *types.Vote) bool
}
type pendingBlock struct {
@@ -114,20 +117,21 @@ type agreementData struct {
// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
- state agreementState
- data *agreementData
- aID *atomic.Value
- doneChan chan struct{}
- notarySet map[types.NodeID]struct{}
- hasVoteFast bool
- hasOutput bool
- lock sync.RWMutex
- pendingBlock []pendingBlock
- pendingVote []pendingVote
- candidateBlock map[common.Hash]*types.Block
- fastForward chan uint64
- signer *utils.Signer
- logger common.Logger
+ state agreementState
+ data *agreementData
+ aID *atomic.Value
+ doneChan chan struct{}
+ notarySet map[types.NodeID]struct{}
+ hasVoteFast bool
+ hasOutput bool
+ lock sync.RWMutex
+ pendingBlock []pendingBlock
+ pendingVote []pendingVote
+ pendingAgreementResult map[types.Position]*types.AgreementResult
+ candidateBlock map[common.Hash]*types.Block
+ fastForward chan uint64
+ signer *utils.Signer
+ logger common.Logger
}
// newAgreement creates a agreement instance.
@@ -143,11 +147,12 @@ func newAgreement(
ID: ID,
leader: leader,
},
- aID: &atomic.Value{},
- candidateBlock: make(map[common.Hash]*types.Block),
- fastForward: make(chan uint64, 1),
- signer: signer,
- logger: logger,
+ aID: &atomic.Value{},
+ pendingAgreementResult: make(map[types.Position]*types.AgreementResult),
+ candidateBlock: make(map[common.Hash]*types.Block),
+ fastForward: make(chan uint64, 1),
+ signer: signer,
+ logger: logger,
}
agreement.stop()
return agreement
@@ -176,7 +181,7 @@ func (a *agreement) restart(
a.data.blocks = make(map[types.NodeID]*types.Block)
a.data.requiredVote = len(notarySet)*2/3 + 1
a.data.leader.restart(crs)
- a.data.lockValue = types.NullBlockHash
+ a.data.lockValue = types.SkipBlockHash
a.data.lockIter = 0
a.data.isLeader = a.data.ID == leader
if a.doneChan != nil {
@@ -202,6 +207,22 @@ func (a *agreement) restart(
return
}
+ var result *types.AgreementResult
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingAgreementResult := make(
+ map[types.Position]*types.AgreementResult)
+ for pos, agr := range a.pendingAgreementResult {
+ if pos.Newer(aID) {
+ newPendingAgreementResult[pos] = agr
+ } else if pos == aID {
+ result = agr
+ }
+ }
+ a.pendingAgreementResult = newPendingAgreementResult
+ }()
+
expireTime := time.Now().Add(-10 * time.Second)
replayBlock := make([]*types.Block, 0)
func() {
@@ -212,7 +233,11 @@ func (a *agreement) restart(
if aID.Newer(pending.block.Position) {
continue
} else if pending.block.Position == aID {
- replayBlock = append(replayBlock, pending.block)
+ if result == nil ||
+ result.Position.Round < DKGDelayRound ||
+ result.BlockHash == pending.block.Hash {
+ replayBlock = append(replayBlock, pending.block)
+ }
} else if pending.receivedTime.After(expireTime) {
newPendingBlock = append(newPendingBlock, pending)
}
@@ -229,7 +254,9 @@ func (a *agreement) restart(
if aID.Newer(pending.vote.Position) {
continue
} else if pending.vote.Position == aID {
- replayVote = append(replayVote, pending.vote)
+ if result == nil || result.Position.Round < DKGDelayRound {
+ replayVote = append(replayVote, pending.vote)
+ }
} else if pending.receivedTime.After(expireTime) {
newPendingVote = append(newPendingVote, pending)
}
@@ -244,6 +271,13 @@ func (a *agreement) restart(
}
}
+ if result != nil {
+ if err := a.processAgreementResult(result); err != nil {
+ a.logger.Error("Failed to process agreement result when retarting",
+ "result", result)
+ }
+ }
+
for _, vote := range replayVote {
if err := a.processVote(vote); err != nil {
a.logger.Error("Failed to process vote when restarting agreement",
@@ -332,6 +366,9 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
if !ok {
return ErrIncorrectVoteSignature
}
+ if !a.data.recv.VerifyPartialSignature(vote) {
+ return ErrIncorrectVotePartialSignature
+ }
return nil
}
@@ -424,11 +461,17 @@ func (a *agreement) processVote(vote *types.Vote) error {
hash != types.SkipBlockHash {
if vote.Type == types.VoteFast {
if !a.hasVoteFast {
- a.data.recv.ProposeVote(
- types.NewVote(types.VoteFastCom, hash, vote.Period))
- a.data.lockValue = hash
- a.data.lockIter = 1
- a.hasVoteFast = true
+ if a.state.state() == stateFast ||
+ a.state.state() == stateFastVote {
+ a.data.recv.ProposeVote(
+ types.NewVote(types.VoteFastCom, hash, vote.Period))
+ a.hasVoteFast = true
+
+ }
+ if a.data.lockIter == 0 {
+ a.data.lockValue = hash
+ a.data.lockIter = 1
+ }
}
} else {
a.hasOutput = true
@@ -457,18 +500,12 @@ func (a *agreement) processVote(vote *types.Vote) error {
if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok &&
hash != types.SkipBlockHash {
// Condition 1.
- if a.data.period >= vote.Period && vote.Period > a.data.lockIter &&
- vote.BlockHash != a.data.lockValue {
+ if vote.Period > a.data.lockIter {
a.data.lockValue = hash
a.data.lockIter = vote.Period
- return nil
}
// Condition 2.
if vote.Period > a.data.period {
- if vote.Period > a.data.lockIter {
- a.data.lockValue = hash
- a.data.lockIter = vote.Period
- }
a.fastForward <- vote.Period
if a.doneChan != nil {
close(a.doneChan)
@@ -508,6 +545,54 @@ func (a *agreement) processVote(vote *types.Vote) error {
return nil
}
+func (a *agreement) processFinalizedBlock(block *types.Block) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ if a.hasOutput {
+ return
+ }
+ aID := a.agreementID()
+ if aID.Older(block.Position) {
+ return
+ }
+ a.addCandidateBlockNoLock(block)
+ a.hasOutput = true
+ a.data.lock.Lock()
+ defer a.data.lock.Unlock()
+ a.data.recv.ConfirmBlock(block.Hash, nil)
+ if a.doneChan != nil {
+ close(a.doneChan)
+ a.doneChan = nil
+ }
+}
+
+func (a *agreement) processAgreementResult(result *types.AgreementResult) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ aID := a.agreementID()
+ if result.Position.Older(aID) {
+ return nil
+ } else if result.Position.Newer(aID) {
+ a.pendingAgreementResult[result.Position] = result
+ return nil
+ }
+ if a.hasOutput {
+ return nil
+ }
+ a.data.lock.Lock()
+ defer a.data.lock.Unlock()
+ if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist {
+ a.data.recv.PullBlocks(common.Hashes{result.BlockHash})
+ }
+ a.hasOutput = true
+ a.data.recv.ConfirmBlock(result.BlockHash, nil)
+ if a.doneChan != nil {
+ close(a.doneChan)
+ a.doneChan = nil
+ }
+ return nil
+}
+
func (a *agreement) done() <-chan struct{} {
a.lock.Lock()
defer a.lock.Unlock()