From be7c5cc02e6b960abb92a63142d98cd3661ab4b4 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 16 Oct 2018 17:22:58 +0800 Subject: core: New dexon ba (#210) --- core/agreement-state.go | 249 ++++++++++-------------------------- core/agreement-state_test.go | 295 ++++++++----------------------------------- core/agreement.go | 128 +++++++++++++------ core/agreement_test.go | 179 ++++++++++++++++++++++++-- core/authenticator_test.go | 2 +- core/consensus.go | 21 ++- core/crypto_test.go | 4 +- core/types/vote.go | 6 +- 8 files changed, 405 insertions(+), 479 deletions(-) (limited to 'core') diff --git a/core/agreement-state.go b/core/agreement-state.go index fc74998..56c6c27 100644 --- a/core/agreement-state.go +++ b/core/agreement-state.go @@ -19,7 +19,7 @@ package core import ( "fmt" - "sync" + "math" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" @@ -36,226 +36,117 @@ type agreementStateType int // agreementStateType enum. const ( - statePrepare agreementStateType = iota - stateAck - stateConfirm - statePass1 - statePass2 + stateInitial agreementStateType = iota + statePreCommit + stateCommit + stateForward ) var nullBlockHash = common.Hash{} +var skipBlockHash common.Hash + +func init() { + for idx := range skipBlockHash { + skipBlockHash[idx] = 0xff + } +} type agreementState interface { state() agreementStateType nextState() (agreementState, error) - receiveVote() error clocks() int - terminate() } -//----- PrepareState ----- -type prepareState struct { +//----- InitialState ----- +type initialState struct { a *agreementData } -func newPrepareState(a *agreementData) *prepareState { - return &prepareState{a: a} +func newInitialState(a *agreementData) *initialState { + return &initialState{a: a} } -func (s *prepareState) state() agreementStateType { return statePrepare } -func (s *prepareState) clocks() int { return 0 } -func (s *prepareState) terminate() {} -func (s *prepareState) nextState() (agreementState, error) { - if s.a.period == 1 { - s.a.recv.ProposeBlock() - } else { - var proposed bool - _, proposed = s.a.countVote(s.a.period-1, types.VotePass) - if !proposed { - return nil, ErrNoEnoughVoteInPrepareState - } - } - return newAckState(s.a), nil +func (s *initialState) state() agreementStateType { return stateInitial } +func (s *initialState) clocks() int { return 0 } +func (s *initialState) nextState() (agreementState, error) { + s.a.lock.Lock() + defer s.a.lock.Unlock() + hash := s.a.recv.ProposeBlock() + s.a.recv.ProposeVote(&types.Vote{ + Type: types.VoteInit, + BlockHash: hash, + Period: s.a.period, + }) + return newPreCommitState(s.a), nil } -func (s *prepareState) receiveVote() error { return nil } -// ----- AckState ----- -type ackState struct { +//----- PreCommitState ----- +type preCommitState struct { a *agreementData } -func newAckState(a *agreementData) *ackState { - return &ackState{a: a} +func newPreCommitState(a *agreementData) *preCommitState { + return &preCommitState{a: a} } -func (s *ackState) state() agreementStateType { return stateAck } -func (s *ackState) clocks() int { return 2 } -func (s *ackState) terminate() {} -func (s *ackState) nextState() (agreementState, error) { - acked := false - hash := common.Hash{} - if s.a.period == 1 { - acked = true - } else { - hash, acked = s.a.countVote(s.a.period-1, types.VotePass) - } - if !acked { - return nil, ErrNoEnoughVoteInAckState - } +func (s *preCommitState) state() agreementStateType { return statePreCommit } +func (s *preCommitState) clocks() int { return 2 } +func (s *preCommitState) nextState() (agreementState, error) { + s.a.lock.RLock() + defer s.a.lock.RUnlock() + hash := s.a.lockValue if hash == nullBlockHash { hash = s.a.leader.leaderBlockHash() } s.a.recv.ProposeVote(&types.Vote{ - Type: types.VoteAck, + Type: types.VotePreCom, BlockHash: hash, Period: s.a.period, }) - return newConfirmState(s.a), nil -} -func (s *ackState) receiveVote() error { return nil } - -// ----- ConfirmState ----- -type confirmState struct { - a *agreementData - lock sync.Mutex - voted bool -} - -func newConfirmState(a *agreementData) *confirmState { - return &confirmState{ - a: a, - } -} - -func (s *confirmState) state() agreementStateType { return stateConfirm } -func (s *confirmState) clocks() int { return 2 } -func (s *confirmState) terminate() {} -func (s *confirmState) nextState() (agreementState, error) { - return newPass1State(s.a), nil -} -func (s *confirmState) receiveVote() error { - s.lock.Lock() - defer s.lock.Unlock() - if s.voted { - return nil - } - hash, ok := s.a.countVote(s.a.period, types.VoteAck) - if !ok { - return nil - } - if hash != nullBlockHash { - s.a.recv.ProposeVote(&types.Vote{ - Type: types.VoteConfirm, - BlockHash: hash, - Period: s.a.period, - }) - s.voted = true - } - return nil + return newCommitState(s.a), nil } -// ----- Pass1State ----- -type pass1State struct { +//----- CommitState ----- +type commitState struct { a *agreementData } -func newPass1State(a *agreementData) *pass1State { - return &pass1State{a: a} +func newCommitState(a *agreementData) *commitState { + return &commitState{a: a} } -func (s *pass1State) state() agreementStateType { return statePass1 } -func (s *pass1State) clocks() int { return 0 } -func (s *pass1State) terminate() {} -func (s *pass1State) nextState() (agreementState, error) { - voteDefault := false - if vote, exist := func() (*types.Vote, bool) { - s.a.votesLock.RLock() - defer s.a.votesLock.RUnlock() - v, e := s.a.votes[s.a.period][types.VoteConfirm][s.a.ID] - return v, e - }(); exist { - s.a.recv.ProposeVote(&types.Vote{ - Type: types.VotePass, - BlockHash: vote.BlockHash, - Period: s.a.period, - }) - } else if s.a.period == 1 { - voteDefault = true +func (s *commitState) state() agreementStateType { return stateCommit } +func (s *commitState) clocks() int { return 2 } +func (s *commitState) nextState() (agreementState, error) { + hash, ok := s.a.countVote(s.a.period, types.VotePreCom) + s.a.lock.Lock() + defer s.a.lock.Unlock() + if ok && hash != skipBlockHash { + s.a.lockValue = hash + s.a.lockRound = s.a.period } else { - hash, ok := s.a.countVote(s.a.period-1, types.VotePass) - if ok { - if hash == nullBlockHash { - s.a.recv.ProposeVote(&types.Vote{ - Type: types.VotePass, - BlockHash: hash, - Period: s.a.period, - }) - } else { - voteDefault = true - } - } else { - voteDefault = true - } + hash = skipBlockHash } - if voteDefault { - s.a.recv.ProposeVote(&types.Vote{ - Type: types.VotePass, - BlockHash: s.a.defaultBlock, - Period: s.a.period, - }) - } - return newPass2State(s.a), nil + s.a.recv.ProposeVote(&types.Vote{ + Type: types.VoteCom, + BlockHash: hash, + Period: s.a.period, + }) + return newForwardState(s.a), nil } -func (s *pass1State) receiveVote() error { return nil } -// ----- Pass2State ----- -type pass2State struct { - a *agreementData - lock sync.Mutex - voted bool - enoughPassVote chan common.Hash - terminateChan chan struct{} +// ----- ForwardState ----- +type forwardState struct { + a *agreementData } -func newPass2State(a *agreementData) *pass2State { - return &pass2State{ - a: a, - enoughPassVote: make(chan common.Hash, 1), - terminateChan: make(chan struct{}), - } +func newForwardState(a *agreementData) *forwardState { + return &forwardState{a: a} } -func (s *pass2State) state() agreementStateType { return statePass2 } -func (s *pass2State) clocks() int { return 0 } -func (s *pass2State) terminate() { - s.terminateChan <- struct{}{} -} -func (s *pass2State) nextState() (agreementState, error) { - select { - case <-s.terminateChan: - break - case hash := <-s.enoughPassVote: - s.a.votesLock.RLock() - defer s.a.votesLock.RUnlock() - s.a.defaultBlock = hash - s.a.period++ - oldBlock := s.a.blocks[s.a.ID] - s.a.blocks = map[types.NodeID]*types.Block{ - s.a.ID: oldBlock, - } - } - return newPrepareState(s.a), nil -} -func (s *pass2State) receiveVote() error { - s.lock.Lock() - defer s.lock.Unlock() - if s.voted { - return nil - } - hash, ok := s.a.countVote(s.a.period, types.VotePass) - if ok { - s.voted = true - s.enoughPassVote <- hash - } - return nil +func (s *forwardState) state() agreementStateType { return stateForward } +func (s *forwardState) clocks() int { return math.MaxInt32 } + +func (s *forwardState) nextState() (agreementState, error) { + return s, nil } diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go index 030a295..8a2c016 100644 --- a/core/agreement-state_test.go +++ b/core/agreement-state_test.go @@ -19,7 +19,6 @@ package core import ( "testing" - "time" "github.com/stretchr/testify/suite" @@ -47,9 +46,10 @@ func (r *agreementStateTestReceiver) ProposeVote(vote *types.Vote) { r.s.voteChan <- vote } -func (r *agreementStateTestReceiver) ProposeBlock() { +func (r *agreementStateTestReceiver) ProposeBlock() common.Hash { block := r.s.proposeBlock(r.leader) r.s.blockChan <- block.Hash + return block.Hash } func (r *agreementStateTestReceiver) ConfirmBlock(block common.Hash, @@ -119,61 +119,27 @@ func (s *AgreementStateTestSuite) newAgreement(numNode int) *agreement { return agreement } -func (s *AgreementStateTestSuite) TestPrepareState() { +func (s *AgreementStateTestSuite) TestInitialState() { a := s.newAgreement(4) - state := newPrepareState(a.data) - s.Equal(statePrepare, state.state()) + state := newInitialState(a.data) + s.Equal(stateInitial, state.state()) s.Equal(0, state.clocks()) - // For period == 1, proposing a new block. + // Proposing a new block. a.data.period = 1 newState, err := state.nextState() s.Require().Nil(err) - s.Require().True(len(s.blockChan) > 0) + s.Require().Len(s.blockChan, 1) proposedBlock := <-s.blockChan s.NotEqual(common.Hash{}, proposedBlock) s.Require().Nil(a.processBlock(s.block[proposedBlock])) - s.Equal(stateAck, newState.state()) - - // For period >= 2, if the pass-vote for block b equal to {} - // is more than 2f+1, proposing the block previously proposed. - a.data.period = 2 - _, err = state.nextState() - s.Equal(ErrNoEnoughVoteInPrepareState, err) - - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, common.Hash{}, 1) - s.Require().Nil(a.processVote(vote)) - } - - newState, err = state.nextState() - s.Require().Nil(err) - s.Equal(stateAck, newState.state()) - - // For period >= 2, if the pass-vote for block v not equal to {} - // is more than 2f+1, proposing the block v. - a.data.period = 3 - block := s.proposeBlock(a.data.leader) - prv, err := ecdsa.NewPrivateKey() - s.Require().Nil(err) - block.ProposerID = types.NewNodeID(prv.PublicKey()) - s.Require().NoError( - NewAuthenticator(prv).SignCRS(block, a.data.leader.hashCRS)) - s.Require().Nil(a.processBlock(block)) - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, block.Hash, 2) - s.Require().Nil(a.processVote(vote)) - } - - newState, err = state.nextState() - s.Require().Nil(err) - s.Equal(stateAck, newState.state()) + s.Equal(statePreCommit, newState.state()) } -func (s *AgreementStateTestSuite) TestAckState() { +func (s *AgreementStateTestSuite) TestPreCommitState() { a := s.newAgreement(4) - state := newAckState(a.data) - s.Equal(stateAck, state.state()) + state := newPreCommitState(a.data) + s.Equal(statePreCommit, state.state()) s.Equal(2, state.clocks()) blocks := make([]*types.Block, 3) @@ -187,247 +153,90 @@ func (s *AgreementStateTestSuite) TestAckState() { s.Require().Nil(a.processBlock(blocks[i])) } - // For period 1, propose ack-vote for the block having largest potential. + // If lockvalue == null, propose preCom-vote for the leader block. + a.data.lockValue = nullBlockHash a.data.period = 1 newState, err := state.nextState() s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) + s.Require().Len(s.voteChan, 1) vote := <-s.voteChan - s.Equal(types.VoteAck, vote.Type) + s.Equal(types.VotePreCom, vote.Type) s.NotEqual(common.Hash{}, vote.BlockHash) - s.Equal(stateConfirm, newState.state()) + s.Equal(stateCommit, newState.state()) - // For period >= 2, if block v equal to {} has more than 2f+1 pass-vote - // in period 1, propose ack-vote for the block having largest potential. + // Else, preCom-vote on lockValue. a.data.period = 2 - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, common.Hash{}, 1) - s.Require().Nil(a.processVote(vote)) - } - newState, err = state.nextState() - s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) - vote = <-s.voteChan - s.Equal(types.VoteAck, vote.Type) - s.NotEqual(common.Hash{}, vote.BlockHash) - s.Equal(stateConfirm, newState.state()) - - // For period >= 2, if block v not equal to {} has more than 2f+1 pass-vote - // in period 1, propose ack-vote for block v. - hash := blocks[0].Hash - a.data.period = 3 - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, hash, 2) - s.Require().Nil(a.processVote(vote)) - } + hash := common.NewRandomHash() + a.data.lockValue = hash newState, err = state.nextState() s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) + s.Require().Len(s.voteChan, 1) vote = <-s.voteChan - s.Equal(types.VoteAck, vote.Type) + s.Equal(types.VotePreCom, vote.Type) s.Equal(hash, vote.BlockHash) - s.Equal(stateConfirm, newState.state()) + s.Equal(stateCommit, newState.state()) } -func (s *AgreementStateTestSuite) TestConfirmState() { +func (s *AgreementStateTestSuite) TestCommitState() { a := s.newAgreement(4) - state := newConfirmState(a.data) - s.Equal(stateConfirm, state.state()) + state := newCommitState(a.data) + s.Equal(stateCommit, state.state()) s.Equal(2, state.clocks()) - // If there are 2f+1 ack-votes for block v not equal to {}, - // propose a confirm-vote for block v. + // If there are 2f+1 preCom-votes for block v or null, + // propose a com-vote for block v. a.data.period = 1 block := s.proposeBlock(a.data.leader) s.Require().Nil(a.processBlock(block)) for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VoteAck, block.Hash, 1) + vote := s.prepareVote(nID, types.VotePreCom, block.Hash, 1) s.Require().Nil(a.processVote(vote)) } - s.Require().Nil(state.receiveVote()) newState, err := state.nextState() s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) + s.Require().Len(s.voteChan, 1) + s.Equal(block.Hash, a.data.lockValue) + s.Equal(uint64(1), a.data.lockRound) vote := <-s.voteChan - s.Equal(types.VoteConfirm, vote.Type) + s.Equal(types.VoteCom, vote.Type) s.Equal(block.Hash, vote.BlockHash) - s.Equal(statePass1, newState.state()) + s.Equal(stateForward, newState.state()) - // Else, no vote is propose in this state. + // Else, com-vote on SKIP. a.data.period = 2 - s.Require().Nil(state.receiveVote()) newState, err = state.nextState() s.Require().Nil(err) - s.Require().True(len(s.voteChan) == 0) - s.Equal(statePass1, newState.state()) - - // If there are 2f+1 ack-vote for block v equal to {}, - // no vote should be proposed. - a.data.period = 3 - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VoteAck, common.Hash{}, 3) - s.Require().Nil(a.processVote(vote)) - } - s.Require().Nil(state.receiveVote()) - newState, err = state.nextState() - s.Require().Nil(err) - s.Require().True(len(s.voteChan) == 0) - s.Equal(statePass1, newState.state()) -} - -func (s *AgreementStateTestSuite) TestPass1State() { - a := s.newAgreement(4) - state := newPass1State(a.data) - s.Equal(statePass1, state.state()) - s.Equal(0, state.clocks()) - - // If confirm-vote was proposed in the same period, - // propose pass-vote with same block. - a.data.period = 1 - hash := common.NewRandomHash() - vote := s.prepareVote(s.ID, types.VoteConfirm, hash, 1) - s.Require().Nil(a.processVote(vote)) - newState, err := state.nextState() - s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) - vote = <-s.voteChan - s.Equal(types.VotePass, vote.Type) - s.Equal(hash, vote.BlockHash) - s.Equal(statePass2, newState.state()) - - // Else if period >= 2 and has 2f+1 pass-vote in period-1 for block {}, - // propose pass-vote for block {}. - a.data.period = 2 - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, common.Hash{}, 1) - s.Require().Nil(a.processVote(vote)) - } - vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 2) - s.Require().Nil(a.processVote(vote)) - newState, err = state.nextState() - s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) + s.Require().Len(s.voteChan, 1) vote = <-s.voteChan - s.Equal(types.VotePass, vote.Type) - s.Equal(common.Hash{}, vote.BlockHash) - s.Equal(statePass2, newState.state()) + s.Equal(types.VoteCom, vote.Type) + s.Equal(skipBlockHash, vote.BlockHash) + s.Equal(stateForward, newState.state()) - // Else, propose pass-vote for default block. + // If there are 2f+1 preCom-votes for SKIP, it's same as the 'else' condition. a.data.period = 3 - block := s.proposeBlock(a.data.leader) - a.data.defaultBlock = block.Hash - hash = common.NewRandomHash() for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, hash, 2) + vote := s.prepareVote(nID, types.VotePreCom, skipBlockHash, 3) s.Require().Nil(a.processVote(vote)) } - vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 3) - s.Require().Nil(a.processVote(vote)) newState, err = state.nextState() s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) + s.Require().Len(s.voteChan, 1) vote = <-s.voteChan - s.Equal(types.VotePass, vote.Type) - s.Equal(block.Hash, vote.BlockHash) - s.Equal(statePass2, newState.state()) - - // Period == 1 is also else condition. - a = s.newAgreement(4) - state = newPass1State(a.data) - a.data.period = 1 - block = s.proposeBlock(a.data.leader) - a.data.defaultBlock = block.Hash - hash = common.NewRandomHash() - vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 1) - s.Require().Nil(a.processVote(vote)) - newState, err = state.nextState() - s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) - vote = <-s.voteChan - s.Equal(types.VotePass, vote.Type) - s.Equal(block.Hash, vote.BlockHash) - s.Equal(statePass2, newState.state()) - - // No enought pass-vote for period-1. - a.data.period = 4 - vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 4) - s.Require().Nil(a.processVote(vote)) - newState, err = state.nextState() - s.Require().Nil(err) - s.Require().True(len(s.voteChan) > 0) - vote = <-s.voteChan - s.Equal(types.VotePass, vote.Type) - s.Equal(block.Hash, vote.BlockHash) - s.Equal(statePass2, newState.state()) + s.Equal(types.VoteCom, vote.Type) + s.Equal(skipBlockHash, vote.BlockHash) + s.Equal(stateForward, newState.state()) } -func (s *AgreementStateTestSuite) TestPass2State() { +func (s *AgreementStateTestSuite) TestForwardState() { a := s.newAgreement(4) - state := newPass2State(a.data) - s.Equal(statePass2, state.state()) - - // If there are 2f+1 ack-vote for block v not equal to {}, - // propose pass-vote for v. - block := s.proposeBlock(a.data.leader) - s.Require().Nil(a.processBlock(block)) - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VoteAck, block.Hash, 1) - s.Require().Nil(a.processVote(vote)) - } - s.Require().Nil(state.receiveVote()) - // Only propose one vote. - s.Require().Nil(state.receiveVote()) - s.Require().True(len(s.voteChan) == 0) - - // If period >= 2 and - // there are 2f+1 pass-vote in period-1 for block v equal to {} and - // no confirm-vote is proposed, propose pass-vote for {}. - a = s.newAgreement(4) - state = newPass2State(a.data) - a.data.period = 2 - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, common.Hash{}, 1) - s.Require().Nil(a.processVote(vote)) - } - vote := s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 2) - s.Require().Nil(a.processVote(vote)) - s.Require().Nil(state.receiveVote()) - // Test terminate. - ok := make(chan struct{}) - go func() { - go state.terminate() - newState, err := state.nextState() - s.Require().Nil(err) - s.Equal(statePrepare, newState.state()) - ok <- struct{}{} - }() - select { - case <-ok: - case <-time.After(50 * time.Millisecond): - s.FailNow("Terminate fail.\n") - } + state := newForwardState(a.data) + s.Equal(stateForward, state.state()) + s.True(state.clocks() > 100000) - // If there are 2f+1 pass-vote, proceed to next period - a = s.newAgreement(4) - state = newPass2State(a.data) - a.data.period = 1 - for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePass, common.Hash{}, 1) - s.Require().Nil(a.processVote(vote)) - } - s.Require().Nil(state.receiveVote()) - go func() { - newState, err := state.nextState() - s.Require().Nil(err) - s.Equal(statePrepare, newState.state()) - s.Equal(uint64(2), a.data.period) - ok <- struct{}{} - }() - select { - case <-ok: - case <-time.After(50 * time.Millisecond): - s.FailNow("Unable to proceed to next state.\n") - } + // nextState() should return instantly without doing anything. + _, err := state.nextState() + s.Require().Nil(err) + s.Require().Len(s.voteChan, 0) } func TestAgreementState(t *testing.T) { diff --git a/core/agreement.go b/core/agreement.go index 1b995e7..2337b43 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -67,7 +67,7 @@ func newVoteListMap() []map[types.NodeID]*types.Vote { // agreementReceiver is the interface receiving agreement event. type agreementReceiver interface { ProposeVote(vote *types.Vote) - ProposeBlock() + ProposeBlock() common.Hash ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote) } @@ -87,11 +87,12 @@ type agreementData struct { ID types.NodeID leader *leaderSelector - defaultBlock common.Hash + lockValue common.Hash + lockRound uint64 period uint64 requiredVote int votes map[uint64][]map[types.NodeID]*types.Vote - votesLock sync.RWMutex + lock sync.RWMutex blocks map[types.NodeID]*types.Block blocksLock sync.Mutex } @@ -107,6 +108,7 @@ type agreement struct { pendingBlock []pendingBlock pendingVote []pendingVote candidateBlock map[common.Hash]*types.Block + fastForward chan uint64 authModule *Authenticator } @@ -125,6 +127,7 @@ func newAgreement( }, aID: &atomic.Value{}, candidateBlock: make(map[common.Hash]*types.Block), + fastForward: make(chan uint64, 1), authModule: authModule, } agreement.restart(notarySet, types.Position{ @@ -133,13 +136,6 @@ func newAgreement( return agreement } -// terminate the current running state. -func (a *agreement) terminate() { - if a.state != nil { - a.state.terminate() - } -} - // restart the agreement func (a *agreement) restart( notarySet map[types.NodeID]struct{}, aID types.Position) { @@ -147,8 +143,8 @@ func (a *agreement) restart( func() { a.lock.Lock() defer a.lock.Unlock() - a.data.votesLock.Lock() - defer a.data.votesLock.Unlock() + a.data.lock.Lock() + defer a.data.lock.Unlock() a.data.blocksLock.Lock() defer a.data.blocksLock.Unlock() a.data.votes = make(map[uint64][]map[types.NodeID]*types.Vote) @@ -157,9 +153,11 @@ func (a *agreement) restart( a.data.blocks = make(map[types.NodeID]*types.Block) a.data.requiredVote = len(notarySet)/3*2 + 1 a.data.leader.restart() - a.data.defaultBlock = common.Hash{} + a.data.lockValue = nullBlockHash + a.data.lockRound = 1 + a.fastForward = make(chan uint64, 1) a.hasOutput = false - a.state = newPrepareState(a.data) + a.state = newInitialState(a.data) a.notarySet = notarySet a.candidateBlock = make(map[common.Hash]*types.Block) a.aID.Store(aID) @@ -215,11 +213,8 @@ func (a *agreement) agreementID() types.Position { return a.aID.Load().(types.Position) } -// nextState is called at the spcifi clock time. +// nextState is called at the specific clock time. func (a *agreement) nextState() (err error) { - if err = a.state.receiveVote(); err != nil { - return - } a.state, err = a.state.nextState() return } @@ -245,8 +240,8 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { func (a *agreement) checkForkVote(vote *types.Vote) error { if err := func() error { - a.data.votesLock.RLock() - defer a.data.votesLock.RUnlock() + a.data.lock.RLock() + defer a.data.lock.RUnlock() if votes, exist := a.data.votes[vote.Period]; exist { if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist { if vote.BlockHash != oldVote.BlockHash { @@ -286,28 +281,73 @@ func (a *agreement) processVote(vote *types.Vote) error { return err } - if func() bool { - a.data.votesLock.Lock() - defer a.data.votesLock.Unlock() - if _, exist := a.data.votes[vote.Period]; !exist { - a.data.votes[vote.Period] = newVoteListMap() + a.data.lock.Lock() + defer a.data.lock.Unlock() + if _, exist := a.data.votes[vote.Period]; !exist { + a.data.votes[vote.Period] = newVoteListMap() + } + a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote + if !a.hasOutput && vote.Type == types.VoteCom { + if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && + hash != skipBlockHash { + a.hasOutput = true + a.data.recv.ConfirmBlock(hash, + a.data.votes[vote.Period][types.VoteCom]) + return nil } - a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote - if !a.hasOutput && vote.Type == types.VoteConfirm { - if len(a.data.votes[vote.Period][types.VoteConfirm]) >= - a.data.requiredVote { - a.hasOutput = true - a.data.recv.ConfirmBlock(vote.BlockHash, - a.data.votes[vote.Period][types.VoteConfirm]) + } else if a.hasOutput { + return nil + } + + // Check if the agreement requires fast-forwarding. + if vote.Type == types.VotePreCom { + if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && + hash != skipBlockHash { + // Condition 1. + if a.data.period >= vote.Period && vote.Period > a.data.lockRound && + vote.BlockHash != a.data.lockValue { + a.data.lockValue = hash + a.data.lockRound = vote.Period + a.fastForward <- a.data.period + 1 + return nil + } + // Condition 2. + if vote.Period > a.data.period { + a.data.lockValue = hash + a.data.lockRound = vote.Period + a.fastForward <- vote.Period + return nil } } - return true - }() { - return a.state.receiveVote() + } + // Condition 3. + if vote.Type == types.VoteCom && vote.Period >= a.data.period && + len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote { + a.fastForward <- vote.Period + 1 + return nil } return nil } +func (a *agreement) done() <-chan struct{} { + ch := make(chan struct{}, 1) + if a.hasOutput { + ch <- struct{}{} + } else { + select { + case period := <-a.fastForward: + if period <= a.data.period { + break + } + a.data.setPeriod(period) + a.state = newPreCommitState(a.data) + ch <- struct{}{} + default: + } + } + return ch +} + // processBlock is the entry point for processing Block. func (a *agreement) processBlock(block *types.Block) error { a.data.blocksLock.Lock() @@ -348,8 +388,13 @@ func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) { func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( blockHash common.Hash, ok bool) { - a.votesLock.RLock() - defer a.votesLock.RUnlock() + a.lock.RLock() + defer a.lock.RUnlock() + return a.countVoteNoLock(period, voteType) +} + +func (a *agreementData) countVoteNoLock( + period uint64, voteType types.VoteType) (blockHash common.Hash, ok bool) { votes, exist := a.votes[period] if !exist { return @@ -370,3 +415,12 @@ func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( } return } + +func (a *agreementData) setPeriod(period uint64) { + for i := a.period + 1; i <= period; i++ { + if _, exist := a.votes[i]; !exist { + a.votes[i] = newVoteListMap() + } + } + a.period = period +} diff --git a/core/agreement_test.go b/core/agreement_test.go index 047dbe7..83405ad 100644 --- a/core/agreement_test.go +++ b/core/agreement_test.go @@ -36,9 +36,10 @@ func (r *agreementTestReceiver) ProposeVote(vote *types.Vote) { r.s.voteChan <- vote } -func (r *agreementTestReceiver) ProposeBlock() { +func (r *agreementTestReceiver) ProposeBlock() common.Hash { block := r.s.proposeBlock(r.agreementIndex) r.s.blockChan <- block.Hash + return block.Hash } func (r *agreementTestReceiver) ConfirmBlock(block common.Hash, @@ -116,40 +117,200 @@ func (s *AgreementTestSuite) copyVote( return v } +func (s *AgreementTestSuite) prepareVote( + nID types.NodeID, voteType types.VoteType, blockHash common.Hash, + period uint64) ( + vote *types.Vote) { + vote = &types.Vote{ + Type: voteType, + BlockHash: blockHash, + Period: period, + } + s.Require().NoError(s.auths[nID].SignVote(vote)) + return +} + func (s *AgreementTestSuite) TestSimpleConfirm() { a := s.newAgreement(4) - // PrepareState + // InitialState a.nextState() - // AckState + // PreCommitState s.Require().Len(s.blockChan, 1) blockHash := <-s.blockChan block, exist := s.block[blockHash] s.Require().True(exist) s.Require().NoError(a.processBlock(block)) - a.nextState() - // ConfirmState s.Require().Len(s.voteChan, 1) vote := <-s.voteChan - s.Equal(types.VoteAck, vote.Type) + s.Equal(types.VoteInit, vote.Type) + s.Equal(blockHash, vote.BlockHash) + a.nextState() + // CommitState + s.Require().Len(s.voteChan, 1) + vote = <-s.voteChan + s.Equal(types.VotePreCom, vote.Type) + s.Equal(blockHash, vote.BlockHash) for nID := range s.auths { v := s.copyVote(vote, nID) s.Require().NoError(a.processVote(v)) } a.nextState() - // Pass1State + // ForwardState s.Require().Len(s.voteChan, 1) vote = <-s.voteChan - s.Equal(types.VoteConfirm, vote.Type) + s.Equal(types.VoteCom, vote.Type) + s.Equal(blockHash, vote.BlockHash) + s.Equal(blockHash, a.data.lockValue) + s.Equal(uint64(1), a.data.lockRound) for nID := range s.auths { v := s.copyVote(vote, nID) s.Require().NoError(a.processVote(v)) } - // We have enough of Confirm-Votes. + // We have enough of Com-Votes. s.Require().Len(s.confirmChan, 1) confirmBlock := <-s.confirmChan s.Equal(blockHash, confirmBlock) } +func (s *AgreementTestSuite) TestFastForwardCond1() { + votes := 0 + a := s.newAgreement(4) + a.data.lockRound = 1 + a.data.period = 3 + hash := common.NewRandomHash() + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VotePreCom, hash, uint64(2)) + s.Require().NoError(a.processVote(vote)) + if votes++; votes == 3 { + break + } + } + + select { + case <-a.done(): + default: + s.FailNow("Expecting fast forward.") + } + s.Equal(hash, a.data.lockValue) + s.Equal(uint64(2), a.data.lockRound) + s.Equal(uint64(4), a.data.period) + + // No fast forward if vote.BlockHash == SKIP + a.data.lockRound = 6 + a.data.period = 8 + a.data.lockValue = nullBlockHash + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VotePreCom, skipBlockHash, uint64(7)) + s.Require().NoError(a.processVote(vote)) + } + + select { + case <-a.done(): + s.FailNow("Unexpected fast forward.") + default: + } + + // No fast forward if lockValue == vote.BlockHash. + a.data.lockRound = 11 + a.data.period = 13 + a.data.lockValue = hash + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VotePreCom, hash, uint64(12)) + s.Require().NoError(a.processVote(vote)) + } + + select { + case <-a.done(): + s.FailNow("Unexpected fast forward.") + default: + } +} + +func (s *AgreementTestSuite) TestFastForwardCond2() { + votes := 0 + a := s.newAgreement(4) + a.data.period = 1 + hash := common.NewRandomHash() + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VotePreCom, hash, uint64(2)) + s.Require().NoError(a.processVote(vote)) + if votes++; votes == 3 { + break + } + } + + select { + case <-a.done(): + default: + s.FailNow("Expecting fast forward.") + } + s.Equal(hash, a.data.lockValue) + s.Equal(uint64(2), a.data.lockRound) + s.Equal(uint64(2), a.data.period) + + // No fast forward if vote.BlockHash == SKIP + a.data.period = 6 + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VotePreCom, skipBlockHash, uint64(7)) + s.Require().NoError(a.processVote(vote)) + } + + select { + case <-a.done(): + s.FailNow("Unexpected fast forward.") + default: + } +} + +func (s *AgreementTestSuite) TestFastForwardCond3() { + votes := 0 + a := s.newAgreement(4) + a.data.period = 1 + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VoteCom, common.NewRandomHash(), uint64(2)) + s.Require().NoError(a.processVote(vote)) + if votes++; votes == 3 { + break + } + } + + select { + case <-a.done(): + default: + s.FailNow("Expecting fast forward.") + } + s.Equal(uint64(3), a.data.period) +} + +func (s *AgreementTestSuite) TestDecide() { + votes := 0 + a := s.newAgreement(4) + a.data.period = 5 + + // No decide if com-vote on SKIP. + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VoteCom, skipBlockHash, uint64(2)) + s.Require().NoError(a.processVote(vote)) + if votes++; votes == 3 { + break + } + } + s.Require().Len(s.confirmChan, 0) + + // Normal decide. + hash := common.NewRandomHash() + for nID := range a.notarySet { + vote := s.prepareVote(nID, types.VoteCom, hash, uint64(3)) + s.Require().NoError(a.processVote(vote)) + if votes++; votes == 3 { + break + } + } + s.Require().Len(s.confirmChan, 1) + confirmBlock := <-s.confirmChan + s.Equal(hash, confirmBlock) +} + func TestAgreement(t *testing.T) { suite.Run(t, new(AgreementTestSuite)) } diff --git a/core/authenticator_test.go b/core/authenticator_test.go index 074d521..b06b0a6 100644 --- a/core/authenticator_test.go +++ b/core/authenticator_test.go @@ -55,7 +55,7 @@ func (s *AuthenticatorTestSuite) TestVote() { k := s.setupAuthenticator() v := &types.Vote{ ProposerID: types.NodeID{Hash: common.NewRandomHash()}, - Type: types.VoteConfirm, + Type: types.VoteCom, BlockHash: common.NewRandomHash(), Period: 123, Position: types.Position{ diff --git a/core/consensus.go b/core/consensus.go index e8d8b15..0781505 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -79,14 +79,15 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { }() } -func (recv *consensusBAReceiver) ProposeBlock() { +func (recv *consensusBAReceiver) ProposeBlock() common.Hash { block := recv.consensus.proposeBlock(recv.chainID, recv.round) recv.consensus.baModules[recv.chainID].addCandidateBlock(block) if err := recv.consensus.preProcessBlock(block); err != nil { log.Println(err) - return + return common.Hash{} } recv.consensus.network.BroadcastBlock(block) + return block.Hash } func (recv *consensusBAReceiver) ConfirmBlock( @@ -357,9 +358,6 @@ BALoop: break BALoop default: } - for i := 0; i < agreement.clocks(); i++ { - <-tick - } select { case newNotary := <-recv.restartNotary: if newNotary { @@ -383,6 +381,19 @@ BALoop: log.Printf("[%s] %s\n", con.ID.String(), err) break BALoop } + for i := 0; i < agreement.clocks(); i++ { + // Priority select for agreement.done(). + select { + case <-agreement.done(): + continue BALoop + default: + } + select { + case <-agreement.done(): + continue BALoop + case <-tick: + } + } } } diff --git a/core/crypto_test.go b/core/crypto_test.go index b0161d9..c68b0c3 100644 --- a/core/crypto_test.go +++ b/core/crypto_test.go @@ -132,7 +132,7 @@ func (s *CryptoTestSuite) TestVoteSignature() { nID := types.NewNodeID(pub) vote := &types.Vote{ ProposerID: nID, - Type: types.VoteAck, + Type: types.VoteInit, BlockHash: common.NewRandomHash(), Period: 1, } @@ -141,7 +141,7 @@ func (s *CryptoTestSuite) TestVoteSignature() { ok, err := verifyVoteSignature(vote) s.Require().NoError(err) s.True(ok) - vote.Type = types.VoteConfirm + vote.Type = types.VoteCom ok, err = verifyVoteSignature(vote) s.Require().NoError(err) s.False(ok) diff --git a/core/types/vote.go b/core/types/vote.go index 5ef7db3..bbf2f26 100644 --- a/core/types/vote.go +++ b/core/types/vote.go @@ -29,9 +29,9 @@ type VoteType byte // VoteType enum. const ( - VoteAck VoteType = iota - VoteConfirm - VotePass + VoteInit VoteType = iota + VotePreCom + VoteCom // Do not add any type below MaxVoteType. MaxVoteType ) -- cgit v1.2.3