// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus-core library. // // The dexon-consensus-core library is free software: you can redistribute it // and/or modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, // or (at your option) any later version. // // The dexon-consensus-core library is distributed in the hope that it will be // useful, but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser // General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the dexon-consensus-core library. If not, see // . package syncer import ( "bytes" "context" "fmt" "time" "github.com/byzantine-lab/dexon-consensus/common" "github.com/byzantine-lab/dexon-consensus/core" "github.com/byzantine-lab/dexon-consensus/core/crypto" "github.com/byzantine-lab/dexon-consensus/core/types" "github.com/byzantine-lab/dexon-consensus/core/utils" ) // Struct agreement implements struct of BA (Byzantine Agreement) protocol // needed in syncer, which only receives agreement results. type agreement struct { chainTip uint64 cache *utils.NodeSetCache tsigVerifierCache *core.TSigVerifierCache inputChan chan interface{} outputChan chan<- *types.Block pullChan chan<- common.Hash blocks map[types.Position]map[common.Hash]*types.Block agreementResults map[common.Hash][]byte latestCRSRound uint64 pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult pendingBlocks map[uint64]map[common.Hash]*types.Block logger common.Logger confirmedBlocks map[common.Hash]struct{} ctx context.Context ctxCancel context.CancelFunc } // newAgreement creates a new agreement instance. func newAgreement(chainTip uint64, ch chan<- *types.Block, pullChan chan<- common.Hash, cache *utils.NodeSetCache, verifier *core.TSigVerifierCache, logger common.Logger) *agreement { a := &agreement{ chainTip: chainTip, cache: cache, tsigVerifierCache: verifier, inputChan: make(chan interface{}, 1000), outputChan: ch, pullChan: pullChan, blocks: make(map[types.Position]map[common.Hash]*types.Block), agreementResults: make(map[common.Hash][]byte), logger: logger, pendingAgrs: make( map[uint64]map[common.Hash]*types.AgreementResult), pendingBlocks: make( map[uint64]map[common.Hash]*types.Block), confirmedBlocks: make(map[common.Hash]struct{}), } a.ctx, a.ctxCancel = context.WithCancel(context.Background()) return a } // run starts the agreement, this does not start a new routine, go a new // routine explicitly in the caller. func (a *agreement) run() { defer a.ctxCancel() for { select { case val, ok := <-a.inputChan: if !ok { // InputChan is closed by network when network ends. return } switch v := val.(type) { case *types.Block: if v.Position.Round >= core.DKGDelayRound && v.IsFinalized() { a.processFinalizedBlock(v) } else { a.processBlock(v) } case *types.AgreementResult: a.processAgreementResult(v) case uint64: a.processNewCRS(v) } } } } func (a *agreement) processBlock(b *types.Block) { if _, exist := a.confirmedBlocks[b.Hash]; exist { return } if rand, exist := a.agreementResults[b.Hash]; exist { if len(b.Randomness) == 0 { b.Randomness = rand } a.confirm(b) } else { if _, exist := a.blocks[b.Position]; !exist { a.blocks[b.Position] = make(map[common.Hash]*types.Block) } a.blocks[b.Position][b.Hash] = b } } func (a *agreement) processFinalizedBlock(block *types.Block) { // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[block.Hash]; exists { a.logger.Trace("finalized block already confirmed", "block", block) return } if block.Position.Round > a.latestCRSRound { pendingsForRound, exists := a.pendingBlocks[block.Position.Round] if !exists { pendingsForRound = make(map[common.Hash]*types.Block) a.pendingBlocks[block.Position.Round] = pendingsForRound } pendingsForRound[block.Hash] = block a.logger.Trace("finalized block cached", "block", block) return } if err := utils.VerifyBlockSignature(block); err != nil { return } verifier, ok, err := a.tsigVerifierCache.UpdateAndGet( block.Position.Round) if err != nil { a.logger.Error("error verifying block randomness", "block", block, "error", err) return } if !ok { a.logger.Error("cannot verify block randomness", "block", block) return } if !verifier.VerifySignature(block.Hash, crypto.Signature{ Type: "bls", Signature: block.Randomness, }) { a.logger.Error("incorrect block randomness", "block", block) return } a.confirm(block) } func (a *agreement) processAgreementResult(r *types.AgreementResult) { // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[r.BlockHash]; exists { a.logger.Trace("Agreement result already confirmed", "result", r) return } if r.Position.Round > a.latestCRSRound { pendingsForRound, exists := a.pendingAgrs[r.Position.Round] if !exists { pendingsForRound = make(map[common.Hash]*types.AgreementResult) a.pendingAgrs[r.Position.Round] = pendingsForRound } pendingsForRound[r.BlockHash] = r a.logger.Trace("Agreement result cached", "result", r) return } if err := core.VerifyAgreementResult(r, a.cache); err != nil { a.logger.Error("Agreement result verification failed", "result", r, "error", err) return } if r.Position.Round >= core.DKGDelayRound { verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round) if err != nil { a.logger.Error("error verifying agreement result randomness", "result", r, "error", err) return } if !ok { a.logger.Error("cannot verify agreement result randomness", "result", r) return } if !verifier.VerifySignature(r.BlockHash, crypto.Signature{ Type: "bls", Signature: r.Randomness, }) { a.logger.Error("incorrect agreement result randomness", "result", r) return } } else { // Special case for rounds before DKGDelayRound. if bytes.Compare(r.Randomness, core.NoRand) != 0 { a.logger.Error("incorrect agreement result randomness", "result", r) return } } if r.IsEmptyBlock { b := &types.Block{ Position: r.Position, Randomness: r.Randomness, } // Empty blocks should be confirmed directly, they won't be sent over // the wire. a.confirm(b) return } if bs, exist := a.blocks[r.Position]; exist { if b, exist := bs[r.BlockHash]; exist { b.Randomness = r.Randomness a.confirm(b) return } } a.agreementResults[r.BlockHash] = r.Randomness loop: for { select { case a.pullChan <- r.BlockHash: break loop case <-a.ctx.Done(): a.logger.Error("Pull request is not sent", "position", &r.Position, "hash", r.BlockHash.String()[:6]) return case <-time.After(500 * time.Millisecond): a.logger.Debug("Pull request is unable to send", "position", &r.Position, "hash", r.BlockHash.String()[:6]) } } } func (a *agreement) processNewCRS(round uint64) { if round <= a.latestCRSRound { return } prevRound := a.latestCRSRound + 1 a.latestCRSRound = round // Verify all pending results. for r := prevRound; r <= a.latestCRSRound; r++ { pendingsForRound := a.pendingAgrs[r] if pendingsForRound == nil { continue } delete(a.pendingAgrs, r) for _, res := range pendingsForRound { if err := core.VerifyAgreementResult(res, a.cache); err != nil { a.logger.Error("Invalid agreement result", "result", res, "error", err) continue } a.logger.Error("Flush agreement result", "result", res) a.processAgreementResult(res) break } } } // confirm notifies consensus the confirmation of a block in BA. func (a *agreement) confirm(b *types.Block) { if !b.IsFinalized() { panic(fmt.Errorf("confirm a block %s without randomness", b)) } if _, exist := a.confirmedBlocks[b.Hash]; !exist { delete(a.blocks, b.Position) delete(a.agreementResults, b.Hash) loop: for { select { case a.outputChan <- b: break loop case <-a.ctx.Done(): a.logger.Error("Confirmed block is not sent", "block", b) return case <-time.After(500 * time.Millisecond): a.logger.Debug("Agreement output channel is full", "block", b) } } a.confirmedBlocks[b.Hash] = struct{}{} } if b.Position.Height > a.chainTip+1 { if _, exist := a.confirmedBlocks[b.ParentHash]; !exist { a.pullChan <- b.ParentHash } } }