diff options
-rw-r--r-- | core/agreement-state.go | 40 | ||||
-rw-r--r-- | core/agreement-state_test.go | 36 | ||||
-rw-r--r-- | core/agreement.go | 169 | ||||
-rw-r--r-- | core/agreement_test.go | 162 | ||||
-rw-r--r-- | core/consensus.go | 247 | ||||
-rw-r--r-- | core/leader-selector.go | 5 | ||||
-rw-r--r-- | core/reliable-broadcast.go | 22 | ||||
-rw-r--r-- | core/reliable-broadcast_test.go | 8 | ||||
-rw-r--r-- | core/types/block.go | 3 | ||||
-rw-r--r-- | core/types/vote.go | 3 | ||||
-rw-r--r-- | crypto/utils.go | 6 | ||||
-rw-r--r-- | integration_test/validator.go | 3 | ||||
-rw-r--r-- | simulation/app.go | 2 | ||||
-rw-r--r-- | simulation/tcp-network.go | 14 | ||||
-rw-r--r-- | simulation/verification.go | 2 |
15 files changed, 616 insertions, 106 deletions
diff --git a/core/agreement-state.go b/core/agreement-state.go index 892d7c3..33f86ce 100644 --- a/core/agreement-state.go +++ b/core/agreement-state.go @@ -19,7 +19,7 @@ package core import ( "fmt" - "sync/atomic" + "sync" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" @@ -124,15 +124,13 @@ func (s *ackState) receiveVote() error { return nil } // ----- ConfirmState ----- type confirmState struct { a *agreementData - voted *atomic.Value + lock sync.Mutex + voted bool } func newConfirmState(a *agreementData) *confirmState { - voted := &atomic.Value{} - voted.Store(false) return &confirmState{ - a: a, - voted: voted, + a: a, } } @@ -143,7 +141,9 @@ func (s *confirmState) nextState() (agreementState, error) { return newPass1State(s.a), nil } func (s *confirmState) receiveVote() error { - if s.voted.Load().(bool) { + s.lock.Lock() + defer s.lock.Unlock() + if s.voted { return nil } hash, ok := s.a.countVote(s.a.period, types.VoteAck) @@ -156,8 +156,8 @@ func (s *confirmState) receiveVote() error { BlockHash: hash, Period: s.a.period, }) + s.voted = true } - s.voted.Store(true) return nil } @@ -175,10 +175,12 @@ func (s *pass1State) clocks() int { return 0 } func (s *pass1State) terminate() {} func (s *pass1State) nextState() (agreementState, error) { voteDefault := false - s.a.votesLock.RLock() - defer s.a.votesLock.RUnlock() - if vote, exist := - s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]; exist { + if vote, exist := func() (*types.Vote, bool) { + s.a.votesLock.RLock() + defer s.a.votesLock.RUnlock() + v, e := s.a.votes[s.a.period][types.VoteConfirm][s.a.ID] + return v, e + }(); exist { s.a.recv.proposeVote(&types.Vote{ Type: types.VotePass, BlockHash: vote.BlockHash, @@ -216,17 +218,15 @@ func (s *pass1State) receiveVote() error { return nil } // ----- Pass2State ----- type pass2State struct { a *agreementData - voted *atomic.Value + lock sync.Mutex + voted bool enoughPassVote chan common.Hash terminateChan chan struct{} } func newPass2State(a *agreementData) *pass2State { - voted := &atomic.Value{} - voted.Store(false) return &pass2State{ a: a, - voted: voted, enoughPassVote: make(chan common.Hash, 1), terminateChan: make(chan struct{}), } @@ -254,7 +254,9 @@ func (s *pass2State) nextState() (agreementState, error) { return newPrepareState(s.a), nil } func (s *pass2State) receiveVote() error { - if s.voted.Load().(bool) { + s.lock.Lock() + defer s.lock.Unlock() + if s.voted { return nil } ackHash, ok := s.a.countVote(s.a.period, types.VoteAck) @@ -264,7 +266,7 @@ func (s *pass2State) receiveVote() error { BlockHash: ackHash, Period: s.a.period, }) - s.voted.Store(true) + s.voted = true } else if s.a.period > 1 { if _, exist := s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]; !exist { @@ -275,7 +277,7 @@ func (s *pass2State) receiveVote() error { BlockHash: hash, Period: s.a.period, }) - s.voted.Store(true) + s.voted = true } } } diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go index 9d587d1..a120965 100644 --- a/core/agreement-state_test.go +++ b/core/agreement-state_test.go @@ -29,7 +29,7 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/crypto/eth" ) -type AgreementTestSuite struct { +type AgreementStateTestSuite struct { suite.Suite ID types.ValidatorID prvKey map[types.ValidatorID]crypto.PrivateKey @@ -39,23 +39,23 @@ type AgreementTestSuite struct { block map[common.Hash]*types.Block } -type agreementTestReceiver struct { - s *AgreementTestSuite +type agreementStateTestReceiver struct { + s *AgreementStateTestSuite } -func (r *agreementTestReceiver) proposeVote(vote *types.Vote) { +func (r *agreementStateTestReceiver) proposeVote(vote *types.Vote) { r.s.voteChan <- vote } -func (r *agreementTestReceiver) proposeBlock(block common.Hash) { +func (r *agreementStateTestReceiver) proposeBlock(block common.Hash) { r.s.blockChan <- block } -func (r *agreementTestReceiver) confirmBlock(block common.Hash) { +func (r *agreementStateTestReceiver) confirmBlock(block common.Hash) { r.s.confirmChan <- block } -func (s *AgreementTestSuite) proposeBlock( +func (s *AgreementStateTestSuite) proposeBlock( leader *leaderSelector) *types.Block { block := &types.Block{ ProposerID: s.ID, @@ -66,7 +66,7 @@ func (s *AgreementTestSuite) proposeBlock( return block } -func (s *AgreementTestSuite) prepareVote( +func (s *AgreementStateTestSuite) prepareVote( vID types.ValidatorID, voteType types.VoteType, blockHash common.Hash, period uint64) ( vote *types.Vote) { @@ -84,7 +84,7 @@ func (s *AgreementTestSuite) prepareVote( return } -func (s *AgreementTestSuite) SetupTest() { +func (s *AgreementStateTestSuite) SetupTest() { prvKey, err := eth.NewPrivateKey() s.Require().Nil(err) s.ID = types.NewValidatorID(prvKey.PublicKey()) @@ -97,7 +97,7 @@ func (s *AgreementTestSuite) SetupTest() { s.block = make(map[common.Hash]*types.Block) } -func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement { +func (s *AgreementStateTestSuite) newAgreement(numValidator int) *agreement { leader := newGenesisLeaderSelector("I ❤️ DEXON", eth.SigToPub) blockProposer := func() *types.Block { return s.proposeBlock(leader) @@ -113,7 +113,7 @@ func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement { validators = append(validators, s.ID) agreement := newAgreement( s.ID, - &agreementTestReceiver{s}, + &agreementStateTestReceiver{s}, validators, leader, eth.SigToPub, @@ -122,7 +122,7 @@ func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement { return agreement } -func (s *AgreementTestSuite) TestPrepareState() { +func (s *AgreementStateTestSuite) TestPrepareState() { a := s.newAgreement(4) state := newPrepareState(a.data) s.Equal(statePrepare, state.state()) @@ -178,7 +178,7 @@ func (s *AgreementTestSuite) TestPrepareState() { s.Equal(stateAck, newState.state()) } -func (s *AgreementTestSuite) TestAckState() { +func (s *AgreementStateTestSuite) TestAckState() { a := s.newAgreement(4) state := newAckState(a.data) s.Equal(stateAck, state.state()) @@ -236,7 +236,7 @@ func (s *AgreementTestSuite) TestAckState() { s.Equal(stateConfirm, newState.state()) } -func (s *AgreementTestSuite) TestConfirmState() { +func (s *AgreementStateTestSuite) TestConfirmState() { a := s.newAgreement(4) state := newConfirmState(a.data) s.Equal(stateConfirm, state.state()) @@ -282,7 +282,7 @@ func (s *AgreementTestSuite) TestConfirmState() { s.Equal(statePass1, newState.state()) } -func (s *AgreementTestSuite) TestPass1State() { +func (s *AgreementStateTestSuite) TestPass1State() { a := s.newAgreement(4) state := newPass1State(a.data) s.Equal(statePass1, state.state()) @@ -368,7 +368,7 @@ func (s *AgreementTestSuite) TestPass1State() { s.Equal(statePass2, newState.state()) } -func (s *AgreementTestSuite) TestPass2State() { +func (s *AgreementStateTestSuite) TestPass2State() { a := s.newAgreement(4) state := newPass2State(a.data) s.Equal(statePass2, state.state()) @@ -446,6 +446,6 @@ func (s *AgreementTestSuite) TestPass2State() { } } -func TestAgreement(t *testing.T) { - suite.Run(t, new(AgreementTestSuite)) +func TestAgreementState(t *testing.T) { + suite.Run(t, new(AgreementStateTestSuite)) } diff --git a/core/agreement.go b/core/agreement.go index 299971e..8fb2207 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" @@ -62,6 +63,16 @@ type agreementReceiver interface { confirmBlock(common.Hash) } +type pendingBlock struct { + block *types.Block + receivedTime time.Time +} + +type pendingVote struct { + vote *types.Vote + receivedTime time.Time +} + // agreementData is the data for agreementState. type agreementData struct { recv agreementReceiver @@ -74,17 +85,22 @@ type agreementData struct { votes map[uint64][]map[types.ValidatorID]*types.Vote votesLock sync.RWMutex blocks map[types.ValidatorID]*types.Block + blocksLock sync.Mutex blockProposer blockProposerFn } // agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. type agreement struct { - state agreementState - data *agreementData - aID *atomic.Value - validators map[types.ValidatorID]struct{} - sigToPub SigToPubFn - hasOutput bool + state agreementState + data *agreementData + aID *atomic.Value + validators map[types.ValidatorID]struct{} + sigToPub SigToPubFn + hasOutput bool + lock sync.RWMutex + pendingBlock []pendingBlock + pendingVote []pendingVote + candidateBlock map[common.Hash]*types.Block } // newAgreement creates a agreement instance. @@ -102,10 +118,11 @@ func newAgreement( leader: leader, blockProposer: blockProposer, }, - aID: &atomic.Value{}, - sigToPub: sigToPub, + aID: &atomic.Value{}, + sigToPub: sigToPub, + candidateBlock: make(map[common.Hash]*types.Block), } - agreement.restart(validators) + agreement.restart(validators, types.Position{}) return agreement } @@ -117,19 +134,67 @@ func (a *agreement) terminate() { } // restart the agreement -func (a *agreement) restart(validators types.ValidatorIDs) { - a.data.votesLock.Lock() - defer a.data.votesLock.Unlock() - a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote) - a.data.votes[1] = newVoteListMap() - a.data.period = 1 - a.data.blocks = make(map[types.ValidatorID]*types.Block) - a.data.requiredVote = len(validators)/3*2 + 1 - a.hasOutput = false - a.state = newPrepareState(a.data) - a.validators = make(map[types.ValidatorID]struct{}) - for _, v := range validators { - a.validators[v] = struct{}{} +func (a *agreement) restart(validators types.ValidatorIDs, aID types.Position) { + func() { + a.lock.Lock() + defer a.lock.Unlock() + a.data.votesLock.Lock() + defer a.data.votesLock.Unlock() + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() + a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote) + a.data.votes[1] = newVoteListMap() + a.data.period = 1 + a.data.blocks = make(map[types.ValidatorID]*types.Block) + a.data.requiredVote = len(validators)/3*2 + 1 + a.data.leader.restart() + a.hasOutput = false + a.state = newPrepareState(a.data) + a.validators = make(map[types.ValidatorID]struct{}) + for _, v := range validators { + a.validators[v] = struct{}{} + } + a.candidateBlock = make(map[common.Hash]*types.Block) + a.aID.Store(aID) + }() + + expireTime := time.Now().Add(-10 * time.Second) + replayBlock := make([]*types.Block, 0) + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingBlock := make([]pendingBlock, 0) + for _, pending := range a.pendingBlock { + if pending.block.Position == aID { + replayBlock = append(replayBlock, pending.block) + } else if pending.receivedTime.After(expireTime) { + newPendingBlock = append(newPendingBlock, pending) + } + } + a.pendingBlock = newPendingBlock + }() + + replayVote := make([]*types.Vote, 0) + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingVote := make([]pendingVote, 0) + for _, pending := range a.pendingVote { + if pending.vote.Position == aID { + replayVote = append(replayVote, pending.vote) + } else if pending.receivedTime.After(expireTime) { + newPendingVote = append(newPendingVote, pending) + } + } + a.pendingVote = newPendingVote + }() + + for _, block := range replayBlock { + a.processBlock(block) + } + + for _, vote := range replayVote { + a.processVote(vote) } } @@ -143,11 +208,6 @@ func (a *agreement) agreementID() types.Position { return a.aID.Load().(types.Position) } -// setAgreementID sets the current agreementID. -func (a *agreement) setAgreementID(ID types.Position) { - a.aID.Store(ID) -} - // nextState is called at the spcifi clock time. func (a *agreement) nextState() (err error) { a.state, err = a.state.nextState() @@ -155,7 +215,12 @@ func (a *agreement) nextState() (err error) { } func (a *agreement) sanityCheck(vote *types.Vote) error { - if _, exist := a.validators[vote.ProposerID]; !exist { + if exist := func() bool { + a.lock.RLock() + defer a.lock.RUnlock() + _, exist := a.validators[vote.ProposerID] + return exist + }(); !exist { return ErrNotValidator } ok, err := verifyVoteSignature(vote, a.sigToPub) @@ -165,7 +230,12 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { if !ok { return ErrIncorrectVoteSignature } - if _, exist := a.data.votes[vote.Period]; exist { + if exist := func() bool { + a.data.votesLock.RLock() + defer a.data.votesLock.RUnlock() + _, exist := a.data.votes[vote.Period] + return exist + }(); exist { if oldVote, exist := a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist { if vote.BlockHash != oldVote.BlockHash { @@ -179,6 +249,8 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { // prepareVote prepares a vote. func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) ( err error) { + vote.ProposerID = a.data.ID + vote.Position = a.agreementID() hash := hashVote(vote) vote.Signature, err = prv.Sign(hash) return @@ -186,10 +258,16 @@ func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) ( // processVote is the entry point for processing Vote. func (a *agreement) processVote(vote *types.Vote) error { - vote = vote.Clone() if err := a.sanityCheck(vote); err != nil { return err } + if vote.Position != a.agreementID() { + a.pendingVote = append(a.pendingVote, pendingVote{ + vote: vote, + receivedTime: time.Now().UTC(), + }) + return nil + } if func() bool { a.data.votesLock.Lock() defer a.data.votesLock.Unlock() @@ -211,21 +289,50 @@ func (a *agreement) processVote(vote *types.Vote) error { return nil } +// prepareBlok prepares a block. +func (a *agreement) prepareBlock( + block *types.Block, prv crypto.PrivateKey) error { + return a.data.leader.prepareBlock(block, prv) +} + // processBlock is the entry point for processing Block. func (a *agreement) processBlock(block *types.Block) error { + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() if b, exist := a.data.blocks[block.ProposerID]; exist { if b.Hash != block.Hash { return &ErrFork{block.ProposerID, b.Hash, block.Hash} } return nil } - a.data.blocks[block.ProposerID] = block + if block.Position != a.agreementID() { + a.pendingBlock = append(a.pendingBlock, pendingBlock{ + block: block, + receivedTime: time.Now().UTC(), + }) + return nil + } if err := a.data.leader.processBlock(block); err != nil { return err } + a.data.blocks[block.ProposerID] = block + a.addCandidateBlock(block) return nil } +func (a *agreement) addCandidateBlock(block *types.Block) { + a.lock.Lock() + defer a.lock.Unlock() + a.candidateBlock[block.Hash] = block +} + +func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) { + a.lock.RLock() + defer a.lock.RUnlock() + b, e := a.candidateBlock[hash] + return b, e +} + func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( blockHash common.Hash, ok bool) { a.votesLock.RLock() diff --git a/core/agreement_test.go b/core/agreement_test.go new file mode 100644 index 0000000..aafd1ed --- /dev/null +++ b/core/agreement_test.go @@ -0,0 +1,162 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "testing" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/crypto/eth" + "github.com/stretchr/testify/suite" +) + +// agreementTestReceiver implements core.agreementReceiveer +type agreementTestReceiver struct { + s *AgreementTestSuite +} + +func (r *agreementTestReceiver) proposeVote(vote *types.Vote) { + r.s.voteChan <- vote +} + +func (r *agreementTestReceiver) proposeBlock(block common.Hash) { + r.s.blockChan <- block +} + +func (r *agreementTestReceiver) confirmBlock(block common.Hash) { + r.s.confirmChan <- block +} + +func (s *AgreementTestSuite) proposeBlock( + agreementIdx int) *types.Block { + block := &types.Block{ + ProposerID: s.ID, + Hash: common.NewRandomHash(), + } + s.block[block.Hash] = block + s.Require().Nil(s.agreement[agreementIdx].prepareBlock(block, s.prvKey[s.ID])) + return block +} + +type AgreementTestSuite struct { + suite.Suite + ID types.ValidatorID + prvKey map[types.ValidatorID]crypto.PrivateKey + voteChan chan *types.Vote + blockChan chan common.Hash + confirmChan chan common.Hash + block map[common.Hash]*types.Block + agreement []*agreement +} + +func (s *AgreementTestSuite) SetupTest() { + prvKey, err := eth.NewPrivateKey() + s.Require().Nil(err) + s.ID = types.NewValidatorID(prvKey.PublicKey()) + s.prvKey = map[types.ValidatorID]crypto.PrivateKey{ + s.ID: prvKey, + } + s.voteChan = make(chan *types.Vote, 100) + s.blockChan = make(chan common.Hash, 100) + s.confirmChan = make(chan common.Hash, 100) + s.block = make(map[common.Hash]*types.Block) +} + +func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement { + leader := newGenesisLeaderSelector("🖖👽", eth.SigToPub) + agreementIdx := len(s.agreement) + blockProposer := func() *types.Block { + return s.proposeBlock(agreementIdx) + } + + validators := make(types.ValidatorIDs, numValidator-1) + for i := range validators { + prvKey, err := eth.NewPrivateKey() + s.Require().Nil(err) + validators[i] = types.NewValidatorID(prvKey.PublicKey()) + s.prvKey[validators[i]] = prvKey + } + validators = append(validators, s.ID) + agreement := newAgreement( + s.ID, + &agreementTestReceiver{s}, + validators, + leader, + eth.SigToPub, + blockProposer, + ) + s.agreement = append(s.agreement, agreement) + return agreement +} + +func (s *AgreementTestSuite) prepareVote(vote *types.Vote) { + prvKey, exist := s.prvKey[vote.ProposerID] + s.Require().True(exist) + hash := hashVote(vote) + var err error + vote.Signature, err = prvKey.Sign(hash) + s.Require().NoError(err) +} + +func (s *AgreementTestSuite) copyVote( + vote *types.Vote, proposer types.ValidatorID) *types.Vote { + v := vote.Clone() + v.ProposerID = proposer + s.prepareVote(v) + return v +} + +func (s *AgreementTestSuite) TestSimpleConfirm() { + a := s.newAgreement(4) + // PrepareState + a.nextState() + // AckState + s.Require().Len(s.blockChan, 1) + blockHash := <-s.blockChan + block, exist := s.block[blockHash] + s.Require().True(exist) + s.Require().NoError(a.processBlock(block)) + a.nextState() + // ConfirmState + s.Require().Len(s.voteChan, 1) + vote := <-s.voteChan + s.Equal(types.VoteAck, vote.Type) + for vID := range s.prvKey { + v := s.copyVote(vote, vID) + s.Require().NoError(a.processVote(v)) + } + a.nextState() + // Pass1State + s.Require().Len(s.voteChan, 1) + vote = <-s.voteChan + s.Equal(types.VoteConfirm, vote.Type) + for vID := range s.prvKey { + v := s.copyVote(vote, vID) + s.Require().NoError(a.processVote(v)) + } + // We have enough of Confirm-Votes. + s.Require().Len(s.confirmChan, 1) + confirmBlock := <-s.confirmChan + s.Equal(blockHash, confirmBlock) +} + +func TestAgreement(t *testing.T) { + suite.Run(t, new(AgreementTestSuite)) +} diff --git a/core/consensus.go b/core/consensus.go index 2618b54..5b85fcb 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -18,7 +18,9 @@ package core import ( + "context" "fmt" + "log" "sort" "sync" "time" @@ -53,24 +55,79 @@ var ( "signature of block is incorrect") ErrGenesisBlockNotEmpty = fmt.Errorf( "genesis block should be empty") + ErrUnknownBlockProposed = fmt.Errorf( + "unknown block is proposed") + ErrUnknownBlockConfirmed = fmt.Errorf( + "unknown block is confirmed") + ErrIncorrectBlockPosition = fmt.Errorf( + "position of block is incorrect") ) +// consensusReceiver implements agreementReceiver. +type consensusReceiver struct { + consensus *Consensus + chainID uint32 + restart chan struct{} +} + +func (recv *consensusReceiver) proposeVote(vote *types.Vote) { + // TODO(jimmy-dexon): move prepareVote() into agreement. + if err := recv.consensus.prepareVote(recv.chainID, vote); err != nil { + fmt.Println(err) + return + } + go func() { + if err := recv.consensus.ProcessVote(vote); err != nil { + fmt.Println(err) + return + } + recv.consensus.network.BroadcastVote(vote) + }() +} +func (recv *consensusReceiver) proposeBlock(hash common.Hash) { + block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash) + if !exist { + fmt.Println(ErrUnknownBlockProposed) + fmt.Println(hash) + return + } + if err := recv.consensus.PreProcessBlock(block); err != nil { + fmt.Println(err) + return + } + recv.consensus.network.BroadcastBlock(block) +} +func (recv *consensusReceiver) confirmBlock(hash common.Hash) { + block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash) + if !exist { + fmt.Println(ErrUnknownBlockConfirmed, hash) + return + } + recv.restart <- struct{}{} + if err := recv.consensus.ProcessBlock(block); err != nil { + fmt.Println(err) + return + } +} + // Consensus implements DEXON Consensus algorithm. type Consensus struct { - ID types.ValidatorID - app Application - gov Governance - rbModule *reliableBroadcast - toModule *totalOrdering - ctModule *consensusTimestamp - ccModule *compactionChain - db blockdb.BlockDatabase - network Network - tick *time.Ticker - prvKey crypto.PrivateKey - sigToPub SigToPubFn - lock sync.RWMutex - stopChan chan struct{} + ID types.ValidatorID + app Application + gov Governance + baModules []*agreement + receivers []*consensusReceiver + rbModule *reliableBroadcast + toModule *totalOrdering + ctModule *consensusTimestamp + ccModule *compactionChain + db blockdb.BlockDatabase + network Network + tick *time.Ticker + prvKey crypto.PrivateKey + sigToPub SigToPubFn + lock sync.RWMutex + stopChan chan struct{} } // NewConsensus construct an Consensus instance. @@ -86,7 +143,7 @@ func NewConsensus( // Setup acking by information returned from Governace. rb := newReliableBroadcast() - rb.setChainNum(len(validatorSet)) + rb.setChainNum(gov.GetChainNumber()) for vID := range validatorSet { rb.addValidator(vID) } @@ -101,7 +158,7 @@ func NewConsensus( uint64(float32(len(validatorSet)-1)*gov.GetPhiRatio()+1), validators) - return &Consensus{ + con := &Consensus{ ID: types.NewValidatorID(prv.PublicKey()), rbModule: rb, toModule: to, @@ -116,11 +173,104 @@ func NewConsensus( sigToPub: sigToPub, stopChan: make(chan struct{}), } + + con.baModules = make([]*agreement, con.gov.GetChainNumber()) + con.receivers = make([]*consensusReceiver, con.gov.GetChainNumber()) + for i := uint32(0); i < con.gov.GetChainNumber(); i++ { + chainID := i + con.receivers[chainID] = &consensusReceiver{ + consensus: con, + chainID: chainID, + restart: make(chan struct{}, 1), + } + blockProposer := func() *types.Block { + block := con.proposeBlock(chainID) + con.baModules[chainID].addCandidateBlock(block) + return block + } + con.baModules[chainID] = newAgreement( + con.ID, + con.receivers[chainID], + validators, + newGenesisLeaderSelector(con.gov.GetGenesisCRS(), con.sigToPub), + con.sigToPub, + blockProposer, + ) + } + + return con } -// Run starts running Consensus core. +// Run starts running DEXON Consensus. func (con *Consensus) Run() { - go con.processMsg(con.network.ReceiveChan()) + ctx, cancel := context.WithCancel(context.Background()) + ticks := make([]chan struct{}, 0, con.gov.GetChainNumber()) + for i := uint32(0); i < con.gov.GetChainNumber(); i++ { + tick := make(chan struct{}) + ticks = append(ticks, tick) + go con.runBA(ctx, i, tick) + } + go func() { + <-con.stopChan + cancel() + }() + go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock) + // Reset ticker. + <-con.tick.C + <-con.tick.C + for { + <-con.tick.C + for _, tick := range ticks { + go func(tick chan struct{}) { tick <- struct{}{} }(tick) + } + } +} + +func (con *Consensus) runBA( + ctx context.Context, chainID uint32, tick <-chan struct{}) { + // TODO(jimmy-dexon): move this function inside agreement. + validatorSet := con.gov.GetValidatorSet() + validators := make(types.ValidatorIDs, 0, len(validatorSet)) + for vID := range validatorSet { + validators = append(validators, vID) + } + agreement := con.baModules[chainID] + recv := con.receivers[chainID] + recv.restart <- struct{}{} + // Reset ticker + <-tick +BALoop: + for { + select { + case <-ctx.Done(): + break BALoop + default: + } + for i := 0; i < agreement.clocks(); i++ { + <-tick + } + select { + case <-recv.restart: + // TODO(jimmy-dexon): handling change of validator set. + aID := types.Position{ + ShardID: 0, + ChainID: chainID, + Height: con.rbModule.nextHeight(chainID), + } + agreement.restart(validators, aID) + default: + } + err := agreement.nextState() + if err != nil { + log.Printf("[%s] %s\n", con.ID.String(), err) + break BALoop + } + } +} + +// RunLegacy starts running Legacy DEXON Consensus. +func (con *Consensus) RunLegacy() { + go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock) chainID := uint32(0) hashes := make(common.Hashes, 0, len(con.gov.GetValidatorSet())) @@ -134,6 +284,7 @@ func (con *Consensus) Run() { break } } + con.rbModule.setChainNum(uint32(len(hashes))) genesisBlock := &types.Block{ ProposerID: con.ID, @@ -178,7 +329,9 @@ func (con *Consensus) Stop() { con.stopChan <- struct{}{} } -func (con *Consensus) processMsg(msgChan <-chan interface{}) { +func (con *Consensus) processMsg( + msgChan <-chan interface{}, + blockProcesser func(*types.Block) error) { for { var msg interface{} select { @@ -189,10 +342,10 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { switch val := msg.(type) { case *types.Block: - if err := con.ProcessBlock(val); err != nil { + if err := blockProcesser(val); err != nil { fmt.Println(err) } - types.RecycleBlock(val) + //types.RecycleBlock(val) case *types.NotaryAck: if err := con.ProcessNotaryAck(val); err != nil { fmt.Println(err) @@ -205,13 +358,43 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { } } +func (con *Consensus) proposeBlock(chainID uint32) *types.Block { + block := &types.Block{ + ProposerID: con.ID, + Position: types.Position{ + ChainID: chainID, + Height: con.rbModule.nextHeight(chainID), + }, + } + if err := con.PrepareBlock(block, time.Now().UTC()); err != nil { + fmt.Println(err) + return nil + } + if err := con.baModules[chainID].prepareBlock(block, con.prvKey); err != nil { + fmt.Println(err) + return nil + } + return block +} + // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - return + v := vote.Clone() + err = con.baModules[v.Position.ChainID].processVote(v) + return err +} + +// prepareVote prepares a vote. +func (con *Consensus) prepareVote(chainID uint32, vote *types.Vote) error { + return con.baModules[chainID].prepareVote(vote, con.prvKey) } // sanityCheck checks if the block is a valid block func (con *Consensus) sanityCheck(b *types.Block) (err error) { + // Check block.Position. + if b.Position.ShardID != 0 || b.Position.ChainID >= con.rbModule.chainNum() { + return ErrIncorrectBlockPosition + } // Check the hash of block. hash, err := hashBlock(b) if err != nil || hash != b.Hash { @@ -229,19 +412,29 @@ func (con *Consensus) sanityCheck(b *types.Block) (err error) { return nil } -// ProcessBlock is the entry point to submit one block to a Consensus instance. -func (con *Consensus) ProcessBlock(b *types.Block) (err error) { - // TODO(jimmy-dexon): BlockConverter.Block() is called twice in this method. +// PreProcessBlock performs Byzantine Agreement on the block. +func (con *Consensus) PreProcessBlock(b *types.Block) (err error) { if err := con.sanityCheck(b); err != nil { return err } + if err := con.baModules[b.Position.ChainID].processBlock(b); err != nil { + return err + } + return +} + +// ProcessBlock is the entry point to submit one block to a Consensus instance. +func (con *Consensus) ProcessBlock(block *types.Block) (err error) { + if err := con.sanityCheck(block); err != nil { + return err + } var ( deliveredBlocks []*types.Block earlyDelivered bool ) // To avoid application layer modify the content of block during // processing, we should always operate based on the cloned one. - b = b.Clone() + b := block.Clone() con.lock.Lock() defer con.lock.Unlock() @@ -249,7 +442,7 @@ func (con *Consensus) ProcessBlock(b *types.Block) (err error) { if err = con.rbModule.processBlock(b); err != nil { return err } - con.app.BlockConfirmed(b.Clone()) + con.app.BlockConfirmed(block) for _, b := range con.rbModule.extractBlocks() { // Notify application layer that some block is strongly acked. con.app.StronglyAcked(b.Hash) diff --git a/core/leader-selector.go b/core/leader-selector.go index 4295050..d96de4e 100644 --- a/core/leader-selector.go +++ b/core/leader-selector.go @@ -90,6 +90,11 @@ func (l *leaderSelector) probability(sig crypto.Signature) float64 { return p } +func (l *leaderSelector) restart() { + l.minCRSBlock = maxHash + l.minBlockHash = common.Hash{} +} + func (l *leaderSelector) leaderBlockHash() common.Hash { return l.minBlockHash } diff --git a/core/reliable-broadcast.go b/core/reliable-broadcast.go index 6d5d15b..7cca4f1 100644 --- a/core/reliable-broadcast.go +++ b/core/reliable-broadcast.go @@ -65,6 +65,9 @@ type rbcValidatorStatus struct { // nextOutput is the next output height of block, default to 0. nextOutput uint64 + + // nextHeight is the next height of block to be prepared. + nextHeight uint64 } type rbcBlockInfo struct { @@ -195,6 +198,9 @@ func (rb *reliableBroadcast) processBlock(block *types.Block) (err error) { ackedChain: make(map[uint32]struct{}), } rb.receivedBlocks[block.Hash] = block + if rb.lattice[block.Position.ChainID].nextHeight <= block.Position.Height { + rb.lattice[block.Position.ChainID].nextHeight = block.Position.Height + 1 + } // Check blocks in receivedBlocks if its acks are all in lattice. If a block's // acking blocks are all in lattice, execute sanity check and add the block @@ -417,7 +423,9 @@ func (rb *reliableBroadcast) prepareBlock(block *types.Block) { accumulateTimestamps(times, curBlock) if uint32(chainID) == block.Position.ChainID { block.ParentHash = curBlock.Hash - block.Position.Height = curBlock.Position.Height + 1 + if block.Position.Height == 0 { + block.Position.Height = curBlock.Position.Height + 1 + } } } block.Timestamps = times @@ -436,13 +444,23 @@ func (rb *reliableBroadcast) deleteValidator(h types.ValidatorID) { } // setChainNum set the number of chains. -func (rb *reliableBroadcast) setChainNum(num int) { +func (rb *reliableBroadcast) setChainNum(num uint32) { rb.lattice = make([]*rbcValidatorStatus, num) for i := range rb.lattice { rb.lattice[i] = &rbcValidatorStatus{ blocks: make(map[uint64]*types.Block), nextAck: make([]uint64, num), nextOutput: 0, + nextHeight: 0, } } } + +func (rb *reliableBroadcast) chainNum() uint32 { + return uint32(len(rb.lattice)) +} + +// nextHeight returns the next height for the chain. +func (rb *reliableBroadcast) nextHeight(chainID uint32) uint64 { + return rb.lattice[chainID].nextHeight +} diff --git a/core/reliable-broadcast_test.go b/core/reliable-broadcast_test.go index 206b2fa..2c71bcb 100644 --- a/core/reliable-broadcast_test.go +++ b/core/reliable-broadcast_test.go @@ -102,7 +102,7 @@ func genTestCase1(s *ReliableBroadcastTest, rb *reliableBroadcast) []types.Valid rb.addValidator(vid) vids = append(vids, vid) } - rb.setChainNum(len(vids)) + rb.setChainNum(uint32(len(vids))) // Add genesis blocks. for _, vid := range vids { b = s.prepareGenesisBlock(vid, vids) @@ -529,7 +529,7 @@ func (s *ReliableBroadcastTest) TestRandomIntensiveAcking() { for _, vid := range vids { rb.addValidator(vid) } - rb.setChainNum(len(vids)) + rb.setChainNum(uint32(len(vids))) // Generate genesis blocks. for _, vid := range vids { b := s.prepareGenesisBlock(vid, vids) @@ -602,7 +602,7 @@ func (s *ReliableBroadcastTest) TestRandomlyGeneratedBlocks() { for i := 0; i < repeat; i++ { validators := map[types.ValidatorID]struct{}{} rb := newReliableBroadcast() - rb.setChainNum(validatorCount) + rb.setChainNum(uint32(validatorCount)) stronglyAckedHashes := common.Hashes{} revealer.Reset() @@ -657,7 +657,7 @@ func (s *ReliableBroadcastTest) TestPrepareBlock() { for _, vID := range validators { rb.addValidator(vID) } - rb.setChainNum(len(validators)) + rb.setChainNum(uint32(len(validators))) // Setup genesis blocks. b00 := s.prepareGenesisBlock(validators[0], validators) time.Sleep(minInterval) diff --git a/core/types/block.go b/core/types/block.go index ab2ef84..f02b1a0 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -77,7 +77,8 @@ type Block struct { } func (b *Block) String() string { - return fmt.Sprintf("Block(%v)", b.Hash.String()[:6]) + return fmt.Sprintf("Block(%v:%d:%d)", b.Hash.String()[:6], + b.Position.ChainID, b.Position.Height) } // Clone returns a deep copy of a block. diff --git a/core/types/vote.go b/core/types/vote.go index 158c209..bae8f7d 100644 --- a/core/types/vote.go +++ b/core/types/vote.go @@ -47,7 +47,8 @@ type Vote struct { } func (v *Vote) String() string { - return fmt.Sprintf("Vote(%d:%d):%s", + return fmt.Sprintf("Vote[%s:%d:%d](%d:%d):%s", + v.ProposerID.String()[:6], v.Position.ChainID, v.Position.Height, v.Period, v.Type, v.BlockHash.String()[:6]) } diff --git a/crypto/utils.go b/crypto/utils.go index dfb4987..07a8b2b 100644 --- a/crypto/utils.go +++ b/crypto/utils.go @@ -18,6 +18,8 @@ package crypto import ( + "encoding/hex" + "github.com/ethereum/go-ethereum/crypto" "github.com/dexon-foundation/dexon-consensus-core/common" @@ -33,3 +35,7 @@ func Keccak256Hash(data ...[]byte) (h common.Hash) { func (sig Signature) Clone() Signature { return append(Signature{}, sig...) } + +func (sig Signature) String() string { + return hex.EncodeToString([]byte(sig[:])) +} diff --git a/integration_test/validator.go b/integration_test/validator.go index 25cbac0..bfe517f 100644 --- a/integration_test/validator.go +++ b/integration_test/validator.go @@ -102,7 +102,8 @@ func NewValidator( networkLatency: networkLatency, proposingLatency: proposingLatency, cons: core.NewConsensus( - app, gov, db, &Network{}, time.NewTicker(1), privateKey, eth.SigToPub), + app, gov, + db, &Network{}, time.NewTicker(1), privateKey, eth.SigToPub), } } diff --git a/simulation/app.go b/simulation/app.go index aded2a7..4bf4aff 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -116,7 +116,7 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { } a.Outputs = blocks a.Early = early - //fmt.Println("OUTPUT", a.ValidatorID, a.Early, a.Outputs) + fmt.Println("OUTPUT", a.ValidatorID, a.Early, a.Outputs) confirmLatency := []time.Duration{} diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go index 31c8d95..468baff 100644 --- a/simulation/tcp-network.go +++ b/simulation/tcp-network.go @@ -168,6 +168,20 @@ func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } n.recieveChan <- block + case "vote": + vote := &types.Vote{} + if err := json.Unmarshal(m.Payload, vote); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + n.recieveChan <- vote + case "notaryAck": + ack := &types.NotaryAck{} + if err := json.Unmarshal(m.Payload, ack); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + n.recieveChan <- ack default: w.WriteHeader(http.StatusBadRequest) return diff --git a/simulation/verification.go b/simulation/verification.go index ad2c911..2e90b8b 100644 --- a/simulation/verification.go +++ b/simulation/verification.go @@ -202,7 +202,7 @@ func VerifyTotalOrder(id types.ValidatorID, if hasError { log.Printf("[%d] Hash is %v from %v\n", i, hash, id) } else { - //log.Printf("Block %v confirmed\n", hash) + log.Printf("Block %v confirmed\n", hash) } } |