aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer
diff options
context:
space:
mode:
Diffstat (limited to 'core/syncer')
-rw-r--r--core/syncer/agreement.go119
-rw-r--r--core/syncer/agreement_test.go110
-rw-r--r--core/syncer/consensus.go7
3 files changed, 98 insertions, 138 deletions
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index f172b3b..98e86b1 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -23,6 +23,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
@@ -30,33 +31,42 @@ import (
// Struct agreement implements struct of BA (Byzantine Agreement) protocol
// needed in syncer, which only receives agreement results.
type agreement struct {
- cache *utils.NodeSetCache
- inputChan chan interface{}
- outputChan chan<- *types.Block
- pullChan chan<- common.Hash
- blocks map[types.Position]map[common.Hash]*types.Block
- agreementResults map[common.Hash]struct{}
- latestCRSRound uint64
- pendings map[uint64]map[common.Hash]*types.AgreementResult
- logger common.Logger
- confirmedBlocks map[common.Hash]struct{}
- ctx context.Context
- ctxCancel context.CancelFunc
+ chainTip uint64
+ cache *utils.NodeSetCache
+ tsigVerifierCache *core.TSigVerifierCache
+ inputChan chan interface{}
+ outputChan chan<- *types.Block
+ pullChan chan<- common.Hash
+ blocks map[types.Position]map[common.Hash]*types.Block
+ agreementResults map[common.Hash]struct{}
+ latestCRSRound uint64
+ pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult
+ pendingBlocks map[uint64]map[common.Hash]*types.Block
+ logger common.Logger
+ confirmedBlocks map[common.Hash]struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc
}
// newAgreement creates a new agreement instance.
-func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash,
- cache *utils.NodeSetCache, logger common.Logger) *agreement {
+func newAgreement(chainTip uint64,
+ ch chan<- *types.Block, pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache, verifier *core.TSigVerifierCache,
+ logger common.Logger) *agreement {
a := &agreement{
- cache: cache,
- inputChan: make(chan interface{}, 1000),
- outputChan: ch,
- pullChan: pullChan,
- blocks: make(map[types.Position]map[common.Hash]*types.Block),
- agreementResults: make(map[common.Hash]struct{}),
- logger: logger,
- pendings: make(
+ chainTip: chainTip,
+ cache: cache,
+ tsigVerifierCache: verifier,
+ inputChan: make(chan interface{}, 1000),
+ outputChan: ch,
+ pullChan: pullChan,
+ blocks: make(map[types.Position]map[common.Hash]*types.Block),
+ agreementResults: make(map[common.Hash]struct{}),
+ logger: logger,
+ pendingAgrs: make(
map[uint64]map[common.Hash]*types.AgreementResult),
+ pendingBlocks: make(
+ map[uint64]map[common.Hash]*types.Block),
confirmedBlocks: make(map[common.Hash]struct{}),
}
a.ctx, a.ctxCancel = context.WithCancel(context.Background())
@@ -76,7 +86,11 @@ func (a *agreement) run() {
}
switch v := val.(type) {
case *types.Block:
- a.processBlock(v)
+ if v.IsFinalized() {
+ a.processFinalizedBlock(v)
+ } else {
+ a.processBlock(v)
+ }
case *types.AgreementResult:
a.processAgreementResult(v)
case uint64:
@@ -100,17 +114,68 @@ func (a *agreement) processBlock(b *types.Block) {
}
}
+func (a *agreement) processFinalizedBlock(block *types.Block) {
+ if block.Position.Round < core.DKGDelayRound {
+ return
+ }
+ // Cache those results that CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[block.Hash]; exists {
+ a.logger.Trace("finalized block already confirmed", "block", block)
+ return
+ }
+ if block.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendingBlocks[block.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.Block)
+ a.pendingBlocks[block.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[block.Hash] = block
+ a.logger.Trace("finalized block cached", "block", block)
+ return
+ }
+ if err := utils.VerifyBlockSignature(block); err != nil {
+ return
+ }
+ verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(block.Position.Round)
+ if err != nil {
+ a.logger.Error("error verifying block randomness",
+ "block", block,
+ "error", err)
+ return
+ }
+ if !ok {
+ a.logger.Error("cannot verify block randomness", "block", block)
+ return
+ }
+ if !verifier.VerifySignature(block.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: block.Finalization.Randomness,
+ }) {
+ a.logger.Error("incorrect block randomness", "block", block)
+ return
+ }
+ a.confirm(block)
+ if block.Position.Height > a.chainTip+1 {
+ if _, exist := a.confirmedBlocks[block.ParentHash]; !exist {
+ a.pullChan <- block.ParentHash
+ }
+ }
+}
+
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
+ if r.Position.Round >= core.DKGDelayRound {
+ return
+ }
// Cache those results that CRS is not ready yet.
if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
a.logger.Trace("Agreement result already confirmed", "result", r)
return
}
if r.Position.Round > a.latestCRSRound {
- pendingsForRound, exists := a.pendings[r.Position.Round]
+ pendingsForRound, exists := a.pendingAgrs[r.Position.Round]
if !exists {
pendingsForRound = make(map[common.Hash]*types.AgreementResult)
- a.pendings[r.Position.Round] = pendingsForRound
+ a.pendingAgrs[r.Position.Round] = pendingsForRound
}
pendingsForRound[r.BlockHash] = r
a.logger.Trace("Agreement result cached", "result", r)
@@ -164,11 +229,11 @@ func (a *agreement) processNewCRS(round uint64) {
a.latestCRSRound = round
// Verify all pending results.
for r := prevRound; r <= a.latestCRSRound; r++ {
- pendingsForRound := a.pendings[r]
+ pendingsForRound := a.pendingAgrs[r]
if pendingsForRound == nil {
continue
}
- delete(a.pendings, r)
+ delete(a.pendingAgrs, r)
for _, res := range pendingsForRound {
if err := core.VerifyAgreementResult(res, a.cache); err != nil {
a.logger.Error("Invalid agreement result",
diff --git a/core/syncer/agreement_test.go b/core/syncer/agreement_test.go
deleted file mode 100644
index 0a12b3c..0000000
--- a/core/syncer/agreement_test.go
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright 2019 The dexon-consensus Authors
-// This file is part of the dexon-consensus library.
-//
-// The dexon-consensus library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version. //
-// The dexon-consensus library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package syncer
-
-import (
- "testing"
- "time"
-
- "github.com/dexon-foundation/dexon-consensus/common"
- "github.com/dexon-foundation/dexon-consensus/core"
- "github.com/dexon-foundation/dexon-consensus/core/crypto"
- "github.com/dexon-foundation/dexon-consensus/core/test"
- "github.com/dexon-foundation/dexon-consensus/core/types"
- "github.com/dexon-foundation/dexon-consensus/core/utils"
- "github.com/stretchr/testify/suite"
-)
-
-type AgreementTestSuite struct {
- suite.Suite
-
- signers []*utils.Signer
- pubKeys []crypto.PublicKey
- prvKeys []crypto.PrivateKey
-}
-
-func (s *AgreementTestSuite) SetupSuite() {
- var err error
- s.prvKeys, s.pubKeys, err = test.NewKeys(4)
- s.Require().NoError(err)
- for _, k := range s.prvKeys {
- s.signers = append(s.signers, utils.NewSigner(k))
- }
-}
-
-func (s *AgreementTestSuite) prepareAgreementResult(pos types.Position,
- hash common.Hash) *types.AgreementResult {
- votes := []types.Vote{}
- for _, signer := range s.signers {
- v := types.NewVote(types.VoteCom, hash, 0)
- v.Position = pos
- s.Require().NoError(signer.SignVote(v))
- votes = append(votes, *v)
- }
- return &types.AgreementResult{
- BlockHash: hash,
- Position: pos,
- Votes: votes,
- }
-}
-
-func (s *AgreementTestSuite) prepareBlock(pos types.Position) *types.Block {
- b := &types.Block{
- Position: pos,
- }
- s.Require().NoError(s.signers[0].SignBlock(b))
- return b
-}
-
-func (s *AgreementTestSuite) TestFutureAgreementResult() {
- // Make sure future types.AgreementResult could be processed correctly
- // when corresponding CRS is ready.
- var (
- futureRound = uint64(7)
- pos = types.Position{Round: 7, Height: 1000}
- )
- gov, err := test.NewGovernance(
- test.NewState(1, s.pubKeys, time.Second, &common.NullLogger{}, true),
- core.ConfigRoundShift,
- )
- s.Require().NoError(err)
- // Make sure goverance is ready for some future round, including CRS.
- gov.CatchUpWithRound(futureRound)
- for i := uint64(2); i <= futureRound; i++ {
- gov.ProposeCRS(i, common.NewRandomHash().Bytes())
- }
- s.Require().NoError(err)
- blockChan := make(chan *types.Block, 10)
- agr := newAgreement(blockChan, make(chan common.Hash, 100),
- utils.NewNodeSetCache(gov), &common.SimpleLogger{})
- go agr.run()
- block := s.prepareBlock(pos)
- result := s.prepareAgreementResult(pos, block.Hash)
- agr.inputChan <- result
- agr.inputChan <- block
- agr.inputChan <- futureRound
- select {
- case confirmedBlock := <-blockChan:
- s.Require().Equal(block.Hash, confirmedBlock.Hash)
- case <-time.After(2 * time.Second):
- s.Require().True(false)
- }
-}
-
-func TestAgreement(t *testing.T) {
- suite.Run(t, new(AgreementTestSuite))
-}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 65068a4..b692b56 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -115,7 +115,12 @@ func NewConsensus(
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
_, con.initChainTipHeight = db.GetCompactionChainTipInfo()
con.agreementModule = newAgreement(
- con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ con.initChainTipHeight,
+ con.receiveChan,
+ con.pullChan,
+ con.nodeSetCache,
+ con.tsigVerifier,
+ con.logger)
con.agreementWaitGroup.Add(1)
go func() {
defer con.agreementWaitGroup.Done()