diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-12-27 09:17:28 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:55 +0800 |
commit | f79d09a12c8de2e1572292ef6bbd82352526930d (patch) | |
tree | 0ec9ce7fba237187b6d5b88b9401ad36798f7fe1 /vendor/github.com/dexon-foundation | |
parent | 509c6899caad7a66f7e64a1ef9718daa9018f7f1 (diff) | |
download | dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.gz dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.bz2 dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.lz dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.xz dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.zst dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.zip |
dex: add pull randomness (#105)
* vendor: sync to latest core
* dex: Add PullRandomness
Diffstat (limited to 'vendor/github.com/dexon-foundation')
9 files changed, 135 insertions, 130 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index a9fa21df2..2b5c4bc51 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -313,7 +313,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { if config = mgr.getConfig(nextRound); config != nil { break } else { - mgr.logger.Info("round is not ready", "round", nextRound) + mgr.logger.Debug("round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go index 14e3b265d..8e044293f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go @@ -257,3 +257,15 @@ func (cc *compactionChain) lastPendingBlock() *types.Block { } return nil } + +func (cc *compactionChain) pendingBlocksWithoutRandomness() ( + hashes common.Hashes) { + cc.lock.RLock() + defer cc.lock.RUnlock() + for _, block := range cc.pendingBlocks { + if _, exist := cc.blockRandomness[block.Hash]; !exist { + hashes = append(hashes, block.Hash) + } + } + return +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go index ad24e446d..5c389a70f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go @@ -135,7 +135,7 @@ func (cc *configurationChain) runDKG(round uint64) error { } cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round) for !cc.gov.IsDKGMPKReady(round) { - cc.logger.Info("DKG MPKs are not ready yet. Try again later...", + cc.logger.Debug("DKG MPKs are not ready yet. Try again later...", "nodeID", cc.ID) cc.dkgLock.Unlock() time.Sleep(500 * time.Millisecond) @@ -206,7 +206,7 @@ func (cc *configurationChain) runDKG(round uint64) error { // unexpected network fluctuation and ensure the robustness of DKG protocol. cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round) for !cc.gov.IsDKGFinal(round) { - cc.logger.Info("DKG is not ready yet. Try again later...", + cc.logger.Debug("DKG is not ready yet. Try again later...", "nodeID", cc.ID) time.Sleep(500 * time.Millisecond) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 0d4a38a91..3353d1d60 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -387,6 +387,7 @@ type Consensus struct { event *common.Event logger common.Logger nonFinalizedBlockDelivered bool + resetRandomnessTicker chan struct{} } // NewConsensus construct an Consensus instance. @@ -398,8 +399,8 @@ func NewConsensus( network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { - - return newConsensus(dMoment, app, gov, db, network, prv, logger, true) + return newConsensusForRound( + &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true) } // NewConsensusForSimulation creates an instance of Consensus for simulation, @@ -412,19 +413,60 @@ func NewConsensusForSimulation( network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { + return newConsensusForRound( + &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false) +} - return newConsensus(dMoment, app, gov, db, network, prv, logger, false) +// NewConsensusFromSyncer constructs an Consensus instance from information +// provided from syncer. +// +// You need to provide the initial block for this newly created Consensus +// instance to bootstrap with. A proper choice is the last finalized block you +// delivered to syncer. +func NewConsensusFromSyncer( + initBlock *types.Block, + initRoundBeginTime time.Time, + app Application, + gov Governance, + db db.Database, + networkModule Network, + prv crypto.PrivateKey, + latticeModule *Lattice, + blocks []*types.Block, + randomnessResults []*types.BlockRandomnessResult, + logger common.Logger) (*Consensus, error) { + // Setup Consensus instance. + con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db, + networkModule, prv, logger, latticeModule, true) + // Dump all BA-confirmed blocks to the consensus instance. + for _, b := range blocks { + con.ccModule.registerBlock(b) + if err := con.processBlock(b); err != nil { + return nil, err + } + } + // Dump all randomness result to the consensus instance. + for _, r := range randomnessResults { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { + con.logger.Error("failed to process randomness result when syncing", + "result", r) + continue + } + } + return con, nil } // newConsensus creates a Consensus instance. -func newConsensus( - dMoment time.Time, +func newConsensusForRound( + initBlock *types.Block, + initRoundBeginTime time.Time, app Application, gov Governance, db db.Database, network Network, prv crypto.PrivateKey, logger common.Logger, + latticeModule *Lattice, usingNonBlocking bool) *Consensus { // TODO(w): load latest blockHeight from DB, and use config at that height. @@ -436,12 +478,14 @@ func newConsensus( if a, ok := app.(Debug); ok { debugApp = a } - // Get configuration for genesis round. - var round uint64 - config := utils.GetConfigWithPanic(gov, round, logger) + // Get configuration for bootstrap round. + initRound := initBlock.Position.Round + initConfig := utils.GetConfigWithPanic(gov, initRound, logger) // Init lattice. - lattice := NewLattice( - dMoment, round, config, signer, app, debugApp, db, logger) + if latticeModule == nil { + latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig, + signer, app, debugApp, db, logger) + } // Init configuration chain. ID := types.NewNodeID(prv.PublicKey()) recv := &consensusDKGReceiver{ @@ -452,13 +496,7 @@ func newConsensus( network: network, logger: logger, } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - nodeSetCache, - db, - logger) + cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger) recv.cfgModule = cfgModule appModule := app if usingNonBlocking { @@ -468,7 +506,7 @@ func newConsensus( con := &Consensus{ ID: ID, ccModule: newCompactionChain(gov), - lattice: lattice, + lattice: latticeModule, app: appModule, debugApp: debugApp, gov: gov, @@ -477,77 +515,6 @@ func newConsensus( baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), dkgReady: sync.NewCond(&sync.Mutex{}), cfgModule: cfgModule, - dMoment: dMoment, - nodeSetCache: nodeSetCache, - signer: signer, - event: common.NewEvent(), - logger: logger, - } - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, round, dMoment) - if err := con.prepare(&types.Block{}); err != nil { - panic(err) - } - return con -} - -// NewConsensusFromSyncer constructs an Consensus instance from information -// provided from syncer. -// -// You need to provide the initial block for this newly created Consensus -// instance to bootstrap with. A proper choice is the last finalized block you -// delivered to syncer. -func NewConsensusFromSyncer( - initBlock *types.Block, - initRoundBeginTime time.Time, - app Application, - gov Governance, - db db.Database, - networkModule Network, - prv crypto.PrivateKey, - latticeModule *Lattice, - blocks []*types.Block, - randomnessResults []*types.BlockRandomnessResult, - logger common.Logger) (*Consensus, error) { - // Setup the cache for node sets. - nodeSetCache := utils.NewNodeSetCache(gov) - // Setup signer module. - signer := utils.NewSigner(prv) - // Init configuration chain. - ID := types.NewNodeID(prv.PublicKey()) - recv := &consensusDKGReceiver{ - ID: ID, - gov: gov, - signer: signer, - nodeSetCache: nodeSetCache, - network: networkModule, - logger: logger, - } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - nodeSetCache, - db, - logger) - recv.cfgModule = cfgModule - // Check if the application implement Debug interface. - var debugApp Debug - if a, ok := app.(Debug); ok { - debugApp = a - } - // Setup Consensus instance. - con := &Consensus{ - ID: ID, - ccModule: newCompactionChain(gov), - lattice: latticeModule, - app: newNonBlocking(app, debugApp), - gov: gov, - db: db, - network: networkModule, - baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, dMoment: initRoundBeginTime, nodeSetCache: nodeSetCache, signer: signer, @@ -555,27 +522,11 @@ func NewConsensusFromSyncer( logger: logger, } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime) - // Bootstrap the consensus instance. + con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime) if err := con.prepare(initBlock); err != nil { - return nil, err - } - // Dump all BA-confirmed blocks to the consensus instance. - for _, b := range blocks { - con.ccModule.registerBlock(b) - if err := con.processBlock(b); err != nil { - return nil, err - } - } - // Dump all randomness result to the consensus instance. - for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r, false); err != nil { - con.logger.Error("failed to process randomness result when syncing", - "result", r) - continue - } + panic(err) } - return con, nil + return con } // prepare the Consensus instance to be ready for blocks after 'initBlock'. @@ -634,6 +585,9 @@ func (con *Consensus) Run() { go con.processMsg(con.network.ReceiveChan()) // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) + // Take some time to bootstrap. + time.Sleep(3 * time.Second) + go con.pullRandomness() // Block until done. select { case <-con.ctx.Done(): @@ -673,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) { con.logger.Debug("Calling Governance.CRS to check if already proposed", "round", round+1) if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Info("CRS already proposed", "round", round+1) + con.logger.Debug("CRS already proposed", "round", round+1) return } con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", @@ -744,7 +698,7 @@ func (con *Consensus) initialRound( if (nextCRS != common.Hash{}) { return true } - con.logger.Info("CRS is not ready yet. Try again later...", + con.logger.Debug("CRS is not ready yet. Try again later...", "nodeID", con.ID, "round", round) return false @@ -757,7 +711,7 @@ func (con *Consensus) initialRound( go func(nextRound uint64) { if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Info("unable to prepare CRS for baMgr", + con.logger.Debug("unable to prepare CRS for baMgr", "round", nextRound) return } @@ -781,7 +735,7 @@ func (con *Consensus) initialRound( // unexpected network fluctuation and ensure the robustness. if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Info("unable to prepare CRS for DKG set", + con.logger.Debug("unable to prepare CRS for DKG set", "round", nextRound) return } @@ -1034,9 +988,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( } if needBroadcast { con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash.String()[:6], - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) + "randomness", rand) con.network.BroadcastRandomnessResult(rand) } return con.deliverFinalizedBlocks() @@ -1051,8 +1003,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { return } +func (con *Consensus) pullRandomness() { + for { + select { + case <-con.ctx.Done(): + return + case <-con.resetRandomnessTicker: + case <-time.After(1500 * time.Millisecond): + // TODO(jimmy): pulling period should be related to lambdaBA. + hashes := con.ccModule.pendingBlocksWithoutRandomness() + con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes) + con.network.PullRandomness(hashes) + } + } +} + // deliverBlock deliver a block to application layer. func (con *Consensus) deliverBlock(b *types.Block) { + select { + case con.resetRandomnessTicker <- struct{}{}: + default: + } if err := con.db.UpdateBlock(*b); err != nil { panic(err) } @@ -1134,7 +1105,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if con.nonFinalizedBlockDelivered { panic(fmt.Errorf("attempting to skip finalized block: %s", b)) } - con.logger.Info("skip delivery of finalized block", + con.logger.Debug("skip delivery of finalized block", "block", b, "finalization-height", b.Finalization.Height) continue diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index fc3bf09bc..20770328c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -67,6 +67,9 @@ type Network interface { // PullVotes tries to pull votes from the DEXON network. PullVotes(position types.Position) + // PullRandomness tries to pull randomness from the DEXON network. + PullRandomness(hashes common.Hashes) + // BroadcastVote broadcasts vote to all nodes in DEXON network. BroadcastVote(vote *types.Vote) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go index db19cf910..591c63dfd 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go @@ -193,17 +193,28 @@ func (l *Lattice) addBlockToLattice( if err == nil { var output []*types.Block if output, err = l.data.addBlock(tip); err != nil { - l.logger.Error("Sanity Check failed", "error", err) - continue + // We should be able to add this block once sanity check + // passed. + l.logger.Error("Failed to add sanity-checked block", + "block", tip, "error", err) + panic(err) } hasOutput = true outputBlocks = append(outputBlocks, output...) + l.pool.removeTip(i) + continue } if _, ok := err.(*ErrAckingBlockNotExists); ok { + l.logger.Debug("Pending block for lattice", + "pending", tip, + "last", l.data.chains[tip.Position.ChainID]) err = nil continue + } else { + l.logger.Error("Unexpected sanity check error", + "block", tip, "error", err) + panic(err) } - l.pool.removeTip(i) } if !hasOutput { break diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go index fee462442..32ea6547a 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go @@ -106,7 +106,7 @@ func (a *agreement) processBlock(b *types.Block) { func (a *agreement) processAgreementResult(r *types.AgreementResult) { // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[r.BlockHash]; exists { - a.logger.Info("agreement result already confirmed", "result", r) + a.logger.Debug("agreement result already confirmed", "result", r) return } if r.Position.Round > a.latestCRSRound { @@ -116,7 +116,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { a.pendings[r.Position.Round] = pendingsForRound } pendingsForRound[r.BlockHash] = r - a.logger.Info("agreement result cached", "result", r) + a.logger.Debug("agreement result cached", "result", r) return } if err := core.VerifyAgreementResult(r, a.cache); err != nil { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 32bbab3b2..c767a6d53 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -153,7 +153,7 @@ func (con *Consensus) checkIfValidated() bool { if validatedChainCount == numChains { return true } - con.logger.Info("not validated yet", "validated-chain", validatedChainCount) + con.logger.Debug("not validated yet", "validated-chain", validatedChainCount) return false } @@ -197,7 +197,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { if overlapCount == numChains { return true } - con.logger.Info("not synced yet", + con.logger.Debug("not synced yet", "overlap-count", overlapCount, "num-chain", numChains, "last-block", blocks[len(blocks)-1]) @@ -262,7 +262,7 @@ func (con *Consensus) ensureAgreementOverlapRound() bool { } if tipRoundMap[r] == con.configs[r].NumChains { con.agreementRoundCut = r - con.logger.Info("agreement round cut found, round", r) + con.logger.Debug("agreement round cut found, round", r) return true } } @@ -411,7 +411,7 @@ func (con *Consensus) SyncBlocks( "expected", tipHeight+1) return false, ErrInvalidSyncingFinalizationHeight } - con.logger.Info("syncBlocks", + con.logger.Debug("syncBlocks", "position", &blocks[0].Position, "final height", blocks[0].Finalization.Height, "len", len(blocks), @@ -446,7 +446,7 @@ func (con *Consensus) SyncBlocks( return false, err } if syncBlock != nil { - con.logger.Info("deliver set found", "block", syncBlock) + con.logger.Debug("deliver set found", "block", syncBlock) // New lattice with the round of syncBlock. con.initConsensusObj(syncBlock) con.setupConfigs(blocks) @@ -700,7 +700,7 @@ func (con *Consensus) startCRSMonitor() { if round == lastNotifiedRound { return } - con.logger.Info("CRS is ready", "round", round) + con.logger.Debug("CRS is ready", "round", round) con.lock.RLock() defer con.lock.RUnlock() lastNotifiedRound = round diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go index 1c64d4ad9..65cb635ca 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go @@ -18,6 +18,7 @@ package types import ( + "encoding/hex" "fmt" "github.com/dexon-foundation/dexon-consensus/common" @@ -42,3 +43,10 @@ type BlockRandomnessResult struct { Position Position `json:"position"` Randomness []byte `json:"randomness"` } + +func (r *BlockRandomnessResult) String() string { + return fmt.Sprintf("blockRandomness{Block:%s Pos:%s Rand:%s}", + r.BlockHash.String()[:6], &r.Position, + hex.EncodeToString(r.Randomness)[:6], + ) +} |