author     Jimmy Hu <jimmy.hu@dexon.org>   2019-03-25 10:14:42 +0800
committer  Jimmy Hu <jimmy.hu@dexon.org>   2019-03-27 15:25:10 +0800
commit     b8ced165b1fb03394f8758e08148b0e5d06aa07b (patch)
tree       fa327764a4cf564bb4aa39c1570ffd7f292c7ba1
parent     6efe199cb38eb4cb9a9a64d98ff5f8c4fb997da7 (diff)
core: Remove agreement result (#514)
* core: remove agreement result for round with randomness
* remove agr test in syncer
* fixup
* remove randomness field from agreement result
* modify test
-rw-r--r--  core/agreement-mgr.go               |   9
-rw-r--r--  core/agreement.go                   |  20
-rw-r--r--  core/agreement_test.go              |  18
-rw-r--r--  core/blockchain.go                  |  44
-rw-r--r--  core/consensus.go                   | 135
-rw-r--r--  core/syncer/agreement.go            | 119
-rw-r--r--  core/syncer/agreement_test.go       | 110
-rw-r--r--  core/syncer/consensus.go            |   7
-rw-r--r--  core/test/network.go                | 100
-rw-r--r--  core/test/network_test.go           |  75
-rw-r--r--  core/types/block-randomness.go      |  21
-rw-r--r--  integration_test/byzantine_test.go  |  66
12 files changed, 492 insertions(+), 232 deletions(-)
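In short: once a round has DKG-based randomness (Position.Round >= DKGDelayRound), the notary no longer gossips a separate AgreementResult for it. The recovered threshold signature is stored in the block's Finalization.Randomness, the finalized block itself is broadcast, and the receivers added in this patch (Consensus.processFinalizedBlock and the syncer's agreement.processFinalizedBlock) verify that randomness against the round's TSIG verifier before confirming. Earlier rounds keep the vote-carrying AgreementResult path. Below is a minimal, self-contained sketch of that dispatch rule, using stub types and hypothetical names rather than the real core/types API; the hunks that follow are the authoritative change.

package main

import "fmt"

// Stub stand-ins for the real core/types structures; only the fields the
// new propagation rule inspects are modeled here.
type position struct{ Round, Height uint64 }

type block struct {
	Hash       string
	Position   position
	Randomness []byte // stands in for Block.Finalization.Randomness
}

type agreementResult struct {
	BlockHash          string
	Position           position
	FinalizationHeight uint64
}

const dkgDelayRound = 1 // placeholder for core.DKGDelayRound

// propagate mirrors the branch added to consensusBAReceiver.ConfirmBlock:
// rounds before dkgDelayRound still gossip an AgreementResult carrying the
// votes, later rounds broadcast the finalized block, which already carries
// its recovered randomness.
func propagate(b block, broadcastBlock func(block), sendResult func(agreementResult)) {
	if b.Position.Round < dkgDelayRound {
		sendResult(agreementResult{
			BlockHash:          b.Hash,
			Position:           b.Position,
			FinalizationHeight: b.Position.Height + 1, // application height is one-based
		})
		return
	}
	broadcastBlock(b)
}

func main() {
	sendResult := func(r agreementResult) { fmt.Println("gossip AgreementResult for", r.BlockHash) }
	broadcast := func(b block) { fmt.Println("broadcast finalized block", b.Hash) }
	propagate(block{Hash: "b0", Position: position{Round: 0, Height: 9}}, broadcast, sendResult)
	propagate(block{Hash: "b1", Position: position{Round: 2, Height: 42}, Randomness: []byte{0x01}}, broadcast, sendResult)
}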
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 02b7b7b..bc6923a 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -304,6 +304,15 @@ func (mgr *agreementMgr) processAgreementResult(
return nil
}
+func (mgr *agreementMgr) processFinalizedBlock(block *types.Block) error {
+ aID := mgr.baModule.agreementID()
+ if block.Position.Older(aID) {
+ return nil
+ }
+ mgr.baModule.processFinalizedBlock(block)
+ return nil
+}
+
func (mgr *agreementMgr) stop() {
// Stop all running agreement modules.
func() {
diff --git a/core/agreement.go b/core/agreement.go
index 1ba9034..5e7b7de 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -42,6 +42,7 @@ var (
ErrNotInNotarySet = fmt.Errorf("not in notary set")
ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature")
ErrIncorrectVotePartialSignature = fmt.Errorf("incorrect vote psig")
+ ErrMismatchBlockPosition = fmt.Errorf("mismatch block position")
)
// ErrFork for fork error in agreement.
@@ -513,6 +514,25 @@ func (a *agreement) processVote(vote *types.Vote) error {
return nil
}
+func (a *agreement) processFinalizedBlock(block *types.Block) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ if a.hasOutput {
+ return
+ }
+ aID := a.agreementID()
+ if aID.Older(block.Position) {
+ return
+ }
+ a.addCandidateBlockNoLock(block)
+ a.hasOutput = true
+ a.data.recv.ConfirmBlock(block.Hash, nil)
+ if a.doneChan != nil {
+ close(a.doneChan)
+ a.doneChan = nil
+ }
+}
+
func (a *agreement) done() <-chan struct{} {
a.lock.Lock()
defer a.lock.Unlock()
diff --git a/core/agreement_test.go b/core/agreement_test.go
index dfeb783..175249f 100644
--- a/core/agreement_test.go
+++ b/core/agreement_test.go
@@ -559,6 +559,24 @@ func (s *AgreementTestSuite) TestFindBlockInPendingSet() {
s.Require().NotNil(block)
}
+func (s *AgreementTestSuite) TestConfirmWithBlock() {
+ a, _ := s.newAgreement(4, -1, func(*types.Block) (bool, error) {
+ return true, nil
+ })
+ block := &types.Block{
+ Hash: common.NewRandomHash(),
+ Position: a.agreementID(),
+ Finalization: types.FinalizationResult{
+ Randomness: []byte{0x1, 0x2, 0x3, 0x4},
+ },
+ }
+ a.processFinalizedBlock(block)
+ s.Require().Len(s.confirmChan, 1)
+ confirm := <-s.confirmChan
+ s.Equal(block.Hash, confirm)
+ s.True(a.confirmed())
+}
+
func TestAgreement(t *testing.T) {
suite.Run(t, new(AgreementTestSuite))
}
diff --git a/core/blockchain.go b/core/blockchain.go
index 0ecd083..798a080 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -127,19 +127,18 @@ type tsigVerifierGetter interface {
}
type blockChain struct {
- lock sync.RWMutex
- ID types.NodeID
- lastConfirmed *types.Block
- lastDelivered *types.Block
- signer *utils.Signer
- vGetter tsigVerifierGetter
- app Application
- logger common.Logger
- pendingRandomnesses map[types.Position]*types.AgreementResult
- configs []blockChainConfig
- pendingBlocks pendingBlockRecords
- confirmedBlocks types.BlocksByPosition
- dMoment time.Time
+ lock sync.RWMutex
+ ID types.NodeID
+ lastConfirmed *types.Block
+ lastDelivered *types.Block
+ signer *utils.Signer
+ vGetter tsigVerifierGetter
+ app Application
+ logger common.Logger
+ configs []blockChainConfig
+ pendingBlocks pendingBlockRecords
+ confirmedBlocks types.BlocksByPosition
+ dMoment time.Time
}
func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
@@ -154,8 +153,6 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
app: app,
logger: logger,
dMoment: dMoment,
- pendingRandomnesses: make(
- map[types.Position]*types.AgreementResult),
}
}
@@ -228,8 +225,6 @@ func (bc *blockChain) extractBlocks() (ret []*types.Block) {
// to single chain.
c.Finalization.ParentHash = c.ParentHash
c.Finalization.Timestamp = c.Timestamp
- // It's a workaround, the height for application is one-based.
- c.Finalization.Height = c.Position.Height + 1
ret = append(ret, c)
bc.lastDelivered = c
}
@@ -290,6 +285,7 @@ func (bc *blockChain) addEmptyBlock(position types.Position) (
// to be confirmed.
panic(err)
}
+ emptyB.Finalization.Height = emptyB.Position.Height + 1
bc.confirmBlock(emptyB)
bc.checkIfBlocksConfirmed()
return emptyB
@@ -446,9 +442,6 @@ func (bc *blockChain) addPendingBlockRecord(p pendingBlockRecord) error {
}
return err
}
- if p.block != nil {
- bc.setRandomnessFromPending(p.block)
- }
return nil
}
@@ -610,17 +603,6 @@ func (bc *blockChain) confirmBlock(b *types.Block) {
bc.logger.Debug("Calling Application.BlockConfirmed", "block", b)
bc.app.BlockConfirmed(*b)
bc.lastConfirmed = b
- bc.setRandomnessFromPending(b)
bc.confirmedBlocks = append(bc.confirmedBlocks, b)
bc.purgeConfig()
}
-
-func (bc *blockChain) setRandomnessFromPending(b *types.Block) {
- if r, exist := bc.pendingRandomnesses[b.Position]; exist {
- if !r.BlockHash.Equal(b.Hash) {
- panic(fmt.Errorf("mismathed randomness: %s %s", b, r))
- }
- b.Finalization.Randomness = r.Randomness
- delete(bc.pendingRandomnesses, b.Position)
- }
-}
diff --git a/core/consensus.go b/core/consensus.go
index 2b0d5a4..5a331d2 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -54,6 +54,8 @@ var (
"Configuration not ready")
ErrIncorrectBlockRandomness = fmt.Errorf(
"randomness of block is incorrect")
+ ErrCannotVerifyBlockRandomness = fmt.Errorf(
+ "cannot verify block randomness")
)
// consensusBAReceiver implements agreementReceiver.
@@ -225,44 +227,59 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
- voteList := make([]types.Vote, 0, len(votes))
- IDs := make(cryptoDKG.IDs, 0, len(votes))
- psigs := make([]cryptoDKG.PartialSignature, 0, len(votes))
- for _, vote := range votes {
- if vote.BlockHash != hash {
- continue
- }
- if recv.round() >= DKGDelayRound {
- ID, exist := recv.npks.IDMap[vote.ProposerID]
- if !exist {
+ if len(votes) == 0 && len(block.Finalization.Randomness) == 0 {
+ recv.consensus.logger.Error("No votes to recover randomness",
+ "block", block)
+ } else if votes != nil {
+ voteList := make([]types.Vote, 0, len(votes))
+ IDs := make(cryptoDKG.IDs, 0, len(votes))
+ psigs := make([]cryptoDKG.PartialSignature, 0, len(votes))
+ for _, vote := range votes {
+ if vote.BlockHash != hash {
continue
}
- IDs = append(IDs, ID)
- psigs = append(psigs, vote.PartialSignature)
+ if recv.round() >= DKGDelayRound {
+ ID, exist := recv.npks.IDMap[vote.ProposerID]
+ if !exist {
+ continue
+ }
+ IDs = append(IDs, ID)
+ psigs = append(psigs, vote.PartialSignature)
+ }
+ voteList = append(voteList, *vote)
}
- voteList = append(voteList, *vote)
- }
- if recv.round() >= DKGDelayRound {
- rand, err := cryptoDKG.RecoverSignature(psigs, IDs)
- if err != nil {
- recv.consensus.logger.Warn("Unable to recover randomness",
- "block", block,
- "error", err)
- } else {
- block.Finalization.Randomness = rand.Signature[:]
+ if recv.round() >= DKGDelayRound {
+ rand, err := cryptoDKG.RecoverSignature(psigs, IDs)
+ if err != nil {
+ recv.consensus.logger.Warn("Unable to recover randomness",
+ "block", block,
+ "error", err)
+ } else {
+ block.Finalization.Randomness = rand.Signature[:]
+ }
}
- }
- if recv.isNotary {
- result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
- Randomness: block.Finalization.Randomness,
- IsEmptyBlock: isEmptyBlockConfirmed,
+ // It's a workaround, the height for application is one-based.
+ block.Finalization.Height = block.Position.Height + 1
+
+ if recv.isNotary {
+ if block.Position.Round < DKGDelayRound {
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ FinalizationHeight: block.Finalization.Height,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ }
+ recv.consensus.logger.Debug("Propose AgreementResult",
+ "result", result)
+ recv.consensus.msgChan <- result
+ } else {
+ recv.consensus.logger.Debug(
+ "Propose AgreementResult as finalized block",
+ "block", block)
+ recv.consensus.network.BroadcastBlock(block)
+ }
}
- recv.consensus.logger.Debug("Propose AgreementResult",
- "result", result)
- recv.consensus.msgChan <- result
}
if block.Position.Height != 0 &&
@@ -300,6 +317,11 @@ func (recv *consensusBAReceiver) ConfirmBlock(
recv.consensus.logger.Info("Receive parent block",
"parent-hash", block.ParentHash.String()[:6],
"cur-position", block.Position)
+ if block.Finalization.Height == 0 {
+ // TODO(jimmy): use a separate message to pull finalized
+ // block. Here, we pull it again as a workaround.
+ continue
+ }
recv.consensus.processBlockChan <- block
parentHash = block.ParentHash
if block.Position.Height == 0 ||
@@ -1135,6 +1157,12 @@ MessageLoop:
delete(con.baConfirmedBlock, val.Hash)
ch <- val
}()
+ } else if val.IsFinalized() {
+ if err := con.processFinalizedBlock(val); err != nil {
+ con.logger.Error("Failed to process finalized block",
+ "block", val,
+ "error", err)
+ }
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
@@ -1179,17 +1207,11 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
// ProcessAgreementResult processes the randomness request.
func (con *Consensus) ProcessAgreementResult(
rand *types.AgreementResult) error {
- if !con.baMgr.touchAgreementResult(rand) {
+ if rand.Position.Round >= DKGDelayRound {
return nil
}
- // TODO(jimmy): merge tsig check to VerifyAgreementResult
- ok, err := con.bcModule.verifyRandomness(
- rand.BlockHash, rand.Position.Round, rand.Randomness)
- if err != nil {
- return err
- }
- if !ok {
- return ErrIncorrectBlockRandomness
+ if !con.baMgr.touchAgreementResult(rand) {
+ return nil
}
// Sanity Check.
if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
@@ -1217,6 +1239,35 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
return
}
+func (con *Consensus) processFinalizedBlock(b *types.Block) (err error) {
+ if b.Position.Round < DKGDelayRound {
+ return
+ }
+ if err = utils.VerifyBlockSignature(b); err != nil {
+ return
+ }
+ verifier, ok, err := con.tsigVerifierCache.UpdateAndGet(b.Position.Round)
+ if err != nil {
+ return
+ }
+ if !ok {
+ err = ErrCannotVerifyBlockRandomness
+ return
+ }
+ if !verifier.VerifySignature(b.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: b.Finalization.Randomness,
+ }) {
+ err = ErrIncorrectBlockRandomness
+ return
+ }
+ err = con.baMgr.processFinalizedBlock(b)
+ if err == nil && con.debugApp != nil {
+ con.debugApp.BlockReceived(b.Hash)
+ }
+ return
+}
+
func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
select {
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index f172b3b..98e86b1 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -23,6 +23,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
@@ -30,33 +31,42 @@ import (
// Struct agreement implements struct of BA (Byzantine Agreement) protocol
// needed in syncer, which only receives agreement results.
type agreement struct {
- cache *utils.NodeSetCache
- inputChan chan interface{}
- outputChan chan<- *types.Block
- pullChan chan<- common.Hash
- blocks map[types.Position]map[common.Hash]*types.Block
- agreementResults map[common.Hash]struct{}
- latestCRSRound uint64
- pendings map[uint64]map[common.Hash]*types.AgreementResult
- logger common.Logger
- confirmedBlocks map[common.Hash]struct{}
- ctx context.Context
- ctxCancel context.CancelFunc
+ chainTip uint64
+ cache *utils.NodeSetCache
+ tsigVerifierCache *core.TSigVerifierCache
+ inputChan chan interface{}
+ outputChan chan<- *types.Block
+ pullChan chan<- common.Hash
+ blocks map[types.Position]map[common.Hash]*types.Block
+ agreementResults map[common.Hash]struct{}
+ latestCRSRound uint64
+ pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult
+ pendingBlocks map[uint64]map[common.Hash]*types.Block
+ logger common.Logger
+ confirmedBlocks map[common.Hash]struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc
}
// newAgreement creates a new agreement instance.
-func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash,
- cache *utils.NodeSetCache, logger common.Logger) *agreement {
+func newAgreement(chainTip uint64,
+ ch chan<- *types.Block, pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache, verifier *core.TSigVerifierCache,
+ logger common.Logger) *agreement {
a := &agreement{
- cache: cache,
- inputChan: make(chan interface{}, 1000),
- outputChan: ch,
- pullChan: pullChan,
- blocks: make(map[types.Position]map[common.Hash]*types.Block),
- agreementResults: make(map[common.Hash]struct{}),
- logger: logger,
- pendings: make(
+ chainTip: chainTip,
+ cache: cache,
+ tsigVerifierCache: verifier,
+ inputChan: make(chan interface{}, 1000),
+ outputChan: ch,
+ pullChan: pullChan,
+ blocks: make(map[types.Position]map[common.Hash]*types.Block),
+ agreementResults: make(map[common.Hash]struct{}),
+ logger: logger,
+ pendingAgrs: make(
map[uint64]map[common.Hash]*types.AgreementResult),
+ pendingBlocks: make(
+ map[uint64]map[common.Hash]*types.Block),
confirmedBlocks: make(map[common.Hash]struct{}),
}
a.ctx, a.ctxCancel = context.WithCancel(context.Background())
@@ -76,7 +86,11 @@ func (a *agreement) run() {
}
switch v := val.(type) {
case *types.Block:
- a.processBlock(v)
+ if v.IsFinalized() {
+ a.processFinalizedBlock(v)
+ } else {
+ a.processBlock(v)
+ }
case *types.AgreementResult:
a.processAgreementResult(v)
case uint64:
@@ -100,17 +114,68 @@ func (a *agreement) processBlock(b *types.Block) {
}
}
+func (a *agreement) processFinalizedBlock(block *types.Block) {
+ if block.Position.Round < core.DKGDelayRound {
+ return
+ }
+ // Cache blocks whose CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[block.Hash]; exists {
+ a.logger.Trace("finalized block already confirmed", "block", block)
+ return
+ }
+ if block.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendingBlocks[block.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.Block)
+ a.pendingBlocks[block.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[block.Hash] = block
+ a.logger.Trace("finalized block cached", "block", block)
+ return
+ }
+ if err := utils.VerifyBlockSignature(block); err != nil {
+ return
+ }
+ verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(block.Position.Round)
+ if err != nil {
+ a.logger.Error("error verifying block randomness",
+ "block", block,
+ "error", err)
+ return
+ }
+ if !ok {
+ a.logger.Error("cannot verify block randomness", "block", block)
+ return
+ }
+ if !verifier.VerifySignature(block.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: block.Finalization.Randomness,
+ }) {
+ a.logger.Error("incorrect block randomness", "block", block)
+ return
+ }
+ a.confirm(block)
+ if block.Position.Height > a.chainTip+1 {
+ if _, exist := a.confirmedBlocks[block.ParentHash]; !exist {
+ a.pullChan <- block.ParentHash
+ }
+ }
+}
+
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
+ if r.Position.Round >= core.DKGDelayRound {
+ return
+ }
// Cache those results that CRS is not ready yet.
if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
a.logger.Trace("Agreement result already confirmed", "result", r)
return
}
if r.Position.Round > a.latestCRSRound {
- pendingsForRound, exists := a.pendings[r.Position.Round]
+ pendingsForRound, exists := a.pendingAgrs[r.Position.Round]
if !exists {
pendingsForRound = make(map[common.Hash]*types.AgreementResult)
- a.pendings[r.Position.Round] = pendingsForRound
+ a.pendingAgrs[r.Position.Round] = pendingsForRound
}
pendingsForRound[r.BlockHash] = r
a.logger.Trace("Agreement result cached", "result", r)
@@ -164,11 +229,11 @@ func (a *agreement) processNewCRS(round uint64) {
a.latestCRSRound = round
// Verify all pending results.
for r := prevRound; r <= a.latestCRSRound; r++ {
- pendingsForRound := a.pendings[r]
+ pendingsForRound := a.pendingAgrs[r]
if pendingsForRound == nil {
continue
}
- delete(a.pendings, r)
+ delete(a.pendingAgrs, r)
for _, res := range pendingsForRound {
if err := core.VerifyAgreementResult(res, a.cache); err != nil {
a.logger.Error("Invalid agreement result",
diff --git a/core/syncer/agreement_test.go b/core/syncer/agreement_test.go
deleted file mode 100644
index 0a12b3c..0000000
--- a/core/syncer/agreement_test.go
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright 2019 The dexon-consensus Authors
-// This file is part of the dexon-consensus library.
-//
-// The dexon-consensus library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version. //
-// The dexon-consensus library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package syncer
-
-import (
- "testing"
- "time"
-
- "github.com/dexon-foundation/dexon-consensus/common"
- "github.com/dexon-foundation/dexon-consensus/core"
- "github.com/dexon-foundation/dexon-consensus/core/crypto"
- "github.com/dexon-foundation/dexon-consensus/core/test"
- "github.com/dexon-foundation/dexon-consensus/core/types"
- "github.com/dexon-foundation/dexon-consensus/core/utils"
- "github.com/stretchr/testify/suite"
-)
-
-type AgreementTestSuite struct {
- suite.Suite
-
- signers []*utils.Signer
- pubKeys []crypto.PublicKey
- prvKeys []crypto.PrivateKey
-}
-
-func (s *AgreementTestSuite) SetupSuite() {
- var err error
- s.prvKeys, s.pubKeys, err = test.NewKeys(4)
- s.Require().NoError(err)
- for _, k := range s.prvKeys {
- s.signers = append(s.signers, utils.NewSigner(k))
- }
-}
-
-func (s *AgreementTestSuite) prepareAgreementResult(pos types.Position,
- hash common.Hash) *types.AgreementResult {
- votes := []types.Vote{}
- for _, signer := range s.signers {
- v := types.NewVote(types.VoteCom, hash, 0)
- v.Position = pos
- s.Require().NoError(signer.SignVote(v))
- votes = append(votes, *v)
- }
- return &types.AgreementResult{
- BlockHash: hash,
- Position: pos,
- Votes: votes,
- }
-}
-
-func (s *AgreementTestSuite) prepareBlock(pos types.Position) *types.Block {
- b := &types.Block{
- Position: pos,
- }
- s.Require().NoError(s.signers[0].SignBlock(b))
- return b
-}
-
-func (s *AgreementTestSuite) TestFutureAgreementResult() {
- // Make sure future types.AgreementResult could be processed correctly
- // when corresponding CRS is ready.
- var (
- futureRound = uint64(7)
- pos = types.Position{Round: 7, Height: 1000}
- )
- gov, err := test.NewGovernance(
- test.NewState(1, s.pubKeys, time.Second, &common.NullLogger{}, true),
- core.ConfigRoundShift,
- )
- s.Require().NoError(err)
- // Make sure goverance is ready for some future round, including CRS.
- gov.CatchUpWithRound(futureRound)
- for i := uint64(2); i <= futureRound; i++ {
- gov.ProposeCRS(i, common.NewRandomHash().Bytes())
- }
- s.Require().NoError(err)
- blockChan := make(chan *types.Block, 10)
- agr := newAgreement(blockChan, make(chan common.Hash, 100),
- utils.NewNodeSetCache(gov), &common.SimpleLogger{})
- go agr.run()
- block := s.prepareBlock(pos)
- result := s.prepareAgreementResult(pos, block.Hash)
- agr.inputChan <- result
- agr.inputChan <- block
- agr.inputChan <- futureRound
- select {
- case confirmedBlock := <-blockChan:
- s.Require().Equal(block.Hash, confirmedBlock.Hash)
- case <-time.After(2 * time.Second):
- s.Require().True(false)
- }
-}
-
-func TestAgreement(t *testing.T) {
- suite.Run(t, new(AgreementTestSuite))
-}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 65068a4..b692b56 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -115,7 +115,12 @@ func NewConsensus(
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
_, con.initChainTipHeight = db.GetCompactionChainTipInfo()
con.agreementModule = newAgreement(
- con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ con.initChainTipHeight,
+ con.receiveChan,
+ con.pullChan,
+ con.nodeSetCache,
+ con.tsigVerifier,
+ con.logger)
con.agreementWaitGroup.Add(1)
go func() {
defer con.agreementWaitGroup.Done()
diff --git a/core/test/network.go b/core/test/network.go
index 6034fa6..c903c57 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -130,13 +130,52 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
return
}
+// NetworkCensor is an interface to determine if a message should be censored.
+type NetworkCensor interface {
+ Censor(interface{}) bool
+}
+
+type censorClient struct {
+ TransportClient
+
+ censor NetworkCensor
+ lock sync.RWMutex
+}
+
+func (cc *censorClient) Send(ID types.NodeID, msg interface{}) error {
+ if func() bool {
+ cc.lock.RLock()
+ defer cc.lock.RUnlock()
+ return cc.censor.Censor(msg)
+ }() {
+ return nil
+ }
+ return cc.TransportClient.Send(ID, msg)
+}
+
+func (cc *censorClient) Broadcast(
+ IDs map[types.NodeID]struct{}, latency LatencyModel, msg interface{}) error {
+ if func() bool {
+ cc.lock.RLock()
+ defer cc.lock.RUnlock()
+ return cc.censor.Censor(msg)
+ }() {
+ return nil
+ }
+ return cc.TransportClient.Broadcast(IDs, latency, msg)
+}
+
+type dummyCensor struct{}
+
+func (dc *dummyCensor) Censor(interface{}) bool { return false }
+
// Network implements core.Network interface based on TransportClient.
type Network struct {
ID types.NodeID
config NetworkConfig
ctx context.Context
ctxCancel context.CancelFunc
- trans TransportClient
+ trans *censorClient
dMoment time.Time
fromTransport <-chan *TransportEnvelope
toConsensus chan interface{}
@@ -156,6 +195,8 @@ type Network struct {
cache *utils.NodeSetCache
notarySetCachesLock sync.Mutex
notarySetCaches map[uint64]map[types.NodeID]struct{}
+ censor NetworkCensor
+ censorLock sync.RWMutex
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -175,22 +216,48 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
notarySetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
+ censor: &dummyCensor{},
}
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
// Construct transport layer.
+ var trans TransportClient
switch config.Type {
case NetworkTypeTCPLocal:
- n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
+ trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
case NetworkTypeTCP:
- n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
+ trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
case NetworkTypeFake:
- n.trans = NewFakeTransportClient(pubKey)
+ trans = NewFakeTransportClient(pubKey)
default:
panic(fmt.Errorf("unknown network type: %v", config.Type))
}
+ n.trans = &censorClient{
+ TransportClient: trans,
+ censor: &dummyCensor{},
+ }
return
}
+// SetCensor to this network module.
+func (n *Network) SetCensor(censorIn, censorOut NetworkCensor) {
+ if censorIn == nil {
+ censorIn = &dummyCensor{}
+ }
+ if censorOut == nil {
+ censorOut = &dummyCensor{}
+ }
+ func() {
+ n.censorLock.Lock()
+ defer n.censorLock.Unlock()
+ n.censor = censorIn
+ }()
+ func() {
+ n.trans.lock.Lock()
+ defer n.trans.lock.Unlock()
+ n.trans.censor = censorOut
+ }()
+}
+
// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
go n.pullBlocksAsync(hashes)
@@ -224,6 +291,12 @@ func (n *Network) BroadcastBlock(block *types.Block) {
panic(err)
}
n.addBlockToCache(block)
+ if block.IsFinalized() {
+ n.addBlockFinalizationToCache(
+ block.Hash,
+ block.Finalization.Height,
+ block.Finalization.Randomness)
+ }
}
// BroadcastAgreementResult implements core.Network interface.
@@ -232,9 +305,13 @@ func (n *Network) BroadcastAgreementResult(
if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
- n.addBlockRandomnessToCache(result.BlockHash, result.Randomness)
+ n.addBlockFinalizationToCache(
+ result.BlockHash,
+ result.FinalizationHeight,
+ nil,
+ )
notarySet := n.getNotarySet(result.Position.Round)
- count := len(notarySet) * gossipAgreementResultPercent / 100
+ count := len(notarySet)*gossipAgreementResultPercent/100 + 1
for nID := range notarySet {
if count--; count < 0 {
break
@@ -303,6 +380,13 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
}
func (n *Network) dispatchMsg(e *TransportEnvelope) {
+ if func() bool {
+ n.censorLock.RLock()
+ defer n.censorLock.RUnlock()
+ return n.censor.Censor(e.Msg)
+ }() {
+ return
+ }
msg := n.cloneForFake(e.Msg)
switch v := msg.(type) {
case *types.Block:
@@ -539,13 +623,15 @@ func (n *Network) addBlockToCache(b *types.Block) {
n.blockCache[b.Hash] = b.Clone()
}
-func (n *Network) addBlockRandomnessToCache(hash common.Hash, rand []byte) {
+func (n *Network) addBlockFinalizationToCache(
+ hash common.Hash, height uint64, rand []byte) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
block, exist := n.blockCache[hash]
if !exist {
return
}
+ block.Finalization.Height = height
block.Finalization.Randomness = rand
}
diff --git a/core/test/network_test.go b/core/test/network_test.go
index 993ae70..d0e9fb2 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -291,6 +291,81 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
req.IsType(&types.Block{}, <-notaryNode.ReceiveChan())
}
+type testVoteCensor struct{}
+
+func (vc *testVoteCensor) Censor(msg interface{}) bool {
+ if _, ok := msg.(*types.Vote); ok {
+ return true
+ }
+ return false
+}
+
+func (s *NetworkTestSuite) TestCensor() {
+ var (
+ req = s.Require()
+ peerCount = 5
+ )
+ _, pubKeys, err := NewKeys(peerCount)
+ req.NoError(err)
+ networks := s.setupNetworks(pubKeys)
+ receiveChans := make(map[types.NodeID]<-chan interface{}, peerCount)
+ for nID, node := range networks {
+ receiveChans[nID] = node.ReceiveChan()
+ }
+
+ censor := &testVoteCensor{}
+ vote := &types.Vote{}
+ censorNodeID := types.NewNodeID(pubKeys[0])
+ otherNodeID := types.NewNodeID(pubKeys[1])
+ censorNode := networks[censorNodeID]
+ otherNode := networks[otherNodeID]
+
+ // Censor incoming votes.
+ censorNode.SetCensor(censor, nil)
+ otherNode.BroadcastVote(vote)
+ time.Sleep(50 * time.Millisecond)
+ for nID, receiveChan := range receiveChans {
+ if nID == otherNodeID || nID == censorNodeID {
+ req.Equal(0, len(receiveChan))
+ } else {
+ req.Equal(1, len(receiveChan))
+ req.IsType(&types.Vote{}, <-receiveChan)
+ }
+ }
+
+ // Censor outgoing votes.
+ censorNode.SetCensor(nil, censor)
+ censorNode.BroadcastVote(vote)
+ time.Sleep(50 * time.Millisecond)
+ for _, receiveChan := range receiveChans {
+ req.Equal(0, len(receiveChan))
+ }
+
+ // No censorship.
+ censorNode.SetCensor(nil, nil)
+ otherNode.BroadcastVote(vote)
+ time.Sleep(50 * time.Millisecond)
+ for nID, receiveChan := range receiveChans {
+ if nID == otherNodeID {
+ req.Equal(0, len(receiveChan))
+ } else {
+ req.Equal(1, len(receiveChan))
+ req.IsType(&types.Vote{}, <-receiveChan)
+ }
+ }
+ censorNode.BroadcastVote(vote)
+ time.Sleep(50 * time.Millisecond)
+ for nID, receiveChan := range receiveChans {
+ if nID == censorNodeID {
+ req.Equal(0, len(receiveChan))
+ } else {
+ req.Equal(1, len(receiveChan))
+ req.IsType(&types.Vote{}, <-receiveChan)
+ }
+ }
+
+}
+
func TestNetwork(t *testing.T) {
suite.Run(t, new(NetworkTestSuite))
}
diff --git a/core/types/block-randomness.go b/core/types/block-randomness.go
index 9a0a65e..a3ba631 100644
--- a/core/types/block-randomness.go
+++ b/core/types/block-randomness.go
@@ -18,7 +18,6 @@
package types
import (
- "encoding/hex"
"fmt"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -26,20 +25,14 @@ import (
// AgreementResult describes an agremeent result.
type AgreementResult struct {
- BlockHash common.Hash `json:"block_hash"`
- Position Position `json:"position"`
- // TODO(jimmy): remove Votes
- Votes []Vote `json:"votes"`
- IsEmptyBlock bool `json:"is_empty_block"`
- Randomness []byte `json:"randomness"`
+ BlockHash common.Hash `json:"block_hash"`
+ Position Position `json:"position"`
+ Votes []Vote `json:"votes"`
+ IsEmptyBlock bool `json:"is_empty_block"`
+ FinalizationHeight uint64 `json:"finalization_height"`
}
func (r *AgreementResult) String() string {
- if len(r.Randomness) == 0 {
- return fmt.Sprintf("agreementResult{Block:%s Pos:%s}",
- r.BlockHash.String()[:6], r.Position)
- }
- return fmt.Sprintf("agreementResult{Block:%s Pos:%s Rand:%s}",
- r.BlockHash.String()[:6], r.Position,
- hex.EncodeToString(r.Randomness)[:6])
+ return fmt.Sprintf("agreementResult{Block:%s Pos:%s}",
+ r.BlockHash.String()[:6], r.Position)
}
diff --git a/integration_test/byzantine_test.go b/integration_test/byzantine_test.go
index 3aea057..62dfe7a 100644
--- a/integration_test/byzantine_test.go
+++ b/integration_test/byzantine_test.go
@@ -191,6 +191,72 @@ Loop:
s.verifyNodes(nodes)
}
+type voteCensor struct{}
+
+func (vc *voteCensor) Censor(msg interface{}) bool {
+ _, ok := msg.(*types.Vote)
+ return ok
+}
+
+func (s *ByzantineTestSuite) TestOneNodeWithoutVote() {
+ // 4-node setup in which one node's votes are censored,
+ // so that node always falls back to syncing BA.
+ var (
+ req = s.Require()
+ peerCount = 4
+ dMoment = time.Now().UTC()
+ untilRound = uint64(3)
+ tolerence = uint64(2)
+ )
+ if testing.Short() {
+ untilRound = 2
+ }
+ prvKeys, pubKeys, err := test.NewKeys(peerCount)
+ req.NoError(err)
+ // Setup seed governance instance. Give a short latency to make this test
+ // run faster.
+ lambda := 100 * time.Millisecond
+ seedGov, err := test.NewGovernance(
+ test.NewState(core.DKGDelayRound,
+ pubKeys, lambda, &common.NullLogger{}, true),
+ core.ConfigRoundShift)
+ req.NoError(err)
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundLength, uint64(100)))
+ votelessNodeID := types.NewNodeID(pubKeys[0])
+ nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+ votelessNode := nodes[votelessNodeID]
+ votelessNode.network.SetCensor(&voteCensor{}, &voteCensor{})
+ for _, n := range nodes {
+ go n.con.Run()
+ defer n.con.Stop()
+ }
+Loop:
+ for {
+ <-time.After(5 * time.Second)
+ fmt.Println("check latest position delivered by voteless node")
+ latestPos := votelessNode.app.GetLatestDeliveredPosition()
+ fmt.Println("latestPos", votelessNode.ID, &latestPos)
+ for _, n := range nodes {
+ if n.ID == votelessNodeID {
+ continue
+ }
+ otherPos := n.app.GetLatestDeliveredPosition()
+ if otherPos.Newer(latestPos) {
+ fmt.Println("otherPos", n.ID, &otherPos)
+ s.Require().True(
+ otherPos.Height-latestPos.Height <= tolerence)
+ }
+ }
+ if latestPos.Round < untilRound {
+ continue Loop
+ }
+ // Oh ya.
+ break
+ }
+ s.verifyNodes(nodes)
+}
+
func TestByzantine(t *testing.T) {
suite.Run(t, new(ByzantineTestSuite))
}