// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus-core library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus-core library. If not, see
// <http://www.gnu.org/licenses/>.
package syncer
import (
"bytes"
"context"
"fmt"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
// agreement implements the part of the BA (Byzantine Agreement) protocol
// needed in syncer, which only receives agreement results.
//
// All state is mutated from the processing methods invoked by run; no locking
// is performed inside this type.
type agreement struct {
// chainTip is the height supplied at construction. confirm requests the
// parent of any confirmed block whose height is above chainTip+1 and whose
// parent is not confirmed yet.
chainTip uint64
// cache provides the notary set of a round, used to verify agreement
// results.
cache *utils.NodeSetCache
// tsigVerifierCache provides the per-round verifier used to check the
// randomness attached to finalized blocks and agreement results.
tsigVerifierCache *core.TSigVerifierCache
// inputChan carries the values consumed by run: *types.Block,
// *types.AgreementResult and uint64 (a new CRS round).
inputChan chan interface{}
// outputChan receives every block confirmed by this module.
outputChan chan<- *types.Block
// pullChan receives the hashes of blocks this module wants to be pulled.
pullChan chan<- common.Hash
// blocks holds blocks waiting for their agreement result, indexed by
// position and then by block hash.
blocks map[types.Position]map[common.Hash]*types.Block
// agreementResults holds the randomness of verified agreement results whose
// block has not been received yet, keyed by block hash.
agreementResults map[common.Hash][]byte
// latestCRSRound is the latest round reported through processNewCRS.
// Agreement results and finalized blocks of later rounds are cached instead
// of being processed.
latestCRSRound uint64
// pendingAgrs caches agreement results whose round is beyond
// latestCRSRound, indexed by round and then by block hash.
pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult
// pendingBlocks caches finalized blocks whose round is beyond
// latestCRSRound, indexed by round and then by block hash.
pendingBlocks map[uint64]map[common.Hash]*types.Block
logger common.Logger
// confirmedBlocks is the set of hashes of blocks already delivered to
// outputChan.
confirmedBlocks map[common.Hash]struct{}
// ctx is cancelled through ctxCancel when run returns; sends guarded by it
// are abandoned once it is done.
ctx context.Context
ctxCancel context.CancelFunc
}
// newAgreement builds an agreement instance with empty caches.
//
// chainTip is stored as the reference height for parent pulling, ch becomes
// the output channel for confirmed blocks and pullChan the channel for block
// pull requests. cache and verifier are kept for verifying agreement results
// and randomness. The input channel is buffered with room for 1000 values,
// and a fresh cancellable context derived from context.Background is
// attached to the instance.
func newAgreement(chainTip uint64,
	ch chan<- *types.Block, pullChan chan<- common.Hash,
	cache *utils.NodeSetCache, verifier *core.TSigVerifierCache,
	logger common.Logger) *agreement {
	ctx, cancel := context.WithCancel(context.Background())
	return &agreement{
		chainTip:          chainTip,
		cache:             cache,
		tsigVerifierCache: verifier,
		logger:            logger,

		inputChan:  make(chan interface{}, 1000),
		outputChan: ch,
		pullChan:   pullChan,

		blocks:           make(map[types.Position]map[common.Hash]*types.Block),
		agreementResults: make(map[common.Hash][]byte),
		confirmedBlocks:  make(map[common.Hash]struct{}),
		pendingAgrs:      make(map[uint64]map[common.Hash]*types.AgreementResult),
		pendingBlocks:    make(map[uint64]map[common.Hash]*types.Block),

		ctx:       ctx,
		ctxCancel: cancel,
	}
}
// run is the processing loop of the agreement module. It blocks until
// inputChan is closed, so the caller is expected to invoke it in its own
// goroutine. The module context is cancelled when it returns.
func (a *agreement) run() {
	defer a.ctxCancel()
	// The range terminates once inputChan is closed, which is done by network
	// when network ends.
	for msg := range a.inputChan {
		switch v := msg.(type) {
		case *types.Block:
			// Only blocks that already carry randomness and belong to a round
			// at or after DKGDelayRound take the finalized path.
			if v.Position.Round < core.DKGDelayRound || !v.IsFinalized() {
				a.processBlock(v)
				continue
			}
			a.processFinalizedBlock(v)
		case *types.AgreementResult:
			a.processAgreementResult(v)
		case uint64:
			a.processNewCRS(v)
		}
	}
}
// processBlock handles a block that did not take the finalized path in run.
// A block whose agreement result is already known is confirmed right away,
// borrowing the result's randomness when the block has none. Otherwise the
// block is stored until its agreement result shows up.
func (a *agreement) processBlock(b *types.Block) {
	if _, confirmed := a.confirmedBlocks[b.Hash]; confirmed {
		return
	}
	randomness, hasResult := a.agreementResults[b.Hash]
	if !hasResult {
		// No agreement result yet: park the block under its position.
		if _, ok := a.blocks[b.Position]; !ok {
			a.blocks[b.Position] = make(map[common.Hash]*types.Block)
		}
		a.blocks[b.Position][b.Hash] = b
		return
	}
	if len(b.Randomness) == 0 {
		b.Randomness = randomness
	}
	a.confirm(b)
}
// processFinalizedBlock handles a block that already carries randomness.
//
// Blocks that are already confirmed are ignored. Blocks whose round is beyond
// latestCRSRound are cached in pendingBlocks. Any other block is confirmed
// only after both its block signature and its randomness (checked as a "bls"
// signature over the block hash) are verified; every verification failure is
// logged and the block is dropped.
func (a *agreement) processFinalizedBlock(block *types.Block) {
	if _, exists := a.confirmedBlocks[block.Hash]; exists {
		a.logger.Trace("finalized block already confirmed", "block", block)
		return
	}
	// Cache those blocks that CRS is not ready yet.
	if block.Position.Round > a.latestCRSRound {
		pendingsForRound, exists := a.pendingBlocks[block.Position.Round]
		if !exists {
			pendingsForRound = make(map[common.Hash]*types.Block)
			a.pendingBlocks[block.Position.Round] = pendingsForRound
		}
		pendingsForRound[block.Hash] = block
		a.logger.Trace("finalized block cached", "block", block)
		return
	}
	if err := utils.VerifyBlockSignature(block); err != nil {
		// Report the rejection like every other verification failure instead
		// of dropping the block silently.
		a.logger.Error("incorrect block signature",
			"block", block,
			"error", err)
		return
	}
	verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(
		block.Position.Round)
	if err != nil {
		a.logger.Error("error verifying block randomness",
			"block", block,
			"error", err)
		return
	}
	if !ok {
		a.logger.Error("cannot verify block randomness", "block", block)
		return
	}
	if !verifier.VerifySignature(block.Hash, crypto.Signature{
		Type:      "bls",
		Signature: block.Randomness,
	}) {
		a.logger.Error("incorrect block randomness", "block", block)
		return
	}
	a.confirm(block)
}
// processAgreementResult handles an agreement result received from the
// network.
//
// Results for already confirmed blocks are ignored, and results whose round
// is beyond latestCRSRound are cached in pendingAgrs. Any other result is
// verified against the notary set of its round, and its randomness is
// checked: as a "bls" signature over the block hash for rounds at or after
// DKGDelayRound, or against core.NoRand for earlier rounds.
//
// A verified result confirms an empty block directly, or confirms the
// matching block if it was already received. Otherwise the randomness is
// remembered and a pull request for the block is sent, retrying until it is
// accepted or the module context is done.
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
	if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
		a.logger.Trace("Agreement result already confirmed", "result", r)
		return
	}
	// Cache those results that CRS is not ready yet.
	if r.Position.Round > a.latestCRSRound {
		pendingsForRound, exists := a.pendingAgrs[r.Position.Round]
		if !exists {
			pendingsForRound = make(map[common.Hash]*types.AgreementResult)
			a.pendingAgrs[r.Position.Round] = pendingsForRound
		}
		pendingsForRound[r.BlockHash] = r
		a.logger.Trace("Agreement result cached", "result", r)
		return
	}
	notarySet, err := a.cache.GetNotarySet(r.Position.Round)
	if err != nil {
		a.logger.Error("unable to get notary set", "result", r, "error", err)
		return
	}
	if err := core.VerifyAgreementResult(r, notarySet); err != nil {
		a.logger.Error("Agreement result verification failed",
			"result", r,
			"error", err)
		return
	}
	if r.Position.Round >= core.DKGDelayRound {
		verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round)
		if err != nil {
			a.logger.Error("error verifying agreement result randomness",
				"result", r,
				"error", err)
			return
		}
		if !ok {
			a.logger.Error("cannot verify agreement result randomness", "result", r)
			return
		}
		if !verifier.VerifySignature(r.BlockHash, crypto.Signature{
			Type:      "bls",
			Signature: r.Randomness,
		}) {
			a.logger.Error("incorrect agreement result randomness", "result", r)
			return
		}
	} else {
		// Special case for rounds before DKGDelayRound.
		if !bytes.Equal(r.Randomness, core.NoRand) {
			a.logger.Error("incorrect agreement result randomness", "result", r)
			return
		}
	}
	if r.IsEmptyBlock {
		b := &types.Block{
			Position:   r.Position,
			Randomness: r.Randomness,
		}
		// Empty blocks should be confirmed directly, they won't be sent over
		// the wire.
		a.confirm(b)
		return
	}
	if bs, exist := a.blocks[r.Position]; exist {
		if b, exist := bs[r.BlockHash]; exist {
			b.Randomness = r.Randomness
			a.confirm(b)
			return
		}
	}
	// The block is unknown so far: remember its randomness so processBlock
	// can confirm it on arrival, then ask for the block to be pulled.
	a.agreementResults[r.BlockHash] = r.Randomness
loop:
	for {
		select {
		case a.pullChan <- r.BlockHash:
			break loop
		case <-a.ctx.Done():
			a.logger.Error("Pull request is not sent",
				"position", &r.Position,
				"hash", r.BlockHash.String()[:6])
			return
		case <-time.After(500 * time.Millisecond):
			// Keep retrying; the log only reports that the receiver is slow.
			a.logger.Debug("Pull request is unable to send",
				"position", &r.Position,
				"hash", r.BlockHash.String()[:6])
		}
	}
}
// processNewCRS records that the CRS of the given round is available and
// flushes everything that was cached while waiting for it. Rounds that are
// not newer than latestCRSRound are ignored.
//
// For every round from the previous latestCRSRound+1 up to round, the cached
// agreement results and then the cached finalized blocks are removed from
// their caches and re-submitted to the regular processing methods.
func (a *agreement) processNewCRS(round uint64) {
	if round <= a.latestCRSRound {
		return
	}
	prevRound := a.latestCRSRound + 1
	a.latestCRSRound = round
	for r := prevRound; r <= a.latestCRSRound; r++ {
		a.flushPendingAgreementResults(r)
		a.flushPendingBlocks(r)
	}
}

// flushPendingAgreementResults removes the agreement results cached for the
// given round and processes the first one that passes verification.
func (a *agreement) flushPendingAgreementResults(round uint64) {
	pendingsForRound, exists := a.pendingAgrs[round]
	if !exists {
		return
	}
	// Drop the cache entry before anything can fail: the round is never
	// visited again, so an entry left behind would stay in memory forever.
	delete(a.pendingAgrs, round)
	notarySet, err := a.cache.GetNotarySet(round)
	if err != nil {
		a.logger.Error("Unable to get notary set", "round", round, "error", err)
		return
	}
	for _, res := range pendingsForRound {
		if err := core.VerifyAgreementResult(res, notarySet); err != nil {
			a.logger.Error("Invalid agreement result",
				"result", res,
				"error", err)
			continue
		}
		a.logger.Debug("Flush agreement result", "result", res)
		a.processAgreementResult(res)
		// NOTE(review): only one verified result per round is processed and
		// the remaining cached results of the round are discarded. This
		// keeps the existing behavior; confirm whether it is intended.
		break
	}
}

// flushPendingBlocks removes the finalized blocks cached for the given round
// and runs each of them through processFinalizedBlock, which verifies and
// confirms them now that the round no longer exceeds latestCRSRound.
func (a *agreement) flushPendingBlocks(round uint64) {
	pendingsForRound, exists := a.pendingBlocks[round]
	if !exists {
		return
	}
	delete(a.pendingBlocks, round)
	for _, block := range pendingsForRound {
		a.logger.Debug("Flush finalized block", "block", block)
		a.processFinalizedBlock(block)
	}
}
// confirm notifies consensus the confirmation of a block in BA.
//
// The block must carry randomness; confirming a block that is not finalized
// panics. A block that was not confirmed before is delivered to outputChan,
// retrying until the send succeeds or the module context is done, and is then
// recorded in confirmedBlocks. The cached blocks of its position and its
// cached agreement result are dropped beforehand.
//
// When the block is higher than chainTip+1 and its parent is not confirmed
// yet, a pull request for the parent is sent as well.
func (a *agreement) confirm(b *types.Block) {
	if !b.IsFinalized() {
		panic(fmt.Errorf("confirm a block %s without randomness", b))
	}
	if _, exist := a.confirmedBlocks[b.Hash]; !exist {
		delete(a.blocks, b.Position)
		delete(a.agreementResults, b.Hash)
	loop:
		for {
			select {
			case a.outputChan <- b:
				break loop
			case <-a.ctx.Done():
				a.logger.Error("Confirmed block is not sent", "block", b)
				return
			case <-time.After(500 * time.Millisecond):
				a.logger.Debug("Agreement output channel is full", "block", b)
			}
		}
		a.confirmedBlocks[b.Hash] = struct{}{}
	}
	if b.Position.Height <= a.chainTip+1 {
		return
	}
	if _, exist := a.confirmedBlocks[b.ParentHash]; exist {
		return
	}
	// Request the parent with the same cancellation-aware retry used for the
	// other sends, so this call cannot block forever once the context is done.
pull:
	for {
		select {
		case a.pullChan <- b.ParentHash:
			break pull
		case <-a.ctx.Done():
			a.logger.Error("Pull request is not sent",
				"block", b,
				"hash", b.ParentHash.String()[:6])
			return
		case <-time.After(500 * time.Millisecond):
			a.logger.Debug("Pull request is unable to send",
				"block", b,
				"hash", b.ParentHash.String()[:6])
		}
	}
}