aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/agreement-state.go40
-rw-r--r--core/agreement-state_test.go36
-rw-r--r--core/agreement.go169
-rw-r--r--core/agreement_test.go162
-rw-r--r--core/consensus.go247
-rw-r--r--core/leader-selector.go5
-rw-r--r--core/reliable-broadcast.go22
-rw-r--r--core/reliable-broadcast_test.go8
-rw-r--r--core/types/block.go3
-rw-r--r--core/types/vote.go3
-rw-r--r--crypto/utils.go6
-rw-r--r--integration_test/validator.go3
-rw-r--r--simulation/app.go2
-rw-r--r--simulation/tcp-network.go14
-rw-r--r--simulation/verification.go2
15 files changed, 616 insertions, 106 deletions
diff --git a/core/agreement-state.go b/core/agreement-state.go
index 892d7c3..33f86ce 100644
--- a/core/agreement-state.go
+++ b/core/agreement-state.go
@@ -19,7 +19,7 @@ package core
import (
"fmt"
- "sync/atomic"
+ "sync"
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
@@ -124,15 +124,13 @@ func (s *ackState) receiveVote() error { return nil }
// ----- ConfirmState -----
type confirmState struct {
a *agreementData
- voted *atomic.Value
+ lock sync.Mutex
+ voted bool
}
func newConfirmState(a *agreementData) *confirmState {
- voted := &atomic.Value{}
- voted.Store(false)
return &confirmState{
- a: a,
- voted: voted,
+ a: a,
}
}
@@ -143,7 +141,9 @@ func (s *confirmState) nextState() (agreementState, error) {
return newPass1State(s.a), nil
}
func (s *confirmState) receiveVote() error {
- if s.voted.Load().(bool) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if s.voted {
return nil
}
hash, ok := s.a.countVote(s.a.period, types.VoteAck)
@@ -156,8 +156,8 @@ func (s *confirmState) receiveVote() error {
BlockHash: hash,
Period: s.a.period,
})
+ s.voted = true
}
- s.voted.Store(true)
return nil
}
@@ -175,10 +175,12 @@ func (s *pass1State) clocks() int { return 0 }
func (s *pass1State) terminate() {}
func (s *pass1State) nextState() (agreementState, error) {
voteDefault := false
- s.a.votesLock.RLock()
- defer s.a.votesLock.RUnlock()
- if vote, exist :=
- s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]; exist {
+ if vote, exist := func() (*types.Vote, bool) {
+ s.a.votesLock.RLock()
+ defer s.a.votesLock.RUnlock()
+ v, e := s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]
+ return v, e
+ }(); exist {
s.a.recv.proposeVote(&types.Vote{
Type: types.VotePass,
BlockHash: vote.BlockHash,
@@ -216,17 +218,15 @@ func (s *pass1State) receiveVote() error { return nil }
// ----- Pass2State -----
type pass2State struct {
a *agreementData
- voted *atomic.Value
+ lock sync.Mutex
+ voted bool
enoughPassVote chan common.Hash
terminateChan chan struct{}
}
func newPass2State(a *agreementData) *pass2State {
- voted := &atomic.Value{}
- voted.Store(false)
return &pass2State{
a: a,
- voted: voted,
enoughPassVote: make(chan common.Hash, 1),
terminateChan: make(chan struct{}),
}
@@ -254,7 +254,9 @@ func (s *pass2State) nextState() (agreementState, error) {
return newPrepareState(s.a), nil
}
func (s *pass2State) receiveVote() error {
- if s.voted.Load().(bool) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if s.voted {
return nil
}
ackHash, ok := s.a.countVote(s.a.period, types.VoteAck)
@@ -264,7 +266,7 @@ func (s *pass2State) receiveVote() error {
BlockHash: ackHash,
Period: s.a.period,
})
- s.voted.Store(true)
+ s.voted = true
} else if s.a.period > 1 {
if _, exist :=
s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]; !exist {
@@ -275,7 +277,7 @@ func (s *pass2State) receiveVote() error {
BlockHash: hash,
Period: s.a.period,
})
- s.voted.Store(true)
+ s.voted = true
}
}
}
diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go
index 9d587d1..a120965 100644
--- a/core/agreement-state_test.go
+++ b/core/agreement-state_test.go
@@ -29,7 +29,7 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/crypto/eth"
)
-type AgreementTestSuite struct {
+type AgreementStateTestSuite struct {
suite.Suite
ID types.ValidatorID
prvKey map[types.ValidatorID]crypto.PrivateKey
@@ -39,23 +39,23 @@ type AgreementTestSuite struct {
block map[common.Hash]*types.Block
}
-type agreementTestReceiver struct {
- s *AgreementTestSuite
+type agreementStateTestReceiver struct {
+ s *AgreementStateTestSuite
}
-func (r *agreementTestReceiver) proposeVote(vote *types.Vote) {
+func (r *agreementStateTestReceiver) proposeVote(vote *types.Vote) {
r.s.voteChan <- vote
}
-func (r *agreementTestReceiver) proposeBlock(block common.Hash) {
+func (r *agreementStateTestReceiver) proposeBlock(block common.Hash) {
r.s.blockChan <- block
}
-func (r *agreementTestReceiver) confirmBlock(block common.Hash) {
+func (r *agreementStateTestReceiver) confirmBlock(block common.Hash) {
r.s.confirmChan <- block
}
-func (s *AgreementTestSuite) proposeBlock(
+func (s *AgreementStateTestSuite) proposeBlock(
leader *leaderSelector) *types.Block {
block := &types.Block{
ProposerID: s.ID,
@@ -66,7 +66,7 @@ func (s *AgreementTestSuite) proposeBlock(
return block
}
-func (s *AgreementTestSuite) prepareVote(
+func (s *AgreementStateTestSuite) prepareVote(
vID types.ValidatorID, voteType types.VoteType, blockHash common.Hash,
period uint64) (
vote *types.Vote) {
@@ -84,7 +84,7 @@ func (s *AgreementTestSuite) prepareVote(
return
}
-func (s *AgreementTestSuite) SetupTest() {
+func (s *AgreementStateTestSuite) SetupTest() {
prvKey, err := eth.NewPrivateKey()
s.Require().Nil(err)
s.ID = types.NewValidatorID(prvKey.PublicKey())
@@ -97,7 +97,7 @@ func (s *AgreementTestSuite) SetupTest() {
s.block = make(map[common.Hash]*types.Block)
}
-func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement {
+func (s *AgreementStateTestSuite) newAgreement(numValidator int) *agreement {
leader := newGenesisLeaderSelector("I ❤️ DEXON", eth.SigToPub)
blockProposer := func() *types.Block {
return s.proposeBlock(leader)
@@ -113,7 +113,7 @@ func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement {
validators = append(validators, s.ID)
agreement := newAgreement(
s.ID,
- &agreementTestReceiver{s},
+ &agreementStateTestReceiver{s},
validators,
leader,
eth.SigToPub,
@@ -122,7 +122,7 @@ func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement {
return agreement
}
-func (s *AgreementTestSuite) TestPrepareState() {
+func (s *AgreementStateTestSuite) TestPrepareState() {
a := s.newAgreement(4)
state := newPrepareState(a.data)
s.Equal(statePrepare, state.state())
@@ -178,7 +178,7 @@ func (s *AgreementTestSuite) TestPrepareState() {
s.Equal(stateAck, newState.state())
}
-func (s *AgreementTestSuite) TestAckState() {
+func (s *AgreementStateTestSuite) TestAckState() {
a := s.newAgreement(4)
state := newAckState(a.data)
s.Equal(stateAck, state.state())
@@ -236,7 +236,7 @@ func (s *AgreementTestSuite) TestAckState() {
s.Equal(stateConfirm, newState.state())
}
-func (s *AgreementTestSuite) TestConfirmState() {
+func (s *AgreementStateTestSuite) TestConfirmState() {
a := s.newAgreement(4)
state := newConfirmState(a.data)
s.Equal(stateConfirm, state.state())
@@ -282,7 +282,7 @@ func (s *AgreementTestSuite) TestConfirmState() {
s.Equal(statePass1, newState.state())
}
-func (s *AgreementTestSuite) TestPass1State() {
+func (s *AgreementStateTestSuite) TestPass1State() {
a := s.newAgreement(4)
state := newPass1State(a.data)
s.Equal(statePass1, state.state())
@@ -368,7 +368,7 @@ func (s *AgreementTestSuite) TestPass1State() {
s.Equal(statePass2, newState.state())
}
-func (s *AgreementTestSuite) TestPass2State() {
+func (s *AgreementStateTestSuite) TestPass2State() {
a := s.newAgreement(4)
state := newPass2State(a.data)
s.Equal(statePass2, state.state())
@@ -446,6 +446,6 @@ func (s *AgreementTestSuite) TestPass2State() {
}
}
-func TestAgreement(t *testing.T) {
- suite.Run(t, new(AgreementTestSuite))
+func TestAgreementState(t *testing.T) {
+ suite.Run(t, new(AgreementStateTestSuite))
}
diff --git a/core/agreement.go b/core/agreement.go
index 299971e..8fb2207 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -21,6 +21,7 @@ import (
"fmt"
"sync"
"sync/atomic"
+ "time"
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
@@ -62,6 +63,16 @@ type agreementReceiver interface {
confirmBlock(common.Hash)
}
+type pendingBlock struct {
+ block *types.Block
+ receivedTime time.Time
+}
+
+type pendingVote struct {
+ vote *types.Vote
+ receivedTime time.Time
+}
+
// agreementData is the data for agreementState.
type agreementData struct {
recv agreementReceiver
@@ -74,17 +85,22 @@ type agreementData struct {
votes map[uint64][]map[types.ValidatorID]*types.Vote
votesLock sync.RWMutex
blocks map[types.ValidatorID]*types.Block
+ blocksLock sync.Mutex
blockProposer blockProposerFn
}
// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
- state agreementState
- data *agreementData
- aID *atomic.Value
- validators map[types.ValidatorID]struct{}
- sigToPub SigToPubFn
- hasOutput bool
+ state agreementState
+ data *agreementData
+ aID *atomic.Value
+ validators map[types.ValidatorID]struct{}
+ sigToPub SigToPubFn
+ hasOutput bool
+ lock sync.RWMutex
+ pendingBlock []pendingBlock
+ pendingVote []pendingVote
+ candidateBlock map[common.Hash]*types.Block
}
// newAgreement creates a agreement instance.
@@ -102,10 +118,11 @@ func newAgreement(
leader: leader,
blockProposer: blockProposer,
},
- aID: &atomic.Value{},
- sigToPub: sigToPub,
+ aID: &atomic.Value{},
+ sigToPub: sigToPub,
+ candidateBlock: make(map[common.Hash]*types.Block),
}
- agreement.restart(validators)
+ agreement.restart(validators, types.Position{})
return agreement
}
@@ -117,19 +134,67 @@ func (a *agreement) terminate() {
}
// restart the agreement
-func (a *agreement) restart(validators types.ValidatorIDs) {
- a.data.votesLock.Lock()
- defer a.data.votesLock.Unlock()
- a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote)
- a.data.votes[1] = newVoteListMap()
- a.data.period = 1
- a.data.blocks = make(map[types.ValidatorID]*types.Block)
- a.data.requiredVote = len(validators)/3*2 + 1
- a.hasOutput = false
- a.state = newPrepareState(a.data)
- a.validators = make(map[types.ValidatorID]struct{})
- for _, v := range validators {
- a.validators[v] = struct{}{}
+func (a *agreement) restart(validators types.ValidatorIDs, aID types.Position) {
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ a.data.votesLock.Lock()
+ defer a.data.votesLock.Unlock()
+ a.data.blocksLock.Lock()
+ defer a.data.blocksLock.Unlock()
+ a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote)
+ a.data.votes[1] = newVoteListMap()
+ a.data.period = 1
+ a.data.blocks = make(map[types.ValidatorID]*types.Block)
+ a.data.requiredVote = len(validators)/3*2 + 1
+ a.data.leader.restart()
+ a.hasOutput = false
+ a.state = newPrepareState(a.data)
+ a.validators = make(map[types.ValidatorID]struct{})
+ for _, v := range validators {
+ a.validators[v] = struct{}{}
+ }
+ a.candidateBlock = make(map[common.Hash]*types.Block)
+ a.aID.Store(aID)
+ }()
+
+ expireTime := time.Now().Add(-10 * time.Second)
+ replayBlock := make([]*types.Block, 0)
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingBlock := make([]pendingBlock, 0)
+ for _, pending := range a.pendingBlock {
+ if pending.block.Position == aID {
+ replayBlock = append(replayBlock, pending.block)
+ } else if pending.receivedTime.After(expireTime) {
+ newPendingBlock = append(newPendingBlock, pending)
+ }
+ }
+ a.pendingBlock = newPendingBlock
+ }()
+
+ replayVote := make([]*types.Vote, 0)
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingVote := make([]pendingVote, 0)
+ for _, pending := range a.pendingVote {
+ if pending.vote.Position == aID {
+ replayVote = append(replayVote, pending.vote)
+ } else if pending.receivedTime.After(expireTime) {
+ newPendingVote = append(newPendingVote, pending)
+ }
+ }
+ a.pendingVote = newPendingVote
+ }()
+
+ for _, block := range replayBlock {
+ a.processBlock(block)
+ }
+
+ for _, vote := range replayVote {
+ a.processVote(vote)
}
}
@@ -143,11 +208,6 @@ func (a *agreement) agreementID() types.Position {
return a.aID.Load().(types.Position)
}
-// setAgreementID sets the current agreementID.
-func (a *agreement) setAgreementID(ID types.Position) {
- a.aID.Store(ID)
-}
-
// nextState is called at the spcifi clock time.
func (a *agreement) nextState() (err error) {
a.state, err = a.state.nextState()
@@ -155,7 +215,12 @@ func (a *agreement) nextState() (err error) {
}
func (a *agreement) sanityCheck(vote *types.Vote) error {
- if _, exist := a.validators[vote.ProposerID]; !exist {
+ if exist := func() bool {
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+ _, exist := a.validators[vote.ProposerID]
+ return exist
+ }(); !exist {
return ErrNotValidator
}
ok, err := verifyVoteSignature(vote, a.sigToPub)
@@ -165,7 +230,12 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
if !ok {
return ErrIncorrectVoteSignature
}
- if _, exist := a.data.votes[vote.Period]; exist {
+ if exist := func() bool {
+ a.data.votesLock.RLock()
+ defer a.data.votesLock.RUnlock()
+ _, exist := a.data.votes[vote.Period]
+ return exist
+ }(); exist {
if oldVote, exist :=
a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist {
if vote.BlockHash != oldVote.BlockHash {
@@ -179,6 +249,8 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
// prepareVote prepares a vote.
func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) (
err error) {
+ vote.ProposerID = a.data.ID
+ vote.Position = a.agreementID()
hash := hashVote(vote)
vote.Signature, err = prv.Sign(hash)
return
@@ -186,10 +258,16 @@ func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) (
// processVote is the entry point for processing Vote.
func (a *agreement) processVote(vote *types.Vote) error {
- vote = vote.Clone()
if err := a.sanityCheck(vote); err != nil {
return err
}
+ if vote.Position != a.agreementID() {
+ a.pendingVote = append(a.pendingVote, pendingVote{
+ vote: vote,
+ receivedTime: time.Now().UTC(),
+ })
+ return nil
+ }
if func() bool {
a.data.votesLock.Lock()
defer a.data.votesLock.Unlock()
@@ -211,21 +289,50 @@ func (a *agreement) processVote(vote *types.Vote) error {
return nil
}
+// prepareBlok prepares a block.
+func (a *agreement) prepareBlock(
+ block *types.Block, prv crypto.PrivateKey) error {
+ return a.data.leader.prepareBlock(block, prv)
+}
+
// processBlock is the entry point for processing Block.
func (a *agreement) processBlock(block *types.Block) error {
+ a.data.blocksLock.Lock()
+ defer a.data.blocksLock.Unlock()
if b, exist := a.data.blocks[block.ProposerID]; exist {
if b.Hash != block.Hash {
return &ErrFork{block.ProposerID, b.Hash, block.Hash}
}
return nil
}
- a.data.blocks[block.ProposerID] = block
+ if block.Position != a.agreementID() {
+ a.pendingBlock = append(a.pendingBlock, pendingBlock{
+ block: block,
+ receivedTime: time.Now().UTC(),
+ })
+ return nil
+ }
if err := a.data.leader.processBlock(block); err != nil {
return err
}
+ a.data.blocks[block.ProposerID] = block
+ a.addCandidateBlock(block)
return nil
}
+func (a *agreement) addCandidateBlock(block *types.Block) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ a.candidateBlock[block.Hash] = block
+}
+
+func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) {
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+ b, e := a.candidateBlock[hash]
+ return b, e
+}
+
func (a *agreementData) countVote(period uint64, voteType types.VoteType) (
blockHash common.Hash, ok bool) {
a.votesLock.RLock()
diff --git a/core/agreement_test.go b/core/agreement_test.go
new file mode 100644
index 0000000..aafd1ed
--- /dev/null
+++ b/core/agreement_test.go
@@ -0,0 +1,162 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "testing"
+
+ "github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/dexon-foundation/dexon-consensus-core/crypto"
+ "github.com/dexon-foundation/dexon-consensus-core/crypto/eth"
+ "github.com/stretchr/testify/suite"
+)
+
+// agreementTestReceiver implements core.agreementReceiveer
+type agreementTestReceiver struct {
+ s *AgreementTestSuite
+}
+
+func (r *agreementTestReceiver) proposeVote(vote *types.Vote) {
+ r.s.voteChan <- vote
+}
+
+func (r *agreementTestReceiver) proposeBlock(block common.Hash) {
+ r.s.blockChan <- block
+}
+
+func (r *agreementTestReceiver) confirmBlock(block common.Hash) {
+ r.s.confirmChan <- block
+}
+
+func (s *AgreementTestSuite) proposeBlock(
+ agreementIdx int) *types.Block {
+ block := &types.Block{
+ ProposerID: s.ID,
+ Hash: common.NewRandomHash(),
+ }
+ s.block[block.Hash] = block
+ s.Require().Nil(s.agreement[agreementIdx].prepareBlock(block, s.prvKey[s.ID]))
+ return block
+}
+
+type AgreementTestSuite struct {
+ suite.Suite
+ ID types.ValidatorID
+ prvKey map[types.ValidatorID]crypto.PrivateKey
+ voteChan chan *types.Vote
+ blockChan chan common.Hash
+ confirmChan chan common.Hash
+ block map[common.Hash]*types.Block
+ agreement []*agreement
+}
+
+func (s *AgreementTestSuite) SetupTest() {
+ prvKey, err := eth.NewPrivateKey()
+ s.Require().Nil(err)
+ s.ID = types.NewValidatorID(prvKey.PublicKey())
+ s.prvKey = map[types.ValidatorID]crypto.PrivateKey{
+ s.ID: prvKey,
+ }
+ s.voteChan = make(chan *types.Vote, 100)
+ s.blockChan = make(chan common.Hash, 100)
+ s.confirmChan = make(chan common.Hash, 100)
+ s.block = make(map[common.Hash]*types.Block)
+}
+
+func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement {
+ leader := newGenesisLeaderSelector("🖖👽", eth.SigToPub)
+ agreementIdx := len(s.agreement)
+ blockProposer := func() *types.Block {
+ return s.proposeBlock(agreementIdx)
+ }
+
+ validators := make(types.ValidatorIDs, numValidator-1)
+ for i := range validators {
+ prvKey, err := eth.NewPrivateKey()
+ s.Require().Nil(err)
+ validators[i] = types.NewValidatorID(prvKey.PublicKey())
+ s.prvKey[validators[i]] = prvKey
+ }
+ validators = append(validators, s.ID)
+ agreement := newAgreement(
+ s.ID,
+ &agreementTestReceiver{s},
+ validators,
+ leader,
+ eth.SigToPub,
+ blockProposer,
+ )
+ s.agreement = append(s.agreement, agreement)
+ return agreement
+}
+
+func (s *AgreementTestSuite) prepareVote(vote *types.Vote) {
+ prvKey, exist := s.prvKey[vote.ProposerID]
+ s.Require().True(exist)
+ hash := hashVote(vote)
+ var err error
+ vote.Signature, err = prvKey.Sign(hash)
+ s.Require().NoError(err)
+}
+
+func (s *AgreementTestSuite) copyVote(
+ vote *types.Vote, proposer types.ValidatorID) *types.Vote {
+ v := vote.Clone()
+ v.ProposerID = proposer
+ s.prepareVote(v)
+ return v
+}
+
+func (s *AgreementTestSuite) TestSimpleConfirm() {
+ a := s.newAgreement(4)
+ // PrepareState
+ a.nextState()
+ // AckState
+ s.Require().Len(s.blockChan, 1)
+ blockHash := <-s.blockChan
+ block, exist := s.block[blockHash]
+ s.Require().True(exist)
+ s.Require().NoError(a.processBlock(block))
+ a.nextState()
+ // ConfirmState
+ s.Require().Len(s.voteChan, 1)
+ vote := <-s.voteChan
+ s.Equal(types.VoteAck, vote.Type)
+ for vID := range s.prvKey {
+ v := s.copyVote(vote, vID)
+ s.Require().NoError(a.processVote(v))
+ }
+ a.nextState()
+ // Pass1State
+ s.Require().Len(s.voteChan, 1)
+ vote = <-s.voteChan
+ s.Equal(types.VoteConfirm, vote.Type)
+ for vID := range s.prvKey {
+ v := s.copyVote(vote, vID)
+ s.Require().NoError(a.processVote(v))
+ }
+ // We have enough of Confirm-Votes.
+ s.Require().Len(s.confirmChan, 1)
+ confirmBlock := <-s.confirmChan
+ s.Equal(blockHash, confirmBlock)
+}
+
+func TestAgreement(t *testing.T) {
+ suite.Run(t, new(AgreementTestSuite))
+}
diff --git a/core/consensus.go b/core/consensus.go
index 2618b54..5b85fcb 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -18,7 +18,9 @@
package core
import (
+ "context"
"fmt"
+ "log"
"sort"
"sync"
"time"
@@ -53,24 +55,79 @@ var (
"signature of block is incorrect")
ErrGenesisBlockNotEmpty = fmt.Errorf(
"genesis block should be empty")
+ ErrUnknownBlockProposed = fmt.Errorf(
+ "unknown block is proposed")
+ ErrUnknownBlockConfirmed = fmt.Errorf(
+ "unknown block is confirmed")
+ ErrIncorrectBlockPosition = fmt.Errorf(
+ "position of block is incorrect")
)
+// consensusReceiver implements agreementReceiver.
+type consensusReceiver struct {
+ consensus *Consensus
+ chainID uint32
+ restart chan struct{}
+}
+
+func (recv *consensusReceiver) proposeVote(vote *types.Vote) {
+ // TODO(jimmy-dexon): move prepareVote() into agreement.
+ if err := recv.consensus.prepareVote(recv.chainID, vote); err != nil {
+ fmt.Println(err)
+ return
+ }
+ go func() {
+ if err := recv.consensus.ProcessVote(vote); err != nil {
+ fmt.Println(err)
+ return
+ }
+ recv.consensus.network.BroadcastVote(vote)
+ }()
+}
+func (recv *consensusReceiver) proposeBlock(hash common.Hash) {
+ block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash)
+ if !exist {
+ fmt.Println(ErrUnknownBlockProposed)
+ fmt.Println(hash)
+ return
+ }
+ if err := recv.consensus.PreProcessBlock(block); err != nil {
+ fmt.Println(err)
+ return
+ }
+ recv.consensus.network.BroadcastBlock(block)
+}
+func (recv *consensusReceiver) confirmBlock(hash common.Hash) {
+ block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash)
+ if !exist {
+ fmt.Println(ErrUnknownBlockConfirmed, hash)
+ return
+ }
+ recv.restart <- struct{}{}
+ if err := recv.consensus.ProcessBlock(block); err != nil {
+ fmt.Println(err)
+ return
+ }
+}
+
// Consensus implements DEXON Consensus algorithm.
type Consensus struct {
- ID types.ValidatorID
- app Application
- gov Governance
- rbModule *reliableBroadcast
- toModule *totalOrdering
- ctModule *consensusTimestamp
- ccModule *compactionChain
- db blockdb.BlockDatabase
- network Network
- tick *time.Ticker
- prvKey crypto.PrivateKey
- sigToPub SigToPubFn
- lock sync.RWMutex
- stopChan chan struct{}
+ ID types.ValidatorID
+ app Application
+ gov Governance
+ baModules []*agreement
+ receivers []*consensusReceiver
+ rbModule *reliableBroadcast
+ toModule *totalOrdering
+ ctModule *consensusTimestamp
+ ccModule *compactionChain
+ db blockdb.BlockDatabase
+ network Network
+ tick *time.Ticker
+ prvKey crypto.PrivateKey
+ sigToPub SigToPubFn
+ lock sync.RWMutex
+ stopChan chan struct{}
}
// NewConsensus construct an Consensus instance.
@@ -86,7 +143,7 @@ func NewConsensus(
// Setup acking by information returned from Governace.
rb := newReliableBroadcast()
- rb.setChainNum(len(validatorSet))
+ rb.setChainNum(gov.GetChainNumber())
for vID := range validatorSet {
rb.addValidator(vID)
}
@@ -101,7 +158,7 @@ func NewConsensus(
uint64(float32(len(validatorSet)-1)*gov.GetPhiRatio()+1),
validators)
- return &Consensus{
+ con := &Consensus{
ID: types.NewValidatorID(prv.PublicKey()),
rbModule: rb,
toModule: to,
@@ -116,11 +173,104 @@ func NewConsensus(
sigToPub: sigToPub,
stopChan: make(chan struct{}),
}
+
+ con.baModules = make([]*agreement, con.gov.GetChainNumber())
+ con.receivers = make([]*consensusReceiver, con.gov.GetChainNumber())
+ for i := uint32(0); i < con.gov.GetChainNumber(); i++ {
+ chainID := i
+ con.receivers[chainID] = &consensusReceiver{
+ consensus: con,
+ chainID: chainID,
+ restart: make(chan struct{}, 1),
+ }
+ blockProposer := func() *types.Block {
+ block := con.proposeBlock(chainID)
+ con.baModules[chainID].addCandidateBlock(block)
+ return block
+ }
+ con.baModules[chainID] = newAgreement(
+ con.ID,
+ con.receivers[chainID],
+ validators,
+ newGenesisLeaderSelector(con.gov.GetGenesisCRS(), con.sigToPub),
+ con.sigToPub,
+ blockProposer,
+ )
+ }
+
+ return con
}
-// Run starts running Consensus core.
+// Run starts running DEXON Consensus.
func (con *Consensus) Run() {
- go con.processMsg(con.network.ReceiveChan())
+ ctx, cancel := context.WithCancel(context.Background())
+ ticks := make([]chan struct{}, 0, con.gov.GetChainNumber())
+ for i := uint32(0); i < con.gov.GetChainNumber(); i++ {
+ tick := make(chan struct{})
+ ticks = append(ticks, tick)
+ go con.runBA(ctx, i, tick)
+ }
+ go func() {
+ <-con.stopChan
+ cancel()
+ }()
+ go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock)
+ // Reset ticker.
+ <-con.tick.C
+ <-con.tick.C
+ for {
+ <-con.tick.C
+ for _, tick := range ticks {
+ go func(tick chan struct{}) { tick <- struct{}{} }(tick)
+ }
+ }
+}
+
+func (con *Consensus) runBA(
+ ctx context.Context, chainID uint32, tick <-chan struct{}) {
+ // TODO(jimmy-dexon): move this function inside agreement.
+ validatorSet := con.gov.GetValidatorSet()
+ validators := make(types.ValidatorIDs, 0, len(validatorSet))
+ for vID := range validatorSet {
+ validators = append(validators, vID)
+ }
+ agreement := con.baModules[chainID]
+ recv := con.receivers[chainID]
+ recv.restart <- struct{}{}
+ // Reset ticker
+ <-tick
+BALoop:
+ for {
+ select {
+ case <-ctx.Done():
+ break BALoop
+ default:
+ }
+ for i := 0; i < agreement.clocks(); i++ {
+ <-tick
+ }
+ select {
+ case <-recv.restart:
+ // TODO(jimmy-dexon): handling change of validator set.
+ aID := types.Position{
+ ShardID: 0,
+ ChainID: chainID,
+ Height: con.rbModule.nextHeight(chainID),
+ }
+ agreement.restart(validators, aID)
+ default:
+ }
+ err := agreement.nextState()
+ if err != nil {
+ log.Printf("[%s] %s\n", con.ID.String(), err)
+ break BALoop
+ }
+ }
+}
+
+// RunLegacy starts running Legacy DEXON Consensus.
+func (con *Consensus) RunLegacy() {
+ go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock)
chainID := uint32(0)
hashes := make(common.Hashes, 0, len(con.gov.GetValidatorSet()))
@@ -134,6 +284,7 @@ func (con *Consensus) Run() {
break
}
}
+ con.rbModule.setChainNum(uint32(len(hashes)))
genesisBlock := &types.Block{
ProposerID: con.ID,
@@ -178,7 +329,9 @@ func (con *Consensus) Stop() {
con.stopChan <- struct{}{}
}
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) processMsg(
+ msgChan <-chan interface{},
+ blockProcesser func(*types.Block) error) {
for {
var msg interface{}
select {
@@ -189,10 +342,10 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
switch val := msg.(type) {
case *types.Block:
- if err := con.ProcessBlock(val); err != nil {
+ if err := blockProcesser(val); err != nil {
fmt.Println(err)
}
- types.RecycleBlock(val)
+ //types.RecycleBlock(val)
case *types.NotaryAck:
if err := con.ProcessNotaryAck(val); err != nil {
fmt.Println(err)
@@ -205,13 +358,43 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
}
}
+func (con *Consensus) proposeBlock(chainID uint32) *types.Block {
+ block := &types.Block{
+ ProposerID: con.ID,
+ Position: types.Position{
+ ChainID: chainID,
+ Height: con.rbModule.nextHeight(chainID),
+ },
+ }
+ if err := con.PrepareBlock(block, time.Now().UTC()); err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ if err := con.baModules[chainID].prepareBlock(block, con.prvKey); err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ return block
+}
+
// ProcessVote is the entry point to submit ont vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
- return
+ v := vote.Clone()
+ err = con.baModules[v.Position.ChainID].processVote(v)
+ return err
+}
+
+// prepareVote prepares a vote.
+func (con *Consensus) prepareVote(chainID uint32, vote *types.Vote) error {
+ return con.baModules[chainID].prepareVote(vote, con.prvKey)
}
// sanityCheck checks if the block is a valid block
func (con *Consensus) sanityCheck(b *types.Block) (err error) {
+ // Check block.Position.
+ if b.Position.ShardID != 0 || b.Position.ChainID >= con.rbModule.chainNum() {
+ return ErrIncorrectBlockPosition
+ }
// Check the hash of block.
hash, err := hashBlock(b)
if err != nil || hash != b.Hash {
@@ -229,19 +412,29 @@ func (con *Consensus) sanityCheck(b *types.Block) (err error) {
return nil
}
-// ProcessBlock is the entry point to submit one block to a Consensus instance.
-func (con *Consensus) ProcessBlock(b *types.Block) (err error) {
- // TODO(jimmy-dexon): BlockConverter.Block() is called twice in this method.
+// PreProcessBlock performs Byzantine Agreement on the block.
+func (con *Consensus) PreProcessBlock(b *types.Block) (err error) {
if err := con.sanityCheck(b); err != nil {
return err
}
+ if err := con.baModules[b.Position.ChainID].processBlock(b); err != nil {
+ return err
+ }
+ return
+}
+
+// ProcessBlock is the entry point to submit one block to a Consensus instance.
+func (con *Consensus) ProcessBlock(block *types.Block) (err error) {
+ if err := con.sanityCheck(block); err != nil {
+ return err
+ }
var (
deliveredBlocks []*types.Block
earlyDelivered bool
)
// To avoid application layer modify the content of block during
// processing, we should always operate based on the cloned one.
- b = b.Clone()
+ b := block.Clone()
con.lock.Lock()
defer con.lock.Unlock()
@@ -249,7 +442,7 @@ func (con *Consensus) ProcessBlock(b *types.Block) (err error) {
if err = con.rbModule.processBlock(b); err != nil {
return err
}
- con.app.BlockConfirmed(b.Clone())
+ con.app.BlockConfirmed(block)
for _, b := range con.rbModule.extractBlocks() {
// Notify application layer that some block is strongly acked.
con.app.StronglyAcked(b.Hash)
diff --git a/core/leader-selector.go b/core/leader-selector.go
index 4295050..d96de4e 100644
--- a/core/leader-selector.go
+++ b/core/leader-selector.go
@@ -90,6 +90,11 @@ func (l *leaderSelector) probability(sig crypto.Signature) float64 {
return p
}
+func (l *leaderSelector) restart() {
+ l.minCRSBlock = maxHash
+ l.minBlockHash = common.Hash{}
+}
+
func (l *leaderSelector) leaderBlockHash() common.Hash {
return l.minBlockHash
}
diff --git a/core/reliable-broadcast.go b/core/reliable-broadcast.go
index 6d5d15b..7cca4f1 100644
--- a/core/reliable-broadcast.go
+++ b/core/reliable-broadcast.go
@@ -65,6 +65,9 @@ type rbcValidatorStatus struct {
// nextOutput is the next output height of block, default to 0.
nextOutput uint64
+
+ // nextHeight is the next height of block to be prepared.
+ nextHeight uint64
}
type rbcBlockInfo struct {
@@ -195,6 +198,9 @@ func (rb *reliableBroadcast) processBlock(block *types.Block) (err error) {
ackedChain: make(map[uint32]struct{}),
}
rb.receivedBlocks[block.Hash] = block
+ if rb.lattice[block.Position.ChainID].nextHeight <= block.Position.Height {
+ rb.lattice[block.Position.ChainID].nextHeight = block.Position.Height + 1
+ }
// Check blocks in receivedBlocks if its acks are all in lattice. If a block's
// acking blocks are all in lattice, execute sanity check and add the block
@@ -417,7 +423,9 @@ func (rb *reliableBroadcast) prepareBlock(block *types.Block) {
accumulateTimestamps(times, curBlock)
if uint32(chainID) == block.Position.ChainID {
block.ParentHash = curBlock.Hash
- block.Position.Height = curBlock.Position.Height + 1
+ if block.Position.Height == 0 {
+ block.Position.Height = curBlock.Position.Height + 1
+ }
}
}
block.Timestamps = times
@@ -436,13 +444,23 @@ func (rb *reliableBroadcast) deleteValidator(h types.ValidatorID) {
}
// setChainNum set the number of chains.
-func (rb *reliableBroadcast) setChainNum(num int) {
+func (rb *reliableBroadcast) setChainNum(num uint32) {
rb.lattice = make([]*rbcValidatorStatus, num)
for i := range rb.lattice {
rb.lattice[i] = &rbcValidatorStatus{
blocks: make(map[uint64]*types.Block),
nextAck: make([]uint64, num),
nextOutput: 0,
+ nextHeight: 0,
}
}
}
+
+func (rb *reliableBroadcast) chainNum() uint32 {
+ return uint32(len(rb.lattice))
+}
+
+// nextHeight returns the next height for the chain.
+func (rb *reliableBroadcast) nextHeight(chainID uint32) uint64 {
+ return rb.lattice[chainID].nextHeight
+}
diff --git a/core/reliable-broadcast_test.go b/core/reliable-broadcast_test.go
index 206b2fa..2c71bcb 100644
--- a/core/reliable-broadcast_test.go
+++ b/core/reliable-broadcast_test.go
@@ -102,7 +102,7 @@ func genTestCase1(s *ReliableBroadcastTest, rb *reliableBroadcast) []types.Valid
rb.addValidator(vid)
vids = append(vids, vid)
}
- rb.setChainNum(len(vids))
+ rb.setChainNum(uint32(len(vids)))
// Add genesis blocks.
for _, vid := range vids {
b = s.prepareGenesisBlock(vid, vids)
@@ -529,7 +529,7 @@ func (s *ReliableBroadcastTest) TestRandomIntensiveAcking() {
for _, vid := range vids {
rb.addValidator(vid)
}
- rb.setChainNum(len(vids))
+ rb.setChainNum(uint32(len(vids)))
// Generate genesis blocks.
for _, vid := range vids {
b := s.prepareGenesisBlock(vid, vids)
@@ -602,7 +602,7 @@ func (s *ReliableBroadcastTest) TestRandomlyGeneratedBlocks() {
for i := 0; i < repeat; i++ {
validators := map[types.ValidatorID]struct{}{}
rb := newReliableBroadcast()
- rb.setChainNum(validatorCount)
+ rb.setChainNum(uint32(validatorCount))
stronglyAckedHashes := common.Hashes{}
revealer.Reset()
@@ -657,7 +657,7 @@ func (s *ReliableBroadcastTest) TestPrepareBlock() {
for _, vID := range validators {
rb.addValidator(vID)
}
- rb.setChainNum(len(validators))
+ rb.setChainNum(uint32(len(validators)))
// Setup genesis blocks.
b00 := s.prepareGenesisBlock(validators[0], validators)
time.Sleep(minInterval)
diff --git a/core/types/block.go b/core/types/block.go
index ab2ef84..f02b1a0 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -77,7 +77,8 @@ type Block struct {
}
func (b *Block) String() string {
- return fmt.Sprintf("Block(%v)", b.Hash.String()[:6])
+ return fmt.Sprintf("Block(%v:%d:%d)", b.Hash.String()[:6],
+ b.Position.ChainID, b.Position.Height)
}
// Clone returns a deep copy of a block.
diff --git a/core/types/vote.go b/core/types/vote.go
index 158c209..bae8f7d 100644
--- a/core/types/vote.go
+++ b/core/types/vote.go
@@ -47,7 +47,8 @@ type Vote struct {
}
func (v *Vote) String() string {
- return fmt.Sprintf("Vote(%d:%d):%s",
+ return fmt.Sprintf("Vote[%s:%d:%d](%d:%d):%s",
+ v.ProposerID.String()[:6], v.Position.ChainID, v.Position.Height,
v.Period, v.Type, v.BlockHash.String()[:6])
}
diff --git a/crypto/utils.go b/crypto/utils.go
index dfb4987..07a8b2b 100644
--- a/crypto/utils.go
+++ b/crypto/utils.go
@@ -18,6 +18,8 @@
package crypto
import (
+ "encoding/hex"
+
"github.com/ethereum/go-ethereum/crypto"
"github.com/dexon-foundation/dexon-consensus-core/common"
@@ -33,3 +35,7 @@ func Keccak256Hash(data ...[]byte) (h common.Hash) {
func (sig Signature) Clone() Signature {
return append(Signature{}, sig...)
}
+
+func (sig Signature) String() string {
+ return hex.EncodeToString([]byte(sig[:]))
+}
diff --git a/integration_test/validator.go b/integration_test/validator.go
index 25cbac0..bfe517f 100644
--- a/integration_test/validator.go
+++ b/integration_test/validator.go
@@ -102,7 +102,8 @@ func NewValidator(
networkLatency: networkLatency,
proposingLatency: proposingLatency,
cons: core.NewConsensus(
- app, gov, db, &Network{}, time.NewTicker(1), privateKey, eth.SigToPub),
+ app, gov,
+ db, &Network{}, time.NewTicker(1), privateKey, eth.SigToPub),
}
}
diff --git a/simulation/app.go b/simulation/app.go
index aded2a7..4bf4aff 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -116,7 +116,7 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
}
a.Outputs = blocks
a.Early = early
- //fmt.Println("OUTPUT", a.ValidatorID, a.Early, a.Outputs)
+ fmt.Println("OUTPUT", a.ValidatorID, a.Early, a.Outputs)
confirmLatency := []time.Duration{}
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
index 31c8d95..468baff 100644
--- a/simulation/tcp-network.go
+++ b/simulation/tcp-network.go
@@ -168,6 +168,20 @@ func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
n.recieveChan <- block
+ case "vote":
+ vote := &types.Vote{}
+ if err := json.Unmarshal(m.Payload, vote); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ n.recieveChan <- vote
+ case "notaryAck":
+ ack := &types.NotaryAck{}
+ if err := json.Unmarshal(m.Payload, ack); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ n.recieveChan <- ack
default:
w.WriteHeader(http.StatusBadRequest)
return
diff --git a/simulation/verification.go b/simulation/verification.go
index ad2c911..2e90b8b 100644
--- a/simulation/verification.go
+++ b/simulation/verification.go
@@ -202,7 +202,7 @@ func VerifyTotalOrder(id types.ValidatorID,
if hasError {
log.Printf("[%d] Hash is %v from %v\n", i, hash, id)
} else {
- //log.Printf("Block %v confirmed\n", hash)
+ log.Printf("Block %v confirmed\n", hash)
}
}