diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-25 10:14:42 +0800 |
---|---|---|
committer | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-27 15:25:10 +0800 |
commit | b8ced165b1fb03394f8758e08148b0e5d06aa07b (patch) | |
tree | fa327764a4cf564bb4aa39c1570ffd7f292c7ba1 | |
parent | 6efe199cb38eb4cb9a9a64d98ff5f8c4fb997da7 (diff) | |
download | dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.tar dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.tar.gz dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.tar.bz2 dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.tar.lz dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.tar.xz dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.tar.zst dexon-consensus-b8ced165b1fb03394f8758e08148b0e5d06aa07b.zip |
core: Remove agreement result (#514)
* core: remove agreement result for round with randomness
* remove agr test in syncer
* fixup
* remove randomness field from agreement result
* modify test
-rw-r--r-- | core/agreement-mgr.go | 9 | ||||
-rw-r--r-- | core/agreement.go | 20 | ||||
-rw-r--r-- | core/agreement_test.go | 18 | ||||
-rw-r--r-- | core/blockchain.go | 44 | ||||
-rw-r--r-- | core/consensus.go | 135 | ||||
-rw-r--r-- | core/syncer/agreement.go | 119 | ||||
-rw-r--r-- | core/syncer/agreement_test.go | 110 | ||||
-rw-r--r-- | core/syncer/consensus.go | 7 | ||||
-rw-r--r-- | core/test/network.go | 100 | ||||
-rw-r--r-- | core/test/network_test.go | 75 | ||||
-rw-r--r-- | core/types/block-randomness.go | 21 | ||||
-rw-r--r-- | integration_test/byzantine_test.go | 66 |
12 files changed, 492 insertions, 232 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 02b7b7b..bc6923a 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -304,6 +304,15 @@ func (mgr *agreementMgr) processAgreementResult( return nil } +func (mgr *agreementMgr) processFinalizedBlock(block *types.Block) error { + aID := mgr.baModule.agreementID() + if block.Position.Older(aID) { + return nil + } + mgr.baModule.processFinalizedBlock(block) + return nil +} + func (mgr *agreementMgr) stop() { // Stop all running agreement modules. func() { diff --git a/core/agreement.go b/core/agreement.go index 1ba9034..5e7b7de 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -42,6 +42,7 @@ var ( ErrNotInNotarySet = fmt.Errorf("not in notary set") ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature") ErrIncorrectVotePartialSignature = fmt.Errorf("incorrect vote psig") + ErrMismatchBlockPosition = fmt.Errorf("mismatch block position") ) // ErrFork for fork error in agreement. @@ -513,6 +514,25 @@ func (a *agreement) processVote(vote *types.Vote) error { return nil } +func (a *agreement) processFinalizedBlock(block *types.Block) { + a.lock.Lock() + defer a.lock.Unlock() + if a.hasOutput { + return + } + aID := a.agreementID() + if aID.Older(block.Position) { + return + } + a.addCandidateBlockNoLock(block) + a.hasOutput = true + a.data.recv.ConfirmBlock(block.Hash, nil) + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } +} + func (a *agreement) done() <-chan struct{} { a.lock.Lock() defer a.lock.Unlock() diff --git a/core/agreement_test.go b/core/agreement_test.go index dfeb783..175249f 100644 --- a/core/agreement_test.go +++ b/core/agreement_test.go @@ -559,6 +559,24 @@ func (s *AgreementTestSuite) TestFindBlockInPendingSet() { s.Require().NotNil(block) } +func (s *AgreementTestSuite) TestConfirmWithBlock() { + a, _ := s.newAgreement(4, -1, func(*types.Block) (bool, error) { + return true, nil + }) + block := &types.Block{ + Hash: common.NewRandomHash(), + Position: a.agreementID(), + Finalization: types.FinalizationResult{ + Randomness: []byte{0x1, 0x2, 0x3, 0x4}, + }, + } + a.processFinalizedBlock(block) + s.Require().Len(s.confirmChan, 1) + confirm := <-s.confirmChan + s.Equal(block.Hash, confirm) + s.True(a.confirmed()) +} + func TestAgreement(t *testing.T) { suite.Run(t, new(AgreementTestSuite)) } diff --git a/core/blockchain.go b/core/blockchain.go index 0ecd083..798a080 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -127,19 +127,18 @@ type tsigVerifierGetter interface { } type blockChain struct { - lock sync.RWMutex - ID types.NodeID - lastConfirmed *types.Block - lastDelivered *types.Block - signer *utils.Signer - vGetter tsigVerifierGetter - app Application - logger common.Logger - pendingRandomnesses map[types.Position]*types.AgreementResult - configs []blockChainConfig - pendingBlocks pendingBlockRecords - confirmedBlocks types.BlocksByPosition - dMoment time.Time + lock sync.RWMutex + ID types.NodeID + lastConfirmed *types.Block + lastDelivered *types.Block + signer *utils.Signer + vGetter tsigVerifierGetter + app Application + logger common.Logger + configs []blockChainConfig + pendingBlocks pendingBlockRecords + confirmedBlocks types.BlocksByPosition + dMoment time.Time } func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, @@ -154,8 +153,6 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, app: app, logger: logger, dMoment: dMoment, - pendingRandomnesses: make( - map[types.Position]*types.AgreementResult), } } @@ -228,8 +225,6 @@ func (bc *blockChain) extractBlocks() (ret []*types.Block) { // to single chain. c.Finalization.ParentHash = c.ParentHash c.Finalization.Timestamp = c.Timestamp - // It's a workaround, the height for application is one-based. - c.Finalization.Height = c.Position.Height + 1 ret = append(ret, c) bc.lastDelivered = c } @@ -290,6 +285,7 @@ func (bc *blockChain) addEmptyBlock(position types.Position) ( // to be confirmed. panic(err) } + emptyB.Finalization.Height = emptyB.Position.Height + 1 bc.confirmBlock(emptyB) bc.checkIfBlocksConfirmed() return emptyB @@ -446,9 +442,6 @@ func (bc *blockChain) addPendingBlockRecord(p pendingBlockRecord) error { } return err } - if p.block != nil { - bc.setRandomnessFromPending(p.block) - } return nil } @@ -610,17 +603,6 @@ func (bc *blockChain) confirmBlock(b *types.Block) { bc.logger.Debug("Calling Application.BlockConfirmed", "block", b) bc.app.BlockConfirmed(*b) bc.lastConfirmed = b - bc.setRandomnessFromPending(b) bc.confirmedBlocks = append(bc.confirmedBlocks, b) bc.purgeConfig() } - -func (bc *blockChain) setRandomnessFromPending(b *types.Block) { - if r, exist := bc.pendingRandomnesses[b.Position]; exist { - if !r.BlockHash.Equal(b.Hash) { - panic(fmt.Errorf("mismathed randomness: %s %s", b, r)) - } - b.Finalization.Randomness = r.Randomness - delete(bc.pendingRandomnesses, b.Position) - } -} diff --git a/core/consensus.go b/core/consensus.go index 2b0d5a4..5a331d2 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -54,6 +54,8 @@ var ( "Configuration not ready") ErrIncorrectBlockRandomness = fmt.Errorf( "randomness of block is incorrect") + ErrCannotVerifyBlockRandomness = fmt.Errorf( + "cannot verify block randomness") ) // consensusBAReceiver implements agreementReceiver. @@ -225,44 +227,59 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } - voteList := make([]types.Vote, 0, len(votes)) - IDs := make(cryptoDKG.IDs, 0, len(votes)) - psigs := make([]cryptoDKG.PartialSignature, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue - } - if recv.round() >= DKGDelayRound { - ID, exist := recv.npks.IDMap[vote.ProposerID] - if !exist { + if len(votes) == 0 && len(block.Finalization.Randomness) == 0 { + recv.consensus.logger.Error("No votes to recover randomness", + "block", block) + } else if votes != nil { + voteList := make([]types.Vote, 0, len(votes)) + IDs := make(cryptoDKG.IDs, 0, len(votes)) + psigs := make([]cryptoDKG.PartialSignature, 0, len(votes)) + for _, vote := range votes { + if vote.BlockHash != hash { continue } - IDs = append(IDs, ID) - psigs = append(psigs, vote.PartialSignature) + if recv.round() >= DKGDelayRound { + ID, exist := recv.npks.IDMap[vote.ProposerID] + if !exist { + continue + } + IDs = append(IDs, ID) + psigs = append(psigs, vote.PartialSignature) + } + voteList = append(voteList, *vote) } - voteList = append(voteList, *vote) - } - if recv.round() >= DKGDelayRound { - rand, err := cryptoDKG.RecoverSignature(psigs, IDs) - if err != nil { - recv.consensus.logger.Warn("Unable to recover randomness", - "block", block, - "error", err) - } else { - block.Finalization.Randomness = rand.Signature[:] + if recv.round() >= DKGDelayRound { + rand, err := cryptoDKG.RecoverSignature(psigs, IDs) + if err != nil { + recv.consensus.logger.Warn("Unable to recover randomness", + "block", block, + "error", err) + } else { + block.Finalization.Randomness = rand.Signature[:] + } } - } - if recv.isNotary { - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - Randomness: block.Finalization.Randomness, - IsEmptyBlock: isEmptyBlockConfirmed, + // It's a workaround, the height for application is one-based. + block.Finalization.Height = block.Position.Height + 1 + + if recv.isNotary { + if block.Position.Round < DKGDelayRound { + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + FinalizationHeight: block.Finalization.Height, + IsEmptyBlock: isEmptyBlockConfirmed, + } + recv.consensus.logger.Debug("Propose AgreementResult", + "result", result) + recv.consensus.msgChan <- result + } else { + recv.consensus.logger.Debug( + "Propose AgreementResult as finalized block", + "block", block) + recv.consensus.network.BroadcastBlock(block) + } } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.msgChan <- result } if block.Position.Height != 0 && @@ -300,6 +317,11 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.logger.Info("Receive parent block", "parent-hash", block.ParentHash.String()[:6], "cur-position", block.Position) + if block.Finalization.Height == 0 { + // TODO(jimmy): use a seperate message to pull finalized + // block. Here, we pull it again as workaround. + continue + } recv.consensus.processBlockChan <- block parentHash = block.ParentHash if block.Position.Height == 0 || @@ -1135,6 +1157,12 @@ MessageLoop: delete(con.baConfirmedBlock, val.Hash) ch <- val }() + } else if val.IsFinalized() { + if err := con.processFinalizedBlock(val); err != nil { + con.logger.Error("Failed to process finalized block", + "block", val, + "error", err) + } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", @@ -1179,17 +1207,11 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { // ProcessAgreementResult processes the randomness request. func (con *Consensus) ProcessAgreementResult( rand *types.AgreementResult) error { - if !con.baMgr.touchAgreementResult(rand) { + if rand.Position.Round >= DKGDelayRound { return nil } - // TODO(jimmy): merge tsig check to VerifyAgreementResult - ok, err := con.bcModule.verifyRandomness( - rand.BlockHash, rand.Position.Round, rand.Randomness) - if err != nil { - return err - } - if !ok { - return ErrIncorrectBlockRandomness + if !con.baMgr.touchAgreementResult(rand) { + return nil } // Sanity Check. if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { @@ -1217,6 +1239,35 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { return } +func (con *Consensus) processFinalizedBlock(b *types.Block) (err error) { + if b.Position.Round < DKGDelayRound { + return + } + if err = utils.VerifyBlockSignature(b); err != nil { + return + } + verifier, ok, err := con.tsigVerifierCache.UpdateAndGet(b.Position.Round) + if err != nil { + return + } + if !ok { + err = ErrCannotVerifyBlockRandomness + return + } + if !verifier.VerifySignature(b.Hash, crypto.Signature{ + Type: "bls", + Signature: b.Finalization.Randomness, + }) { + err = ErrIncorrectBlockRandomness + return + } + err = con.baMgr.processFinalizedBlock(b) + if err == nil && con.debugApp != nil { + con.debugApp.BlockReceived(b.Hash) + } + return +} + func (con *Consensus) deliveryGuard() { defer con.waitGroup.Done() select { diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go index f172b3b..98e86b1 100644 --- a/core/syncer/agreement.go +++ b/core/syncer/agreement.go @@ -23,6 +23,7 @@ import ( "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/crypto" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/utils" ) @@ -30,33 +31,42 @@ import ( // Struct agreement implements struct of BA (Byzantine Agreement) protocol // needed in syncer, which only receives agreement results. type agreement struct { - cache *utils.NodeSetCache - inputChan chan interface{} - outputChan chan<- *types.Block - pullChan chan<- common.Hash - blocks map[types.Position]map[common.Hash]*types.Block - agreementResults map[common.Hash]struct{} - latestCRSRound uint64 - pendings map[uint64]map[common.Hash]*types.AgreementResult - logger common.Logger - confirmedBlocks map[common.Hash]struct{} - ctx context.Context - ctxCancel context.CancelFunc + chainTip uint64 + cache *utils.NodeSetCache + tsigVerifierCache *core.TSigVerifierCache + inputChan chan interface{} + outputChan chan<- *types.Block + pullChan chan<- common.Hash + blocks map[types.Position]map[common.Hash]*types.Block + agreementResults map[common.Hash]struct{} + latestCRSRound uint64 + pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult + pendingBlocks map[uint64]map[common.Hash]*types.Block + logger common.Logger + confirmedBlocks map[common.Hash]struct{} + ctx context.Context + ctxCancel context.CancelFunc } // newAgreement creates a new agreement instance. -func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash, - cache *utils.NodeSetCache, logger common.Logger) *agreement { +func newAgreement(chainTip uint64, + ch chan<- *types.Block, pullChan chan<- common.Hash, + cache *utils.NodeSetCache, verifier *core.TSigVerifierCache, + logger common.Logger) *agreement { a := &agreement{ - cache: cache, - inputChan: make(chan interface{}, 1000), - outputChan: ch, - pullChan: pullChan, - blocks: make(map[types.Position]map[common.Hash]*types.Block), - agreementResults: make(map[common.Hash]struct{}), - logger: logger, - pendings: make( + chainTip: chainTip, + cache: cache, + tsigVerifierCache: verifier, + inputChan: make(chan interface{}, 1000), + outputChan: ch, + pullChan: pullChan, + blocks: make(map[types.Position]map[common.Hash]*types.Block), + agreementResults: make(map[common.Hash]struct{}), + logger: logger, + pendingAgrs: make( map[uint64]map[common.Hash]*types.AgreementResult), + pendingBlocks: make( + map[uint64]map[common.Hash]*types.Block), confirmedBlocks: make(map[common.Hash]struct{}), } a.ctx, a.ctxCancel = context.WithCancel(context.Background()) @@ -76,7 +86,11 @@ func (a *agreement) run() { } switch v := val.(type) { case *types.Block: - a.processBlock(v) + if v.IsFinalized() { + a.processFinalizedBlock(v) + } else { + a.processBlock(v) + } case *types.AgreementResult: a.processAgreementResult(v) case uint64: @@ -100,17 +114,68 @@ func (a *agreement) processBlock(b *types.Block) { } } +func (a *agreement) processFinalizedBlock(block *types.Block) { + if block.Position.Round < core.DKGDelayRound { + return + } + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[block.Hash]; exists { + a.logger.Trace("finalized block already confirmed", "block", block) + return + } + if block.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendingBlocks[block.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.Block) + a.pendingBlocks[block.Position.Round] = pendingsForRound + } + pendingsForRound[block.Hash] = block + a.logger.Trace("finalized block cached", "block", block) + return + } + if err := utils.VerifyBlockSignature(block); err != nil { + return + } + verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(block.Position.Round) + if err != nil { + a.logger.Error("error verifying block randomness", + "block", block, + "error", err) + return + } + if !ok { + a.logger.Error("cannot verify block randomness", "block", block) + return + } + if !verifier.VerifySignature(block.Hash, crypto.Signature{ + Type: "bls", + Signature: block.Finalization.Randomness, + }) { + a.logger.Error("incorrect block randomness", "block", block) + return + } + a.confirm(block) + if block.Position.Height > a.chainTip+1 { + if _, exist := a.confirmedBlocks[block.ParentHash]; !exist { + a.pullChan <- block.ParentHash + } + } +} + func (a *agreement) processAgreementResult(r *types.AgreementResult) { + if r.Position.Round >= core.DKGDelayRound { + return + } // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[r.BlockHash]; exists { a.logger.Trace("Agreement result already confirmed", "result", r) return } if r.Position.Round > a.latestCRSRound { - pendingsForRound, exists := a.pendings[r.Position.Round] + pendingsForRound, exists := a.pendingAgrs[r.Position.Round] if !exists { pendingsForRound = make(map[common.Hash]*types.AgreementResult) - a.pendings[r.Position.Round] = pendingsForRound + a.pendingAgrs[r.Position.Round] = pendingsForRound } pendingsForRound[r.BlockHash] = r a.logger.Trace("Agreement result cached", "result", r) @@ -164,11 +229,11 @@ func (a *agreement) processNewCRS(round uint64) { a.latestCRSRound = round // Verify all pending results. for r := prevRound; r <= a.latestCRSRound; r++ { - pendingsForRound := a.pendings[r] + pendingsForRound := a.pendingAgrs[r] if pendingsForRound == nil { continue } - delete(a.pendings, r) + delete(a.pendingAgrs, r) for _, res := range pendingsForRound { if err := core.VerifyAgreementResult(res, a.cache); err != nil { a.logger.Error("Invalid agreement result", diff --git a/core/syncer/agreement_test.go b/core/syncer/agreement_test.go deleted file mode 100644 index 0a12b3c..0000000 --- a/core/syncer/agreement_test.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2019 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. // -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// <http://www.gnu.org/licenses/>. - -package syncer - -import ( - "testing" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core" - "github.com/dexon-foundation/dexon-consensus/core/crypto" - "github.com/dexon-foundation/dexon-consensus/core/test" - "github.com/dexon-foundation/dexon-consensus/core/types" - "github.com/dexon-foundation/dexon-consensus/core/utils" - "github.com/stretchr/testify/suite" -) - -type AgreementTestSuite struct { - suite.Suite - - signers []*utils.Signer - pubKeys []crypto.PublicKey - prvKeys []crypto.PrivateKey -} - -func (s *AgreementTestSuite) SetupSuite() { - var err error - s.prvKeys, s.pubKeys, err = test.NewKeys(4) - s.Require().NoError(err) - for _, k := range s.prvKeys { - s.signers = append(s.signers, utils.NewSigner(k)) - } -} - -func (s *AgreementTestSuite) prepareAgreementResult(pos types.Position, - hash common.Hash) *types.AgreementResult { - votes := []types.Vote{} - for _, signer := range s.signers { - v := types.NewVote(types.VoteCom, hash, 0) - v.Position = pos - s.Require().NoError(signer.SignVote(v)) - votes = append(votes, *v) - } - return &types.AgreementResult{ - BlockHash: hash, - Position: pos, - Votes: votes, - } -} - -func (s *AgreementTestSuite) prepareBlock(pos types.Position) *types.Block { - b := &types.Block{ - Position: pos, - } - s.Require().NoError(s.signers[0].SignBlock(b)) - return b -} - -func (s *AgreementTestSuite) TestFutureAgreementResult() { - // Make sure future types.AgreementResult could be processed correctly - // when corresponding CRS is ready. - var ( - futureRound = uint64(7) - pos = types.Position{Round: 7, Height: 1000} - ) - gov, err := test.NewGovernance( - test.NewState(1, s.pubKeys, time.Second, &common.NullLogger{}, true), - core.ConfigRoundShift, - ) - s.Require().NoError(err) - // Make sure goverance is ready for some future round, including CRS. - gov.CatchUpWithRound(futureRound) - for i := uint64(2); i <= futureRound; i++ { - gov.ProposeCRS(i, common.NewRandomHash().Bytes()) - } - s.Require().NoError(err) - blockChan := make(chan *types.Block, 10) - agr := newAgreement(blockChan, make(chan common.Hash, 100), - utils.NewNodeSetCache(gov), &common.SimpleLogger{}) - go agr.run() - block := s.prepareBlock(pos) - result := s.prepareAgreementResult(pos, block.Hash) - agr.inputChan <- result - agr.inputChan <- block - agr.inputChan <- futureRound - select { - case confirmedBlock := <-blockChan: - s.Require().Equal(block.Hash, confirmedBlock.Hash) - case <-time.After(2 * time.Second): - s.Require().True(false) - } -} - -func TestAgreement(t *testing.T) { - suite.Run(t, new(AgreementTestSuite)) -} diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 65068a4..b692b56 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -115,7 +115,12 @@ func NewConsensus( con.ctx, con.ctxCancel = context.WithCancel(context.Background()) _, con.initChainTipHeight = db.GetCompactionChainTipInfo() con.agreementModule = newAgreement( - con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) + con.initChainTipHeight, + con.receiveChan, + con.pullChan, + con.nodeSetCache, + con.tsigVerifier, + con.logger) con.agreementWaitGroup.Add(1) go func() { defer con.agreementWaitGroup.Done() diff --git a/core/test/network.go b/core/test/network.go index 6034fa6..c903c57 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -130,13 +130,52 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) { return } +// NetworkCensor is a interface to determine if a message should be censored. +type NetworkCensor interface { + Censor(interface{}) bool +} + +type censorClient struct { + TransportClient + + censor NetworkCensor + lock sync.RWMutex +} + +func (cc *censorClient) Send(ID types.NodeID, msg interface{}) error { + if func() bool { + cc.lock.RLock() + defer cc.lock.RUnlock() + return cc.censor.Censor(msg) + }() { + return nil + } + return cc.TransportClient.Send(ID, msg) +} + +func (cc *censorClient) Broadcast( + IDs map[types.NodeID]struct{}, latency LatencyModel, msg interface{}) error { + if func() bool { + cc.lock.RLock() + defer cc.lock.RUnlock() + return cc.censor.Censor(msg) + }() { + return nil + } + return cc.TransportClient.Broadcast(IDs, latency, msg) +} + +type dummyCensor struct{} + +func (dc *dummyCensor) Censor(interface{}) bool { return false } + // Network implements core.Network interface based on TransportClient. type Network struct { ID types.NodeID config NetworkConfig ctx context.Context ctxCancel context.CancelFunc - trans TransportClient + trans *censorClient dMoment time.Time fromTransport <-chan *TransportEnvelope toConsensus chan interface{} @@ -156,6 +195,8 @@ type Network struct { cache *utils.NodeSetCache notarySetCachesLock sync.Mutex notarySetCaches map[uint64]map[types.NodeID]struct{} + censor NetworkCensor + censorLock sync.RWMutex } // NewNetwork setup network stuffs for nodes, which provides an @@ -175,22 +216,48 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) ( notarySetCaches: make(map[uint64]map[types.NodeID]struct{}), voteCache: make( map[types.Position]map[types.VoteHeader]*types.Vote), + censor: &dummyCensor{}, } n.ctx, n.ctxCancel = context.WithCancel(context.Background()) // Construct transport layer. + var trans TransportClient switch config.Type { case NetworkTypeTCPLocal: - n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true) + trans = NewTCPTransportClient(pubKey, config.Marshaller, true) case NetworkTypeTCP: - n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false) + trans = NewTCPTransportClient(pubKey, config.Marshaller, false) case NetworkTypeFake: - n.trans = NewFakeTransportClient(pubKey) + trans = NewFakeTransportClient(pubKey) default: panic(fmt.Errorf("unknown network type: %v", config.Type)) } + n.trans = &censorClient{ + TransportClient: trans, + censor: &dummyCensor{}, + } return } +// SetCensor to this network module. +func (n *Network) SetCensor(censorIn, censorOut NetworkCensor) { + if censorIn == nil { + censorIn = &dummyCensor{} + } + if censorOut == nil { + censorOut = &dummyCensor{} + } + func() { + n.censorLock.Lock() + defer n.censorLock.Unlock() + n.censor = censorIn + }() + func() { + n.trans.lock.Lock() + defer n.trans.lock.Unlock() + n.trans.censor = censorOut + }() +} + // PullBlocks implements core.Network interface. func (n *Network) PullBlocks(hashes common.Hashes) { go n.pullBlocksAsync(hashes) @@ -224,6 +291,12 @@ func (n *Network) BroadcastBlock(block *types.Block) { panic(err) } n.addBlockToCache(block) + if block.IsFinalized() { + n.addBlockFinalizationToCache( + block.Hash, + block.Finalization.Height, + block.Finalization.Randomness) + } } // BroadcastAgreementResult implements core.Network interface. @@ -232,9 +305,13 @@ func (n *Network) BroadcastAgreementResult( if !n.markAgreementResultAsSent(result.BlockHash) { return } - n.addBlockRandomnessToCache(result.BlockHash, result.Randomness) + n.addBlockFinalizationToCache( + result.BlockHash, + result.FinalizationHeight, + nil, + ) notarySet := n.getNotarySet(result.Position.Round) - count := len(notarySet) * gossipAgreementResultPercent / 100 + count := len(notarySet)*gossipAgreementResultPercent/100 + 1 for nID := range notarySet { if count--; count < 0 { break @@ -303,6 +380,13 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { } func (n *Network) dispatchMsg(e *TransportEnvelope) { + if func() bool { + n.censorLock.RLock() + defer n.censorLock.RUnlock() + return n.censor.Censor(e.Msg) + }() { + return + } msg := n.cloneForFake(e.Msg) switch v := msg.(type) { case *types.Block: @@ -539,13 +623,15 @@ func (n *Network) addBlockToCache(b *types.Block) { n.blockCache[b.Hash] = b.Clone() } -func (n *Network) addBlockRandomnessToCache(hash common.Hash, rand []byte) { +func (n *Network) addBlockFinalizationToCache( + hash common.Hash, height uint64, rand []byte) { n.blockCacheLock.Lock() defer n.blockCacheLock.Unlock() block, exist := n.blockCache[hash] if !exist { return } + block.Finalization.Height = height block.Finalization.Randomness = rand } diff --git a/core/test/network_test.go b/core/test/network_test.go index 993ae70..d0e9fb2 100644 --- a/core/test/network_test.go +++ b/core/test/network_test.go @@ -291,6 +291,81 @@ func (s *NetworkTestSuite) TestBroadcastToSet() { req.IsType(&types.Block{}, <-notaryNode.ReceiveChan()) } +type testVoteCensor struct{} + +func (vc *testVoteCensor) Censor(msg interface{}) bool { + if _, ok := msg.(*types.Vote); ok { + return true + } + return false +} + +func (s *NetworkTestSuite) TestCensor() { + var ( + req = s.Require() + peerCount = 5 + ) + _, pubKeys, err := NewKeys(peerCount) + req.NoError(err) + networks := s.setupNetworks(pubKeys) + receiveChans := make(map[types.NodeID]<-chan interface{}, peerCount) + for nID, node := range networks { + receiveChans[nID] = node.ReceiveChan() + } + + censor := &testVoteCensor{} + vote := &types.Vote{} + censorNodeID := types.NewNodeID(pubKeys[0]) + otherNodeID := types.NewNodeID(pubKeys[1]) + censorNode := networks[censorNodeID] + otherNode := networks[otherNodeID] + + // Censor incomming votes. + censorNode.SetCensor(censor, nil) + otherNode.BroadcastVote(vote) + time.Sleep(50 * time.Millisecond) + for nID, receiveChan := range receiveChans { + if nID == otherNodeID || nID == censorNodeID { + req.Equal(0, len(receiveChan)) + } else { + req.Equal(1, len(receiveChan)) + req.IsType(&types.Vote{}, <-receiveChan) + } + } + + // Censor outgoing votes. + censorNode.SetCensor(nil, censor) + censorNode.BroadcastVote(vote) + time.Sleep(50 * time.Millisecond) + for _, receiveChan := range receiveChans { + req.Equal(0, len(receiveChan)) + } + + // No censorship. + censorNode.SetCensor(nil, nil) + otherNode.BroadcastVote(vote) + time.Sleep(50 * time.Millisecond) + for nID, receiveChan := range receiveChans { + if nID == otherNodeID { + req.Equal(0, len(receiveChan)) + } else { + req.Equal(1, len(receiveChan)) + req.IsType(&types.Vote{}, <-receiveChan) + } + } + censorNode.BroadcastVote(vote) + time.Sleep(50 * time.Millisecond) + for nID, receiveChan := range receiveChans { + if nID == censorNodeID { + req.Equal(0, len(receiveChan)) + } else { + req.Equal(1, len(receiveChan)) + req.IsType(&types.Vote{}, <-receiveChan) + } + } + +} + func TestNetwork(t *testing.T) { suite.Run(t, new(NetworkTestSuite)) } diff --git a/core/types/block-randomness.go b/core/types/block-randomness.go index 9a0a65e..a3ba631 100644 --- a/core/types/block-randomness.go +++ b/core/types/block-randomness.go @@ -18,7 +18,6 @@ package types import ( - "encoding/hex" "fmt" "github.com/dexon-foundation/dexon-consensus/common" @@ -26,20 +25,14 @@ import ( // AgreementResult describes an agremeent result. type AgreementResult struct { - BlockHash common.Hash `json:"block_hash"` - Position Position `json:"position"` - // TODO(jimmy): remove Votes - Votes []Vote `json:"votes"` - IsEmptyBlock bool `json:"is_empty_block"` - Randomness []byte `json:"randomness"` + BlockHash common.Hash `json:"block_hash"` + Position Position `json:"position"` + Votes []Vote `json:"votes"` + IsEmptyBlock bool `json:"is_empty_block"` + FinalizationHeight uint64 `json:"finalization_height"` } func (r *AgreementResult) String() string { - if len(r.Randomness) == 0 { - return fmt.Sprintf("agreementResult{Block:%s Pos:%s}", - r.BlockHash.String()[:6], r.Position) - } - return fmt.Sprintf("agreementResult{Block:%s Pos:%s Rand:%s}", - r.BlockHash.String()[:6], r.Position, - hex.EncodeToString(r.Randomness)[:6]) + return fmt.Sprintf("agreementResult{Block:%s Pos:%s}", + r.BlockHash.String()[:6], r.Position) } diff --git a/integration_test/byzantine_test.go b/integration_test/byzantine_test.go index 3aea057..62dfe7a 100644 --- a/integration_test/byzantine_test.go +++ b/integration_test/byzantine_test.go @@ -191,6 +191,72 @@ Loop: s.verifyNodes(nodes) } +type voteCensor struct{} + +func (vc *voteCensor) Censor(msg interface{}) bool { + _, ok := msg.(*types.Vote) + return ok +} + +func (s *ByzantineTestSuite) TestOneNodeWithoutVote() { + // 4 nodes setup with one node's votes been censored. + // so it will always do syncing BA. + var ( + req = s.Require() + peerCount = 4 + dMoment = time.Now().UTC() + untilRound = uint64(3) + tolerence = uint64(2) + ) + if testing.Short() { + untilRound = 2 + } + prvKeys, pubKeys, err := test.NewKeys(peerCount) + req.NoError(err) + // Setup seed governance instance. Give a short latency to make this test + // run faster. + lambda := 100 * time.Millisecond + seedGov, err := test.NewGovernance( + test.NewState(core.DKGDelayRound, + pubKeys, lambda, &common.NullLogger{}, true), + core.ConfigRoundShift) + req.NoError(err) + req.NoError(seedGov.State().RequestChange( + test.StateChangeRoundLength, uint64(100))) + votelessNodeID := types.NewNodeID(pubKeys[0]) + nodes := s.setupNodes(dMoment, prvKeys, seedGov) + votelessNode := nodes[votelessNodeID] + votelessNode.network.SetCensor(&voteCensor{}, &voteCensor{}) + for _, n := range nodes { + go n.con.Run() + defer n.con.Stop() + } +Loop: + for { + <-time.After(5 * time.Second) + fmt.Println("check latest position delivered by voteless node") + latestPos := votelessNode.app.GetLatestDeliveredPosition() + fmt.Println("latestPos", votelessNode.ID, &latestPos) + for _, n := range nodes { + if n.ID == votelessNodeID { + continue + } + otherPos := n.app.GetLatestDeliveredPosition() + if otherPos.Newer(latestPos) { + fmt.Println("otherPos", n.ID, &otherPos) + s.Require().True( + otherPos.Height-latestPos.Height <= tolerence) + } + } + if latestPos.Round < untilRound { + continue Loop + } + // Oh ya. + break + } + s.verifyNodes(nodes) +} + func TestByzantine(t *testing.T) { suite.Run(t, new(ByzantineTestSuite)) } |