From 7b804a5950981324e683085cbbcfee5fa9162f6f Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Wed, 29 Aug 2018 10:55:23 +0800 Subject: core: Add agreement module (#77) --- core/agreement-state.go | 288 +++++++++++++++++++++++++ core/agreement-state_test.go | 493 +++++++++++++++++++++++++++++++++++++++++++ core/agreement.go | 262 +++++++++++++++++++++++ core/leader-selector.go | 43 ++++ 4 files changed, 1086 insertions(+) create mode 100644 core/agreement-state.go create mode 100644 core/agreement-state_test.go create mode 100644 core/agreement.go create mode 100644 core/leader-selector.go (limited to 'core') diff --git a/core/agreement-state.go b/core/agreement-state.go new file mode 100644 index 0000000..6022bf3 --- /dev/null +++ b/core/agreement-state.go @@ -0,0 +1,288 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "fmt" + "sync/atomic" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// Errors for agreement state module. +var ( + ErrNoEnoughVoteInPrepareState = fmt.Errorf("no enough vote in prepare state") + ErrNoEnoughVoteInAckState = fmt.Errorf("no enough vote in ack state") +) + +// agreementStateType is the state of agreement +type agreementStateType int + +// agreementStateType enum. +const ( + statePrepare agreementStateType = iota + stateAck + stateConfirm + statePass1 + statePass2 +) + +var nullBlockHash = common.Hash{} + +type agreementState interface { + state() agreementStateType + nextState() (agreementState, error) + receiveVote() error + clocks() int + terminate() +} + +//----- PrepareState ----- +type prepareState struct { + a *agreementData +} + +func newPrepareState(a *agreementData) *prepareState { + return &prepareState{a: a} +} + +func (s *prepareState) state() agreementStateType { return statePrepare } +func (s *prepareState) clocks() int { return 0 } +func (s *prepareState) terminate() {} +func (s *prepareState) nextState() (agreementState, error) { + hash := common.Hash{} + if s.a.period == 1 { + hash = s.a.blockProposer().Hash + } else { + var proposed bool + hash, proposed = s.a.countVote(s.a.period-1, types.VotePass) + if !proposed { + return nil, ErrNoEnoughVoteInPrepareState + } + if hash == nullBlockHash { + hash = s.a.blocks[s.a.ID].Hash + } else { + delete(s.a.blocks, s.a.ID) + } + } + s.a.blockChan <- hash + return newAckState(s.a), nil +} +func (s *prepareState) receiveVote() error { return nil } + +// ----- AckState ----- +type ackState struct { + a *agreementData +} + +func newAckState(a *agreementData) *ackState { + return &ackState{a: a} +} + +func (s *ackState) state() agreementStateType { return stateAck } +func (s *ackState) clocks() int { return 2 } +func (s *ackState) terminate() {} +func (s *ackState) nextState() (agreementState, error) { + acked := false + hash := common.Hash{} + if s.a.period == 1 { + acked = true + } else { + hash, acked = s.a.countVote(s.a.period-1, types.VotePass) + } + if !acked { + return nil, ErrNoEnoughVoteInAckState + } + if hash == nullBlockHash { + hash = s.a.leader.leaderBlockHash() + } + s.a.voteChan <- &types.Vote{ + Type: types.VoteAck, + BlockHash: hash, + Period: s.a.period, + } + return newConfirmState(s.a), nil +} +func (s *ackState) receiveVote() error { return nil } + +// ----- ConfirmState ----- +type confirmState struct { + a *agreementData + voted *atomic.Value +} + +func newConfirmState(a *agreementData) *confirmState { + voted := &atomic.Value{} + voted.Store(false) + return &confirmState{ + a: a, + voted: voted, + } +} + +func (s *confirmState) state() agreementStateType { return stateConfirm } +func (s *confirmState) clocks() int { return 2 } +func (s *confirmState) terminate() {} +func (s *confirmState) nextState() (agreementState, error) { + return newPass1State(s.a), nil +} +func (s *confirmState) receiveVote() error { + if s.voted.Load().(bool) { + return nil + } + hash, ok := s.a.countVote(s.a.period, types.VoteAck) + if !ok { + return nil + } + if hash != nullBlockHash { + s.a.voteChan <- &types.Vote{ + Type: types.VoteConfirm, + BlockHash: hash, + Period: s.a.period, + } + } + s.voted.Store(true) + return nil +} + +// ----- Pass1State ----- +type pass1State struct { + a *agreementData +} + +func newPass1State(a *agreementData) *pass1State { + return &pass1State{a: a} +} + +func (s *pass1State) state() agreementStateType { return statePass1 } +func (s *pass1State) clocks() int { return 0 } +func (s *pass1State) terminate() {} +func (s *pass1State) nextState() (agreementState, error) { + voteDefault := false + s.a.votesLock.RLock() + defer s.a.votesLock.RUnlock() + if vote, exist := + s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]; exist { + s.a.voteChan <- &types.Vote{ + Type: types.VotePass, + BlockHash: vote.BlockHash, + Period: s.a.period, + } + } else if s.a.period == 1 { + voteDefault = true + } else { + hash, ok := s.a.countVote(s.a.period-1, types.VotePass) + if ok { + if hash == nullBlockHash { + s.a.voteChan <- &types.Vote{ + Type: types.VotePass, + BlockHash: hash, + Period: s.a.period, + } + } else { + voteDefault = true + } + } else { + voteDefault = true + } + } + if voteDefault { + s.a.voteChan <- &types.Vote{ + Type: types.VotePass, + BlockHash: s.a.defaultBlock, + Period: s.a.period, + } + } + return newPass2State(s.a), nil +} +func (s *pass1State) receiveVote() error { return nil } + +// ----- Pass2State ----- +type pass2State struct { + a *agreementData + voted *atomic.Value + enoughPassVote chan common.Hash + terminateChan chan struct{} +} + +func newPass2State(a *agreementData) *pass2State { + voted := &atomic.Value{} + voted.Store(false) + return &pass2State{ + a: a, + voted: voted, + enoughPassVote: make(chan common.Hash), + terminateChan: make(chan struct{}), + } +} + +func (s *pass2State) state() agreementStateType { return statePass2 } +func (s *pass2State) clocks() int { return 0 } +func (s *pass2State) terminate() { + s.terminateChan <- struct{}{} +} +func (s *pass2State) nextState() (agreementState, error) { + select { + case <-s.terminateChan: + break + case hash := <-s.enoughPassVote: + s.a.votesLock.RLock() + defer s.a.votesLock.RUnlock() + s.a.defaultBlock = hash + s.a.period++ + oldBlock := s.a.blocks[s.a.ID] + s.a.blocks = map[types.ValidatorID]*types.Block{ + s.a.ID: oldBlock, + } + } + return newPrepareState(s.a), nil +} +func (s *pass2State) receiveVote() error { + if s.voted.Load().(bool) { + return nil + } + ackHash, ok := s.a.countVote(s.a.period, types.VoteAck) + if ok && ackHash != nullBlockHash { + s.a.voteChan <- &types.Vote{ + Type: types.VotePass, + BlockHash: ackHash, + Period: s.a.period, + } + } else if s.a.period > 1 { + if _, exist := + s.a.votes[s.a.period][types.VoteConfirm][s.a.ID]; !exist { + hash, ok := s.a.countVote(s.a.period-1, types.VotePass) + if ok && hash == nullBlockHash { + s.a.voteChan <- &types.Vote{ + Type: types.VotePass, + BlockHash: hash, + Period: s.a.period, + } + } + } + } + go func() { + hash, ok := s.a.countVote(s.a.period, types.VotePass) + if ok { + s.enoughPassVote <- hash + } + }() + s.voted.Store(true) + return nil +} diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go new file mode 100644 index 0000000..14b7c6a --- /dev/null +++ b/core/agreement-state_test.go @@ -0,0 +1,493 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/crypto/eth" +) + +type AgreementTestSuite struct { + suite.Suite + ID types.ValidatorID + prvKey map[types.ValidatorID]crypto.PrivateKey + voteChan chan *types.Vote + blockChan chan common.Hash + confirmChan chan common.Hash + block map[common.Hash]*types.Block +} + +func (s *AgreementTestSuite) blockProposer() *types.Block { + block := &types.Block{ + ProposerID: s.ID, + Hash: common.NewRandomHash(), + } + s.block[block.Hash] = block + return block +} + +func (s *AgreementTestSuite) prepareVote( + vID types.ValidatorID, voteType types.VoteType, blockHash common.Hash, + period uint64) ( + vote *types.Vote) { + prvKey, exist := s.prvKey[vID] + s.Require().True(exist) + vote = &types.Vote{ + ProposerID: vID, + Type: voteType, + BlockHash: blockHash, + Period: period, + } + var err error + vote.Signature, err = prvKey.Sign(hashVote(vote)) + s.Require().Nil(err) + return +} + +func (s *AgreementTestSuite) SetupTest() { + prvKey, err := eth.NewPrivateKey() + s.Require().Nil(err) + s.ID = types.NewValidatorID(prvKey.PublicKey()) + s.prvKey = map[types.ValidatorID]crypto.PrivateKey{ + s.ID: prvKey, + } + s.voteChan = make(chan *types.Vote, 100) + s.blockChan = make(chan common.Hash, 100) + s.confirmChan = make(chan common.Hash, 100) + s.block = make(map[common.Hash]*types.Block) +} + +func (s *AgreementTestSuite) newAgreement(numValidator int) *agreement { + validators := make(types.ValidatorIDs, numValidator-1) + for i := range validators { + prvKey, err := eth.NewPrivateKey() + s.Require().Nil(err) + validators[i] = types.NewValidatorID(prvKey.PublicKey()) + s.prvKey[validators[i]] = prvKey + } + validators = append(validators, s.ID) + agreement, voteChan, blockChan, confirmChan := newAgreement( + s.ID, + validators, + eth.SigToPub, + s.blockProposer, + ) + go func() { + for { + s.voteChan <- <-voteChan + } + }() + go func() { + for { + s.blockChan <- <-blockChan + } + }() + go func() { + for { + s.confirmChan <- <-confirmChan + } + }() + return agreement +} + +func (s *AgreementTestSuite) TestPrepareState() { + a := s.newAgreement(4) + state := newPrepareState(a.data) + s.Equal(statePrepare, state.state()) + s.Equal(0, state.clocks()) + + // For period == 1, proposing a new block. + a.data.period = 1 + newState, err := state.nextState() + s.Require().Nil(err) + var proposedBlock common.Hash + select { + case proposedBlock = <-s.blockChan: + s.NotEqual(common.Hash{}, proposedBlock) + err := a.processBlock(s.block[proposedBlock]) + s.Require().Nil(err) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed block.\n") + } + s.Equal(stateAck, newState.state()) + + // For period >= 2, if the pass-vote for block b equal to {} + // is more than 2f+1, proposing the block previously proposed. + a.data.period = 2 + _, err = state.nextState() + s.Equal(ErrNoEnoughVoteInPrepareState, err) + + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, common.Hash{}, 1) + s.Require().Nil(a.processVote(vote)) + } + + newState, err = state.nextState() + s.Require().Nil(err) + select { + case hash := <-s.blockChan: + s.Equal(proposedBlock, hash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed block.\n") + } + s.Equal(stateAck, newState.state()) + + // For period >= 2, if the pass-vote for block v not equal to {} + // is more than 2f+1, proposing the block v. + a.data.period = 3 + block := s.blockProposer() + block.ProposerID.Hash = common.NewRandomHash() + err = a.processBlock(block) + s.Require().Nil(err) + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, block.Hash, 2) + s.Require().Nil(a.processVote(vote)) + } + + newState, err = state.nextState() + s.Require().Nil(err) + select { + case hash := <-s.blockChan: + s.Equal(block.Hash, hash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed block.\n") + } + s.Equal(stateAck, newState.state()) +} + +func (s *AgreementTestSuite) TestAckState() { + a := s.newAgreement(4) + state := newAckState(a.data) + s.Equal(stateAck, state.state()) + s.Equal(2, state.clocks()) + + blocks := make([]*types.Block, 3) + for i := range blocks { + blocks[i] = s.blockProposer() + blocks[i].ProposerID.Hash = common.NewRandomHash() + err := a.processBlock(blocks[i]) + s.Require().Nil(err) + } + + // For period 1, propose ack-vote for the block having largest potential. + a.data.period = 1 + newState, err := state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VoteAck, vote.Type) + s.NotEqual(common.Hash{}, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(stateConfirm, newState.state()) + + // For period >= 2, if block v equal to {} has more than 2f+1 pass-vote + // in period 1, propose ack-vote for the block having largest potential. + a.data.period = 2 + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, common.Hash{}, 1) + s.Require().Nil(a.processVote(vote)) + } + newState, err = state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VoteAck, vote.Type) + s.NotEqual(common.Hash{}, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(stateConfirm, newState.state()) + + // For period >= 2, if block v not equal to {} has more than 2f+1 pass-vote + // in period 1, propose ack-vote for block v. + hash := blocks[0].Hash + a.data.period = 3 + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, hash, 2) + s.Require().Nil(a.processVote(vote)) + } + newState, err = state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VoteAck, vote.Type) + s.Equal(hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(stateConfirm, newState.state()) +} + +func (s *AgreementTestSuite) TestConfirmState() { + a := s.newAgreement(4) + state := newConfirmState(a.data) + s.Equal(stateConfirm, state.state()) + s.Equal(2, state.clocks()) + + // If there are 2f+1 ack-votes for block v not equal to {}, + // propose a confirm-vote for block v. + a.data.period = 1 + block := s.blockProposer() + s.Require().Nil(a.processBlock(block)) + for vID := range a.validators { + vote := s.prepareVote(vID, types.VoteAck, block.Hash, 1) + s.Require().Nil(a.processVote(vote)) + } + s.Require().Nil(state.receiveVote()) + newState, err := state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VoteConfirm, vote.Type) + s.Equal(block.Hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(statePass1, newState.state()) + + // Else, no vote is propose in this state. + a.data.period = 2 + s.Require().Nil(state.receiveVote()) + newState, err = state.nextState() + s.Require().Nil(err) + select { + case <-s.voteChan: + s.FailNow("Unexpected proposed vote.\n") + case <-time.After(50 * time.Millisecond): + } + s.Equal(statePass1, newState.state()) + + // If there are 2f+1 ack-vote for block v equal to {}, + // no vote should be proposed. + a.data.period = 3 + for vID := range a.validators { + vote := s.prepareVote(vID, types.VoteAck, common.Hash{}, 3) + s.Require().Nil(a.processVote(vote)) + } + s.Require().Nil(state.receiveVote()) + newState, err = state.nextState() + s.Require().Nil(err) + select { + case <-s.voteChan: + s.FailNow("Unexpected proposed vote.\n") + case <-time.After(50 * time.Millisecond): + } + s.Equal(statePass1, newState.state()) +} + +func (s *AgreementTestSuite) TestPass1State() { + a := s.newAgreement(4) + state := newPass1State(a.data) + s.Equal(statePass1, state.state()) + s.Equal(0, state.clocks()) + + // If confirm-vote was proposed in the same period, + // propose pass-vote with same block. + a.data.period = 1 + hash := common.NewRandomHash() + vote := s.prepareVote(s.ID, types.VoteConfirm, hash, 1) + s.Require().Nil(a.processVote(vote)) + newState, err := state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(statePass2, newState.state()) + + // Else if period >= 2 and has 2f+1 pass-vote in period-1 for block {}, + // propose pass-vote for block {}. + a.data.period = 2 + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, common.Hash{}, 1) + s.Require().Nil(a.processVote(vote)) + } + vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 2) + s.Require().Nil(a.processVote(vote)) + newState, err = state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(common.Hash{}, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(statePass2, newState.state()) + + // Else, propose pass-vote for default block. + a.data.period = 3 + block := s.blockProposer() + a.data.defaultBlock = block.Hash + hash = common.NewRandomHash() + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, hash, 2) + s.Require().Nil(a.processVote(vote)) + } + vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 3) + s.Require().Nil(a.processVote(vote)) + newState, err = state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(block.Hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(statePass2, newState.state()) + + // Period == 1 is also else condition. + a = s.newAgreement(4) + state = newPass1State(a.data) + a.data.period = 1 + block = s.blockProposer() + a.data.defaultBlock = block.Hash + hash = common.NewRandomHash() + vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 1) + s.Require().Nil(a.processVote(vote)) + newState, err = state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(block.Hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(statePass2, newState.state()) + + // No enought pass-vote for period-1. + a.data.period = 4 + vote = s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 4) + s.Require().Nil(a.processVote(vote)) + newState, err = state.nextState() + s.Require().Nil(err) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(block.Hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + s.Equal(statePass2, newState.state()) +} + +func (s *AgreementTestSuite) TestPass2State() { + a := s.newAgreement(4) + state := newPass2State(a.data) + s.Equal(statePass2, state.state()) + + // If there are 2f+1 ack-vote for block v not equal to {}, + // propose pass-vote for v. + block := s.blockProposer() + s.Require().Nil(a.processBlock(block)) + for vID := range a.validators { + vote := s.prepareVote(vID, types.VoteAck, block.Hash, 1) + s.Require().Nil(a.processVote(vote)) + } + s.Require().Nil(state.receiveVote()) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(block.Hash, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + // Only propose one vote. + s.Require().Nil(state.receiveVote()) + select { + case <-s.voteChan: + s.FailNow("Unexpected proposed vote.\n") + case <-time.After(50 * time.Millisecond): + } + + // If period >= 2 and + // there are 2f+1 pass-vote in period-1 for block v equal to {} and + // no confirm-vote is proposed, propose pass-vote for {}. + a = s.newAgreement(4) + state = newPass2State(a.data) + a.data.period = 2 + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, common.Hash{}, 1) + s.Require().Nil(a.processVote(vote)) + } + vote := s.prepareVote(s.ID, types.VoteAck, common.Hash{}, 2) + s.Require().Nil(a.processVote(vote)) + s.Require().Nil(state.receiveVote()) + select { + case vote := <-s.voteChan: + s.Equal(types.VotePass, vote.Type) + s.Equal(common.Hash{}, vote.BlockHash) + case <-time.After(50 * time.Millisecond): + s.FailNow("Expecting a proposed vote.\n") + } + + // Test terminate. + ok := make(chan struct{}) + go func() { + go state.terminate() + newState, err := state.nextState() + s.Require().Nil(err) + s.Equal(statePrepare, newState.state()) + ok <- struct{}{} + }() + select { + case <-ok: + case <-time.After(50 * time.Millisecond): + s.FailNow("Terminate fail.\n") + } + + // If there are 2f+1 pass-vote, proceed to next period + a = s.newAgreement(4) + state = newPass2State(a.data) + a.data.period = 1 + for vID := range a.validators { + vote := s.prepareVote(vID, types.VotePass, common.Hash{}, 1) + s.Require().Nil(a.processVote(vote)) + } + s.Require().Nil(state.receiveVote()) + go func() { + newState, err := state.nextState() + s.Require().Nil(err) + s.Equal(statePrepare, newState.state()) + s.Equal(uint64(2), a.data.period) + ok <- struct{}{} + }() + select { + case <-ok: + case <-time.After(50 * time.Millisecond): + s.FailNow("Unable to proceed to next state.\n") + } +} + +func TestAgreement(t *testing.T) { + suite.Run(t, new(AgreementTestSuite)) +} diff --git a/core/agreement.go b/core/agreement.go new file mode 100644 index 0000000..6aeae07 --- /dev/null +++ b/core/agreement.go @@ -0,0 +1,262 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/crypto" +) + +// Errors for agreement module. +var ( + ErrNotValidator = fmt.Errorf("not a validaotr") + ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature") + ErrForkVote = fmt.Errorf("fork vote") +) + +// ErrFork for fork error in agreement. +type ErrFork struct { + vID types.ValidatorID + old, new common.Hash +} + +func (e *ErrFork) Error() string { + return fmt.Sprintf("fork is found for %s, old %s, new %s", + e.vID.String(), e.old, e.new) +} + +type blockProposerFn func() *types.Block + +func newVoteListMap() []map[types.ValidatorID]*types.Vote { + listMap := make([]map[types.ValidatorID]*types.Vote, types.MaxVoteType) + for idx := range listMap { + listMap[idx] = make(map[types.ValidatorID]*types.Vote) + } + return listMap +} + +// position is the current round of the agreement. +type position struct { + ShardID uint64 + ChainID uint64 + Height uint64 +} + +// agreementData is the data for agreementState. +type agreementData struct { + voteChan chan *types.Vote + blockChan chan common.Hash + + ID types.ValidatorID + leader *leaderSelector + defaultBlock common.Hash + period uint64 + requiredVote int + votes map[uint64][]map[types.ValidatorID]*types.Vote + votesLock sync.RWMutex + blocks map[types.ValidatorID]*types.Block + blockProposer blockProposerFn +} + +// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. +type agreement struct { + confirmChan chan common.Hash + + state agreementState + data *agreementData + aID *atomic.Value + validators map[types.ValidatorID]struct{} + sigToPub SigToPubFn + hasOutput bool +} + +// newAgreement creates a agreement instance. +func newAgreement( + ID types.ValidatorID, + validators types.ValidatorIDs, + sigToPub SigToPubFn, + blockProposer blockProposerFn) ( + *agreement, + <-chan *types.Vote, + <-chan common.Hash, + <-chan common.Hash, +) { + // TODO(jimmy-dexon): use callback instead of channel. + voteChan := make(chan *types.Vote, 3) + blockChan := make(chan common.Hash) + confirmChan := make(chan common.Hash) + agreement := &agreement{ + confirmChan: confirmChan, + data: &agreementData{ + ID: ID, + leader: newLeaderSelector(), + voteChan: voteChan, + blockChan: blockChan, + blockProposer: blockProposer, + }, + aID: &atomic.Value{}, + sigToPub: sigToPub, + } + agreement.restart(validators) + return agreement, voteChan, blockChan, confirmChan +} + +// terminate the current running state. +func (a *agreement) terminate() { + if a.state != nil { + a.state.terminate() + } +} + +// restart the agreement +func (a *agreement) restart(validators types.ValidatorIDs) { + a.data.votesLock.Lock() + defer a.data.votesLock.Unlock() + a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote) + a.data.votes[1] = newVoteListMap() + a.data.period = 1 + a.data.blocks = make(map[types.ValidatorID]*types.Block) + a.data.requiredVote = len(validators)/3*2 + 1 + a.hasOutput = false + a.state = newPrepareState(a.data) + a.validators = make(map[types.ValidatorID]struct{}) + for _, v := range validators { + a.validators[v] = struct{}{} + } +} + +// clocks returns how many time this state is required. +func (a *agreement) clocks() int { + return a.state.clocks() +} + +// agreementID returns the current agreementID. +func (a *agreement) agreementID() position { + return a.aID.Load().(position) +} + +// setAgreementID sets the current agreementID. +func (a *agreement) setAgreementID(ID position) { + a.aID.Store(ID) +} + +// nextState is called at the spcifi clock time. +func (a *agreement) nextState() (err error) { + a.state, err = a.state.nextState() + return +} + +func (a *agreement) sanityCheck(vote *types.Vote) error { + if _, exist := a.validators[vote.ProposerID]; !exist { + return ErrNotValidator + } + ok, err := verifyVoteSignature(vote, a.sigToPub) + if err != nil { + return err + } + if !ok { + return ErrIncorrectVoteSignature + } + if _, exist := a.data.votes[vote.Period]; exist { + if oldVote, exist := + a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist { + if vote.BlockHash != oldVote.BlockHash { + return ErrForkVote + } + } + } + return nil +} + +// prepareVote prepares a vote. +func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) ( + err error) { + hash := hashVote(vote) + vote.Signature, err = prv.Sign(hash) + return +} + +// processVote is the entry point for processing Vote. +func (a *agreement) processVote(vote *types.Vote) error { + vote = vote.Clone() + if err := a.sanityCheck(vote); err != nil { + return err + } + if func() bool { + a.data.votesLock.Lock() + defer a.data.votesLock.Unlock() + if _, exist := a.data.votes[vote.Period]; !exist { + a.data.votes[vote.Period] = newVoteListMap() + } + a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote + if !a.hasOutput && vote.Type == types.VoteConfirm { + if len(a.data.votes[vote.Period][types.VoteConfirm]) >= + a.data.requiredVote { + a.hasOutput = true + a.confirmChan <- vote.BlockHash + } + } + return true + }() { + return a.state.receiveVote() + } + return nil +} + +// processBlock is the entry point for processing Block. +func (a *agreement) processBlock(block *types.Block) error { + if b, exist := a.data.blocks[block.ProposerID]; exist { + if b.Hash != block.Hash { + return &ErrFork{block.ProposerID, b.Hash, block.Hash} + } + return nil + } + a.data.blocks[block.ProposerID] = block + a.data.leader.processBlock(block) + return nil +} + +func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( + blockHash common.Hash, ok bool) { + a.votesLock.RLock() + defer a.votesLock.RUnlock() + votes, exist := a.votes[period] + if !exist { + return + } + candidate := make(map[common.Hash]int) + for _, vote := range votes[voteType] { + if _, exist := candidate[vote.BlockHash]; !exist { + candidate[vote.BlockHash] = 0 + } + candidate[vote.BlockHash]++ + } + for candidateHash, votes := range candidate { + if votes >= a.requiredVote { + blockHash = candidateHash + ok = true + return + } + } + return +} diff --git a/core/leader-selector.go b/core/leader-selector.go new file mode 100644 index 0000000..cea31ce --- /dev/null +++ b/core/leader-selector.go @@ -0,0 +1,43 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "sort" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +type leaderSelector struct { + common.Hashes +} + +func newLeaderSelector() *leaderSelector { + return &leaderSelector{} +} + +func (l *leaderSelector) leaderBlockHash() common.Hash { + // TODO(jimmy-dexon): return leader based on paper. + sort.Sort(l.Hashes) + return l.Hashes[len(l.Hashes)-1] +} + +func (l *leaderSelector) processBlock(block *types.Block) { + l.Hashes = append(l.Hashes, block.Hash) +} -- cgit v1.2.3