Diffstat (limited to 'vendor/github.com/byzantine-lab/dexon-consensus/core/syncer')
-rw-r--r--  vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/agreement.go | 301
-rw-r--r--  vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/consensus.go | 543
-rw-r--r--  vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/watch-cat.go | 156
3 files changed, 1000 insertions, 0 deletions
diff --git a/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/agreement.go
new file mode 100644
index 000000000..274cbfc79
--- /dev/null
+++ b/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/agreement.go
@@ -0,0 +1,301 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/byzantine-lab/dexon-consensus/common"
+ "github.com/byzantine-lab/dexon-consensus/core"
+ "github.com/byzantine-lab/dexon-consensus/core/crypto"
+ "github.com/byzantine-lab/dexon-consensus/core/types"
+ "github.com/byzantine-lab/dexon-consensus/core/utils"
+)
+
+// agreement implements the part of the BA (Byzantine Agreement) protocol
+// needed by the syncer, which only receives agreement results.
+type agreement struct {
+ chainTip uint64
+ cache *utils.NodeSetCache
+ tsigVerifierCache *core.TSigVerifierCache
+ inputChan chan interface{}
+ outputChan chan<- *types.Block
+ pullChan chan<- common.Hash
+ blocks map[types.Position]map[common.Hash]*types.Block
+ agreementResults map[common.Hash][]byte
+ latestCRSRound uint64
+ pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult
+ pendingBlocks map[uint64]map[common.Hash]*types.Block
+ logger common.Logger
+ confirmedBlocks map[common.Hash]struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc
+}
+
+// newAgreement creates a new agreement instance.
+func newAgreement(chainTip uint64,
+ ch chan<- *types.Block, pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache, verifier *core.TSigVerifierCache,
+ logger common.Logger) *agreement {
+ a := &agreement{
+ chainTip: chainTip,
+ cache: cache,
+ tsigVerifierCache: verifier,
+ inputChan: make(chan interface{}, 1000),
+ outputChan: ch,
+ pullChan: pullChan,
+ blocks: make(map[types.Position]map[common.Hash]*types.Block),
+ agreementResults: make(map[common.Hash][]byte),
+ logger: logger,
+ pendingAgrs: make(
+ map[uint64]map[common.Hash]*types.AgreementResult),
+ pendingBlocks: make(
+ map[uint64]map[common.Hash]*types.Block),
+ confirmedBlocks: make(map[common.Hash]struct{}),
+ }
+ a.ctx, a.ctxCancel = context.WithCancel(context.Background())
+ return a
+}
+
+// run starts the agreement. It does not spawn a goroutine by itself; the
+// caller should launch one explicitly.
+func (a *agreement) run() {
+ defer a.ctxCancel()
+ for {
+ select {
+ case val, ok := <-a.inputChan:
+ if !ok {
+ // inputChan is closed by the network module when it shuts down.
+ return
+ }
+ switch v := val.(type) {
+ case *types.Block:
+ if v.Position.Round >= core.DKGDelayRound && v.IsFinalized() {
+ a.processFinalizedBlock(v)
+ } else {
+ a.processBlock(v)
+ }
+ case *types.AgreementResult:
+ a.processAgreementResult(v)
+ case uint64:
+ a.processNewCRS(v)
+ }
+ }
+ }
+}
+
+func (a *agreement) processBlock(b *types.Block) {
+ if _, exist := a.confirmedBlocks[b.Hash]; exist {
+ return
+ }
+ if rand, exist := a.agreementResults[b.Hash]; exist {
+ if len(b.Randomness) == 0 {
+ b.Randomness = rand
+ }
+ a.confirm(b)
+ } else {
+ if _, exist := a.blocks[b.Position]; !exist {
+ a.blocks[b.Position] = make(map[common.Hash]*types.Block)
+ }
+ a.blocks[b.Position][b.Hash] = b
+ }
+}
+
+func (a *agreement) processFinalizedBlock(block *types.Block) {
+ // Cache finalized blocks whose round's CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[block.Hash]; exists {
+ a.logger.Trace("finalized block already confirmed", "block", block)
+ return
+ }
+ if block.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendingBlocks[block.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.Block)
+ a.pendingBlocks[block.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[block.Hash] = block
+ a.logger.Trace("finalized block cached", "block", block)
+ return
+ }
+ if err := utils.VerifyBlockSignature(block); err != nil {
+ return
+ }
+ verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(
+ block.Position.Round)
+ if err != nil {
+ a.logger.Error("error verifying block randomness",
+ "block", block,
+ "error", err)
+ return
+ }
+ if !ok {
+ a.logger.Error("cannot verify block randomness", "block", block)
+ return
+ }
+ if !verifier.VerifySignature(block.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: block.Randomness,
+ }) {
+ a.logger.Error("incorrect block randomness", "block", block)
+ return
+ }
+ a.confirm(block)
+}
+
+func (a *agreement) processAgreementResult(r *types.AgreementResult) {
+ // Cache agreement results whose round's CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
+ a.logger.Trace("Agreement result already confirmed", "result", r)
+ return
+ }
+ if r.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendingAgrs[r.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.AgreementResult)
+ a.pendingAgrs[r.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[r.BlockHash] = r
+ a.logger.Trace("Agreement result cached", "result", r)
+ return
+ }
+ if err := core.VerifyAgreementResult(r, a.cache); err != nil {
+ a.logger.Error("Agreement result verification failed",
+ "result", r,
+ "error", err)
+ return
+ }
+ if r.Position.Round >= core.DKGDelayRound {
+ verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round)
+ if err != nil {
+ a.logger.Error("error verifying agreement result randomness",
+ "result", r,
+ "error", err)
+ return
+ }
+ if !ok {
+ a.logger.Error("cannot verify agreement result randomness", "result", r)
+ return
+ }
+ if !verifier.VerifySignature(r.BlockHash, crypto.Signature{
+ Type: "bls",
+ Signature: r.Randomness,
+ }) {
+ a.logger.Error("incorrect agreement result randomness", "result", r)
+ return
+ }
+ } else {
+ // Special case for rounds before DKGDelayRound.
+ if !bytes.Equal(r.Randomness, core.NoRand) {
+ a.logger.Error("incorrect agreement result randomness", "result", r)
+ return
+ }
+ }
+ if r.IsEmptyBlock {
+ b := &types.Block{
+ Position: r.Position,
+ Randomness: r.Randomness,
+ }
+ // Empty blocks are confirmed directly; they won't be sent over
+ // the wire.
+ a.confirm(b)
+ return
+ }
+ if bs, exist := a.blocks[r.Position]; exist {
+ if b, exist := bs[r.BlockHash]; exist {
+ b.Randomness = r.Randomness
+ a.confirm(b)
+ return
+ }
+ }
+ a.agreementResults[r.BlockHash] = r.Randomness
+loop:
+ for {
+ select {
+ case a.pullChan <- r.BlockHash:
+ break loop
+ case <-a.ctx.Done():
+ a.logger.Error("Pull request is not sent",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
+ return
+ case <-time.After(500 * time.Millisecond):
+ a.logger.Debug("Pull request is unable to send",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
+ }
+ }
+}
+
+func (a *agreement) processNewCRS(round uint64) {
+ if round <= a.latestCRSRound {
+ return
+ }
+ startRound := a.latestCRSRound + 1
+ a.latestCRSRound = round
+ // Verify all pending results.
+ for r := startRound; r <= a.latestCRSRound; r++ {
+ pendingsForRound := a.pendingAgrs[r]
+ if pendingsForRound == nil {
+ continue
+ }
+ delete(a.pendingAgrs, r)
+ for _, res := range pendingsForRound {
+ if err := core.VerifyAgreementResult(res, a.cache); err != nil {
+ a.logger.Error("Invalid agreement result",
+ "result", res,
+ "error", err)
+ continue
+ }
+ a.logger.Error("Flush agreement result", "result", res)
+ a.processAgreementResult(res)
+ }
+ }
+}
+
+// confirm notifies the consensus core of a block confirmed by BA.
+func (a *agreement) confirm(b *types.Block) {
+ if !b.IsFinalized() {
+ panic(fmt.Errorf("confirm a block %s without randomness", b))
+ }
+ if _, exist := a.confirmedBlocks[b.Hash]; !exist {
+ delete(a.blocks, b.Position)
+ delete(a.agreementResults, b.Hash)
+ loop:
+ for {
+ select {
+ case a.outputChan <- b:
+ break loop
+ case <-a.ctx.Done():
+ a.logger.Error("Confirmed block is not sent", "block", b)
+ return
+ case <-time.After(500 * time.Millisecond):
+ a.logger.Debug("Agreement output channel is full", "block", b)
+ }
+ }
+ a.confirmedBlocks[b.Hash] = struct{}{}
+ }
+ if b.Position.Height > a.chainTip+1 {
+ if _, exist := a.confirmedBlocks[b.ParentHash]; !exist {
+ a.pullChan <- b.ParentHash
+ }
+ }
+}
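
A minimal sketch of how this agreement module is driven from inside the syncer package. This is annotation only, not part of the diff: the result parameter is a hypothetical input from the network, and a real caller would react to the pull channel by fetching blocks; all other names come from the code above.

func exampleDriveAgreement(
    initHeight uint64,
    cache *utils.NodeSetCache,
    verifier *core.TSigVerifierCache,
    logger common.Logger,
    result *types.AgreementResult, // hypothetical input from the network
) *types.Block {
    out := make(chan *types.Block, 16)
    pull := make(chan common.Hash, 16)
    agr := newAgreement(initHeight, out, pull, cache, verifier, logger)
    go agr.run() // run does not spawn a goroutine by itself.
    defer close(agr.inputChan)

    agr.inputChan <- result.Position.Round // a bare uint64 announces a new CRS round
    agr.inputChan <- result                // feed the agreement result itself
    select {
    case h := <-pull:
        // The agreement module asks us to pull the full block; a real
        // caller would invoke network.PullBlocks(common.Hashes{h}) here.
        _ = h
    case b := <-out:
        // The block was already known, so it is confirmed directly.
        return b
    }
    return nil
}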
diff --git a/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/consensus.go
new file mode 100644
index 000000000..d12dc4863
--- /dev/null
+++ b/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/consensus.go
@@ -0,0 +1,543 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/byzantine-lab/dexon-consensus/common"
+ "github.com/byzantine-lab/dexon-consensus/core"
+ "github.com/byzantine-lab/dexon-consensus/core/crypto"
+ "github.com/byzantine-lab/dexon-consensus/core/db"
+ "github.com/byzantine-lab/dexon-consensus/core/types"
+ "github.com/byzantine-lab/dexon-consensus/core/utils"
+)
+
+var (
+ // ErrAlreadySynced is reported when the syncer is already synced.
+ ErrAlreadySynced = fmt.Errorf("already synced")
+ // ErrNotSynced is reported when the syncer is not synced yet.
+ ErrNotSynced = fmt.Errorf("not synced yet")
+ // ErrGenesisBlockReached is reported when the genesis block is reached.
+ ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
+ // blocks.
+ ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
+ // ErrInvalidSyncingHeight is reported when the blocks to sync do not
+ // follow the compaction chain tip in the database.
+ ErrInvalidSyncingHeight = fmt.Errorf("invalid syncing height")
+)
+
+// Consensus is for syncing the consensus module.
+type Consensus struct {
+ db db.Database
+ gov core.Governance
+ dMoment time.Time
+ logger common.Logger
+ app core.Application
+ prv crypto.PrivateKey
+ network core.Network
+ nodeSetCache *utils.NodeSetCache
+ tsigVerifier *core.TSigVerifierCache
+
+ blocks types.BlocksByPosition
+ agreementModule *agreement
+ agreementRoundCut uint64
+ heightEvt *common.Event
+ roundEvt *utils.RoundEvent
+
+ // lock for accessing all fields.
+ lock sync.RWMutex
+ duringBuffering bool
+ latestCRSRound uint64
+ waitGroup sync.WaitGroup
+ agreementWaitGroup sync.WaitGroup
+ pullChan chan common.Hash
+ receiveChan chan *types.Block
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ syncedLastBlock *types.Block
+ syncedConsensus *core.Consensus
+ syncedSkipNext bool
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []types.Msg
+ initChainTipHeight uint64
+}
+
+// NewConsensus creates an instance of Consensus (the syncer consensus).
+func NewConsensus(
+ initHeight uint64,
+ dMoment time.Time,
+ app core.Application,
+ gov core.Governance,
+ db db.Database,
+ network core.Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+
+ con := &Consensus{
+ dMoment: dMoment,
+ app: app,
+ gov: gov,
+ db: db,
+ network: network,
+ nodeSetCache: utils.NewNodeSetCache(gov),
+ tsigVerifier: core.NewTSigVerifierCache(gov, 7),
+ prv: prv,
+ logger: logger,
+ receiveChan: make(chan *types.Block, 1000),
+ pullChan: make(chan common.Hash, 1000),
+ heightEvt: common.NewEvent(),
+ }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ _, con.initChainTipHeight = db.GetCompactionChainTipInfo()
+ con.agreementModule = newAgreement(
+ con.initChainTipHeight,
+ con.receiveChan,
+ con.pullChan,
+ con.nodeSetCache,
+ con.tsigVerifier,
+ con.logger)
+ con.agreementWaitGroup.Add(1)
+ go func() {
+ defer con.agreementWaitGroup.Done()
+ con.agreementModule.run()
+ }()
+ if err := con.deliverPendingBlocks(initHeight); err != nil {
+ panic(err)
+ }
+ return con
+}
+
+func (con *Consensus) deliverPendingBlocks(height uint64) error {
+ if height >= con.initChainTipHeight {
+ return nil
+ }
+ blocks := make([]*types.Block, 0, con.initChainTipHeight-height)
+ hash, _ := con.db.GetCompactionChainTipInfo()
+ for {
+ block, err := con.db.GetBlock(hash)
+ if err != nil {
+ return err
+ }
+ if block.Position.Height == height {
+ break
+ }
+ blocks = append(blocks, &block)
+ hash = block.ParentHash
+ }
+ sort.Sort(types.BlocksByPosition(blocks))
+ for _, b := range blocks {
+ con.logger.Debug("Syncer BlockConfirmed", "block", b)
+ con.app.BlockConfirmed(*b)
+ con.logger.Debug("Syncer BlockDelivered", "block", b)
+ con.app.BlockDelivered(b.Hash, b.Position, b.Randomness)
+ }
+ return nil
+}
+
+func (con *Consensus) assureBuffering() {
+ if func() bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ return con.duringBuffering
+ }() {
+ return
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if con.duringBuffering {
+ return
+ }
+ con.duringBuffering = true
+ // Get latest block to prepare utils.RoundEvent.
+ var (
+ err error
+ blockHash, height = con.db.GetCompactionChainTipInfo()
+ )
+ if height == 0 {
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger,
+ types.Position{}, core.ConfigRoundShift)
+ } else {
+ var b types.Block
+ if b, err = con.db.GetBlock(blockHash); err == nil {
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov,
+ con.logger, b.Position, core.ConfigRoundShift)
+ }
+ }
+ if err != nil {
+ panic(err)
+ }
+ // Make sure con.roundEvt is stopped before stopping con.agreementModule.
+ con.waitGroup.Add(1)
+ // Register a round event handler to purge the node set cache; this
+ // handler should have the highest priority.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ for _, e := range evts {
+ if e.Reset == 0 {
+ continue
+ }
+ con.nodeSetCache.Purge(e.Round + 1)
+ con.tsigVerifier.Purge(e.Round + 1)
+ }
+ })
+ // Register a round event handler to notify agreementModule of new CRS
+ // rounds.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ con.waitGroup.Add(1)
+ go func() {
+ defer con.waitGroup.Done()
+ for _, e := range evts {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ for func() bool {
+ select {
+ case <-con.ctx.Done():
+ return false
+ case con.agreementModule.inputChan <- e.Round:
+ return false
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Warn(
+ "Agreement input channel is full when notifying new round",
+ "round", e.Round,
+ )
+ return true
+ }
+ }() {
+ }
+ }
+ }()
+ })
+ // Register a round event handler to validate next round.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ con.heightEvt.RegisterHeight(
+ evts[len(evts)-1].NextRoundValidationHeight(),
+ utils.RoundEventRetryHandlerGenerator(con.roundEvt, con.heightEvt),
+ )
+ })
+ con.roundEvt.TriggerInitEvent()
+ con.startAgreement()
+ con.startNetwork()
+}
+
+func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ defer func() {
+ con.logger.Debug("Syncer synced status",
+ "last-block", blocks[len(blocks)-1],
+ "synced", synced,
+ )
+ }()
+ if len(con.blocks) == 0 || len(blocks) == 0 {
+ return
+ }
+ synced = !blocks[len(blocks)-1].Position.Older(con.blocks[0].Position)
+ return
+}
+
+func (con *Consensus) buildAllEmptyBlocks() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // Drop empty blocks at the chain tip.
+ for len(con.blocks) > 0 && con.isEmptyBlock(con.blocks[0]) {
+ con.blocks = con.blocks[1:]
+ }
+ // Build empty blocks.
+ for i, b := range con.blocks {
+ if con.isEmptyBlock(b) {
+ if con.blocks[i-1].Position.Height+1 == b.Position.Height {
+ con.buildEmptyBlock(b, con.blocks[i-1])
+ }
+ }
+ }
+}
+
+// ForceSync forces syncer to become synced.
+func (con *Consensus) ForceSync(lastPos types.Position, skip bool) {
+ if con.syncedLastBlock != nil {
+ return
+ }
+ hash, height := con.db.GetCompactionChainTipInfo()
+ if height < lastPos.Height {
+ panic(fmt.Errorf("compaction chain not synced height %d, tip %d",
+ lastPos.Height, height))
+ } else if height > lastPos.Height {
+ skip = false
+ }
+ block, err := con.db.GetBlock(hash)
+ if err != nil {
+ panic(err)
+ }
+ con.syncedLastBlock = &block
+ con.stopBuffering()
+ // We might call stopBuffering without calling assureBuffering.
+ if con.dummyCancel == nil {
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg types.Msg) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ }
+ con.syncedSkipNext = skip
+ con.logger.Info("Force Sync", "block", &block, "skip", skip)
+}
+
+// SyncBlocks syncs blocks from the compaction chain; latest is true if the
+// caller regards the blocks as the latest ones. Note that latest can be true
+// multiple times.
+// NOTICE: parameter "blocks" should be consecutive in compaction height.
+// NOTICE: this method is not expected to be called concurrently.
+func (con *Consensus) SyncBlocks(
+ blocks []*types.Block, latest bool) (synced bool, err error) {
+ defer func() {
+ con.logger.Debug("SyncBlocks returned",
+ "synced", synced,
+ "error", err,
+ "last-block", con.syncedLastBlock,
+ )
+ }()
+ if con.syncedLastBlock != nil {
+ synced, err = true, ErrAlreadySynced
+ return
+ }
+ if len(blocks) == 0 {
+ return
+ }
+ // Check if blocks are consecutive.
+ for i := 1; i < len(blocks); i++ {
+ if blocks[i].Position.Height != blocks[i-1].Position.Height+1 {
+ err = ErrInvalidBlockOrder
+ return
+ }
+ }
+ // Make sure the first block is the next block after the current
+ // compaction chain tip in the DB.
+ _, tipHeight := con.db.GetCompactionChainTipInfo()
+ if blocks[0].Position.Height != tipHeight+1 {
+ con.logger.Error("Mismatched block height",
+ "now", blocks[0].Position.Height,
+ "expected", tipHeight+1,
+ )
+ err = ErrInvalidSyncingHeight
+ return
+ }
+ con.logger.Trace("SyncBlocks",
+ "position", &blocks[0].Position,
+ "len", len(blocks),
+ "latest", latest,
+ )
+ for _, b := range blocks {
+ if err = con.db.PutBlock(*b); err != nil {
+ // A block might already be in the db if it was confirmed by BA but
+ // not finalized yet.
+ if err == db.ErrBlockExists {
+ err = con.db.UpdateBlock(*b)
+ }
+ if err != nil {
+ return
+ }
+ }
+ if err = con.db.PutCompactionChainTipInfo(
+ b.Hash, b.Position.Height); err != nil {
+ return
+ }
+ con.heightEvt.NotifyHeight(b.Position.Height)
+ }
+ if latest {
+ con.assureBuffering()
+ con.buildAllEmptyBlocks()
+ // Check whether the compaction chain and the agreement module's
+ // blocks overlap. An overlap between the compaction chain and BA's
+ // oldest blocks means the syncing is done.
+ if con.checkIfSynced(blocks) {
+ con.stopBuffering()
+ con.syncedLastBlock = blocks[len(blocks)-1]
+ synced = true
+ }
+ }
+ return
+}
+
+// GetSyncedConsensus returns the core.Consensus instance after synced.
+func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if con.syncedConsensus != nil {
+ return con.syncedConsensus, nil
+ }
+ if con.syncedLastBlock == nil {
+ return nil, ErrNotSynced
+ }
+ // Flush all blocks in con.blocks into core.Consensus and build a
+ // core.Consensus instance from the syncer.
+ con.dummyCancel()
+ <-con.dummyFinished
+ var err error
+ con.syncedConsensus, err = core.NewConsensusFromSyncer(
+ con.syncedLastBlock,
+ con.syncedSkipNext,
+ con.dMoment,
+ con.app,
+ con.gov,
+ con.db,
+ con.network,
+ con.prv,
+ con.blocks,
+ con.dummyMsgBuffer,
+ con.logger)
+ return con.syncedConsensus, err
+}
+
+// stopBuffering stops the syncer buffering routines.
+//
+// This method is mainly for the caller to stop the syncer before it is
+// synced; the syncer calls it automatically after being synced.
+func (con *Consensus) stopBuffering() {
+ if func() (notBuffering bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ notBuffering = !con.duringBuffering
+ return
+ }() {
+ return
+ }
+ if func() (alreadyCanceled bool) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if !con.duringBuffering {
+ alreadyCanceled = true
+ return
+ }
+ con.duringBuffering = false
+ con.logger.Trace("Syncer is about to stop")
+ // Stop network and CRS routines, wait until they are all stopped.
+ con.ctxCancel()
+ return
+ }() {
+ return
+ }
+ con.logger.Trace("Stop syncer modules")
+ con.roundEvt.Stop()
+ con.waitGroup.Done()
+ // Wait until all routines that depend on con.agreementModule are stopped.
+ con.waitGroup.Wait()
+ // Since no one is listening to the full node's receive channel anymore,
+ // we need to launch a dummy receiver right away.
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg types.Msg) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ // Stop agreements.
+ con.logger.Trace("Stop syncer agreement modules")
+ con.stopAgreement()
+ con.logger.Trace("Syncer stopped")
+ return
+}
+
+// isEmptyBlock checks if a block is an empty block: both its hash and its
+// parent hash are empty.
+func (con *Consensus) isEmptyBlock(b *types.Block) bool {
+ return b.Hash == common.Hash{} && b.ParentHash == common.Hash{}
+}
+
+// buildEmptyBlock builds an empty block in agreement.
+func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) {
+ cfg := utils.GetConfigWithPanic(con.gov, b.Position.Round, con.logger)
+ b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval)
+ b.Witness.Height = parent.Witness.Height
+ b.Witness.Data = make([]byte, len(parent.Witness.Data))
+ copy(b.Witness.Data, parent.Witness.Data)
+}
+
+// startAgreement starts the routines receiving confirmed blocks and pull
+// requests from the agreement module.
+func (con *Consensus) startAgreement() {
+ // Start a routine listening on the receive channel and the block pull
+ // channel.
+ go func() {
+ for {
+ select {
+ case b, ok := <-con.receiveChan:
+ if !ok {
+ return
+ }
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if len(con.blocks) > 0 &&
+ !b.Position.Newer(con.blocks[0].Position) {
+ return
+ }
+ con.blocks = append(con.blocks, b)
+ sort.Sort(con.blocks)
+ }()
+ case h, ok := <-con.pullChan:
+ if !ok {
+ return
+ }
+ con.network.PullBlocks(common.Hashes{h})
+ }
+ }
+ }()
+}
+
+// startNetwork starts a routine receiving blocks and agreement results from
+// the network.
+func (con *Consensus) startNetwork() {
+ con.waitGroup.Add(1)
+ go func() {
+ defer con.waitGroup.Done()
+ loop:
+ for {
+ select {
+ case val := <-con.network.ReceiveChan():
+ switch v := val.Payload.(type) {
+ case *types.Block:
+ case *types.AgreementResult:
+ // Guard against Byzantine nodes that attack by broadcasting older
+ // agreement results; otherwise normal nodes might report 'synced'
+ // while still falling behind other nodes.
+ if v.Position.Height <= con.initChainTipHeight {
+ continue loop
+ }
+ default:
+ continue loop
+ }
+ con.agreementModule.inputChan <- val.Payload
+ case <-con.ctx.Done():
+ break loop
+ }
+ }
+ }()
+}
+
+func (con *Consensus) stopAgreement() {
+ if con.agreementModule.inputChan != nil {
+ close(con.agreementModule.inputChan)
+ }
+ con.agreementWaitGroup.Wait()
+ con.agreementModule.inputChan = nil
+ close(con.receiveChan)
+ close(con.pullChan)
+}
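
For context, a hedged sketch of the call sequence implied by the exported API above: create the syncer, feed it consecutive finalized blocks until SyncBlocks reports synced, then swap in the real consensus core. This is annotation only, not part of the diff; fetchNextBatch is a hypothetical stand-in for the caller's own source of consecutive, height-ordered blocks.

func runSyncer(
    con *Consensus, // built with NewConsensus(...)
    fetchNextBatch func() (blocks []*types.Block, latest bool),
) (*core.Consensus, error) {
    for {
        blocks, latest := fetchNextBatch()
        synced, err := con.SyncBlocks(blocks, latest)
        if err != nil && err != ErrAlreadySynced {
            // e.g. ErrInvalidBlockOrder or ErrInvalidSyncingHeight.
            return nil, err
        }
        if synced {
            break
        }
    }
    // Valid only after SyncBlocks reported synced (or after ForceSync);
    // otherwise GetSyncedConsensus returns ErrNotSynced.
    return con.GetSyncedConsensus()
}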
diff --git a/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/watch-cat.go b/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/watch-cat.go
new file mode 100644
index 000000000..e5ba911a7
--- /dev/null
+++ b/vendor/github.com/byzantine-lab/dexon-consensus/core/syncer/watch-cat.go
@@ -0,0 +1,156 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "time"
+
+ "github.com/byzantine-lab/dexon-consensus/common"
+ "github.com/byzantine-lab/dexon-consensus/core"
+ "github.com/byzantine-lab/dexon-consensus/core/types"
+ "github.com/byzantine-lab/dexon-consensus/core/utils"
+)
+
+type configReader interface {
+ Configuration(round uint64) *types.Config
+}
+
+// WatchCat is responsible for signaling whether the syncer should be
+// terminated.
+type WatchCat struct {
+ recovery core.Recovery
+ timeout time.Duration
+ configReader configReader
+ feed chan types.Position
+ lastPosition types.Position
+ polling time.Duration
+ ctx context.Context
+ cancel context.CancelFunc
+ logger common.Logger
+}
+
+// NewWatchCat creates a new WatchCat 🐱 object.
+func NewWatchCat(
+ recovery core.Recovery,
+ configReader configReader,
+ polling time.Duration,
+ timeout time.Duration,
+ logger common.Logger) *WatchCat {
+ wc := &WatchCat{
+ recovery: recovery,
+ timeout: timeout,
+ configReader: configReader,
+ feed: make(chan types.Position),
+ polling: polling,
+ logger: logger,
+ }
+ return wc
+}
+
+// Feed the WatchCat so it won't produce the termination signal.
+func (wc *WatchCat) Feed(position types.Position) {
+ wc.feed <- position
+}
+
+// Start the WatchCat.
+func (wc *WatchCat) Start() {
+ wc.Stop()
+ wc.lastPosition = types.Position{}
+ wc.ctx, wc.cancel = context.WithCancel(context.Background())
+ go func() {
+ var lastPos types.Position
+ MonitorLoop:
+ for {
+ select {
+ case <-wc.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-wc.ctx.Done():
+ return
+ case pos := <-wc.feed:
+ if !pos.Newer(lastPos) {
+ wc.logger.Warn("Feed with older height",
+ "pos", pos, "lastPos", lastPos)
+ continue
+ }
+ lastPos = pos
+ case <-time.After(wc.timeout):
+ break MonitorLoop
+ }
+ }
+ go func() {
+ for {
+ select {
+ case <-wc.ctx.Done():
+ return
+ case <-wc.feed:
+ }
+ }
+ }()
+ defer wc.cancel()
+ proposed := false
+ threshold := uint64(
+ utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger).
+ NotarySetSize / 2)
+ wc.logger.Info("Threshold for recovery", "votes", threshold)
+ ResetLoop:
+ for {
+ if !proposed {
+ wc.logger.Info("Calling Recovery.ProposeSkipBlock",
+ "height", lastPos.Height)
+ if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil {
+ wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err)
+ } else {
+ proposed = true
+ }
+ }
+ votes, err := wc.recovery.Votes(lastPos.Height)
+ if err != nil {
+ wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err)
+ } else if votes > threshold {
+ wc.logger.Info("Threshold for recovery reached!")
+ wc.lastPosition = lastPos
+ break ResetLoop
+ }
+ select {
+ case <-wc.ctx.Done():
+ return
+ case <-time.After(wc.polling):
+ }
+ }
+ }()
+}
+
+// Stop the WatchCat.
+func (wc *WatchCat) Stop() {
+ if wc.cancel != nil {
+ wc.cancel()
+ }
+}
+
+// Meow returns a channel that is closed when the syncer should be terminated.
+func (wc *WatchCat) Meow() <-chan struct{} {
+ return wc.ctx.Done()
+}
+
+// LastPosition returns the last position for recovery.
+func (wc *WatchCat) LastPosition() types.Position {
+ return wc.lastPosition
+}
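
A minimal usage sketch of the WatchCat: feed it with every delivered position, and treat a closed Meow channel as the termination signal. This is annotation only, not part of the diff; the recovery and config reader values are assumed to be supplied by the caller, the positions channel is a hypothetical feed of delivered positions, and the polling/timeout durations are arbitrary examples.

func watchSyncer(
    recovery core.Recovery,
    cfgReader configReader, // anything exposing Configuration(round)
    positions <-chan types.Position,
    logger common.Logger,
) types.Position {
    wc := NewWatchCat(recovery, cfgReader, 3*time.Second, 30*time.Second, logger)
    wc.Start()
    defer wc.Stop()
    for {
        select {
        case pos := <-positions:
            wc.Feed(pos) // keep the cat fed so it stays quiet
        case <-wc.Meow(): // closed once enough recovery votes are collected
            return wc.LastPosition() // position to restart consensus from
        }
    }
}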