diff options
Diffstat (limited to 'vendor/github.com')
13 files changed, 298 insertions, 196 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index 57fb5c549..4cb47b105 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -20,6 +20,7 @@ package core import ( "context" "errors" + "math" "sync" "time" @@ -174,7 +175,7 @@ func (mgr *agreementMgr) appendConfig( recv := &consensusBAReceiver{ consensus: mgr.con, chainID: i, - restartNotary: make(chan bool, 1), + restartNotary: make(chan types.Position, 1), } agrModule := newAgreement( mgr.con.ID, @@ -252,7 +253,9 @@ func (mgr *agreementMgr) processAgreementResult( int(mgr.gov.Configuration(result.Position.Round).NotarySetSize), types.NewNotarySetTarget(crs, result.Position.ChainID)) for key := range result.Votes { - agreement.processVote(&result.Votes[key]) + if err := agreement.processVote(&result.Votes[key]); err != nil { + return err + } } agreement.restart(nIDs, result.Position, crs) } @@ -298,7 +301,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Check if this routine needs to awake in this round and prepare essential // variables when yes. - checkRound := func() (awake bool) { + checkRound := func() (isNotary, isDisabled bool) { defer func() { currentRound = nextRound nextRound++ @@ -318,7 +321,8 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { roundEndTime = config.beginTime.Add(config.roundInterval) // Check if this chain handled by this routine included in this round. if chainID >= config.numChains { - return false + isDisabled = true + return } // Check if this node in notary set of this chain in this round. nodeSet, err := mgr.cache.GetNodeSet(nextRound) @@ -329,7 +333,18 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { setting.notarySet = nodeSet.GetSubSet( int(config.notarySetSize), types.NewNotarySetTarget(config.crs, chainID)) - _, awake = setting.notarySet[mgr.ID] + _, isNotary = setting.notarySet[mgr.ID] + if isNotary { + mgr.logger.Info("selected as notary set", + "ID", mgr.ID, + "round", nextRound, + "chainID", chainID) + } else { + mgr.logger.Info("not selected as notary set", + "ID", mgr.ID, + "round", nextRound, + "chainID", chainID) + } // Setup ticker if tickDuration != config.lambdaBA { if setting.ticker != nil { @@ -348,12 +363,9 @@ Loop: default: } now := time.Now().UTC() - if !checkRound() { - if now.After(roundEndTime) { - // That round is passed. - continue Loop - } - // Sleep until next checkpoint. + var isDisabled bool + setting.recv.isNotary, isDisabled = checkRound() + if isDisabled { select { case <-mgr.ctx.Done(): break Loop @@ -379,7 +391,7 @@ Loop: // Run BA for this round. recv.round = currentRound recv.changeNotaryTime = roundEndTime - recv.restartNotary <- false + recv.restartNotary <- types.Position{ChainID: math.MaxUint32} if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, @@ -394,6 +406,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( setting *baRoundSetting) (err error) { agr := setting.agr recv := setting.recv + oldPos := agr.agreementID() Loop: for { select { @@ -402,12 +415,18 @@ Loop: default: } select { - case newNotary := <-recv.restartNotary: - if newNotary { - // This round is finished. - break Loop + case restartPos := <-recv.restartNotary: + if !isStop(restartPos) { + if restartPos.Round > oldPos.Round { + // This round is finished. + break Loop + } + if restartPos.Older(&oldPos) { + // The restartNotary event is triggered by 'BlockConfirmed' + // of some older block. + break + } } - oldPos := agr.agreementID() var nextHeight uint64 for { nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) @@ -425,15 +444,16 @@ Loop: if nextHeight > oldPos.Height { break } - time.Sleep(100 * time.Millisecond) mgr.logger.Debug("Lattice not ready!!!", "old", &oldPos, "next", nextHeight) + time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ Round: recv.round, ChainID: setting.chainID, Height: nextHeight, } + oldPos = nextPos agr.restart(setting.notarySet, nextPos, setting.crs) default: } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go index fdfcd13d0..364f2c75c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go @@ -216,6 +216,18 @@ func (cc *configurationChain) runDKG(round uint64) error { return nil } +func (cc *configurationChain) isDKGReady(round uint64) bool { + if !cc.gov.IsDKGFinal(round) { + return false + } + return func() bool { + cc.dkgResult.RLock() + defer cc.dkgResult.RUnlock() + _, exist := cc.gpk[round] + return exist + }() +} + func (cc *configurationChain) preparePartialSignature( round uint64, hash common.Hash) (*typesDKG.PartialSignature, error) { signer, exist := func() (*dkgShareSecret, bool) { @@ -305,19 +317,6 @@ func (cc *configurationChain) runTSig( return signature, nil } -func (cc *configurationChain) runBlockTSig( - round uint64, hash common.Hash) (crypto.Signature, error) { - sig, err := cc.runTSig(round, hash) - if err != nil { - return crypto.Signature{}, err - } - cc.logger.Info("Block TSIG", - "nodeID", cc.ID, - "round", round, - "signature", sig) - return sig, nil -} - func (cc *configurationChain) runCRSTSig( round uint64, crs common.Hash) ([]byte, error) { sig, err := cc.runTSig(round, crs) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index bfe893cd5..a6d80371d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -25,8 +25,8 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/blockdb" "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" "github.com/dexon-foundation/dexon-consensus/core/utils" @@ -70,17 +70,23 @@ type consensusBAReceiver struct { chainID uint32 changeNotaryTime time.Time round uint64 - restartNotary chan bool + isNotary bool + restartNotary chan types.Position } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { + if !recv.isNotary { + return + } if err := recv.agreementModule.prepareVote(vote); err != nil { recv.consensus.logger.Error("Failed to prepare vote", "error", err) return } go func() { if err := recv.agreementModule.processVote(vote); err != nil { - recv.consensus.logger.Error("Failed to process vote", "error", err) + recv.consensus.logger.Error("Failed to process self vote", + "error", err, + "vote", vote) return } recv.consensus.logger.Debug("Calling Network.BroadcastVote", @@ -90,6 +96,9 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { } func (recv *consensusBAReceiver) ProposeBlock() common.Hash { + if !recv.isNotary { + return common.Hash{} + } block := recv.consensus.proposeBlock(recv.chainID, recv.round) if block == nil { recv.consensus.logger.Error("unable to propose block") @@ -123,7 +132,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block, exist = recv.agreementModule.findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", - "hash", hash, + "hash", hash.String()[:6], "chainID", recv.chainID) ch := make(chan *types.Block) func() { @@ -135,7 +144,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( go func() { block = <-ch recv.consensus.logger.Info("Receive unknown block", - "hash", hash, + "hash", hash.String()[:6], + "position", &block.Position, "chainID", recv.chainID) recv.agreementModule.addCandidateBlock(block) recv.agreementModule.lock.Lock() @@ -152,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( parentHash := hash for { recv.consensus.logger.Warn("Parent block not confirmed", - "hash", parentHash, - "chainID", recv.chainID) + "parent-hash", parentHash.String()[:6], + "cur-position", &block.Position) ch := make(chan *types.Block) if !func() bool { recv.consensus.lock.Lock() @@ -179,11 +189,14 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.logger.Info("Receive parent block", - "hash", block.ParentHash, + "parent-hash", block.ParentHash.String()[:6], + "cur-position", &block.Position, "chainID", recv.chainID) recv.consensus.ccModule.registerBlock(block) if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) + recv.consensus.logger.Error("Failed to process block", + "block", block, + "error", err) return } parentHash = block.ParentHash @@ -194,24 +207,28 @@ func (recv *consensusBAReceiver) ConfirmBlock( } }(block.ParentHash) } - voteList := make([]types.Vote, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue + if recv.isNotary { + voteList := make([]types.Vote, 0, len(votes)) + for _, vote := range votes { + if vote.BlockHash != hash { + continue + } + voteList = append(voteList, *vote) } - voteList = append(voteList, *vote) - } - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - IsEmptyBlock: isEmptyBlockConfirmed, + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + IsEmptyBlock: isEmptyBlockConfirmed, + } + recv.consensus.logger.Debug("Propose AgreementResult", + "result", result) + recv.consensus.network.BroadcastAgreementResult(result) } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.network.BroadcastAgreementResult(result) if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) + recv.consensus.logger.Error("Failed to process block", + "block", block, + "error", err) return } // Clean the restartNotary channel so BA will not stuck by deadlock. @@ -223,15 +240,18 @@ CleanChannelLoop: break CleanChannelLoop } } + newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { recv.round++ - recv.restartNotary <- true - } else { - recv.restartNotary <- false + newPos.Round++ } + recv.restartNotary <- newPos } func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { + if !recv.isNotary { + return + } recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes) recv.consensus.network.PullBlocks(hashes) } @@ -312,7 +332,7 @@ func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint( // ProposeDKGFinalize propose a DKGFinalize message. func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) { if err := recv.authModule.SignDKGFinalize(final); err != nil { - recv.logger.Error("Faield to sign DKG finalize", "error", err) + recv.logger.Error("Failed to sign DKG finalize", "error", err) return } recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final) @@ -340,21 +360,23 @@ type Consensus struct { toSyncer *totalOrderingSyncer // Interfaces. - db blockdb.BlockDatabase - app Application - gov Governance - network Network + db db.Database + app Application + debugApp Debug + gov Governance + network Network // Misc. - dMoment time.Time - nodeSetCache *utils.NodeSetCache - round uint64 - roundToNotify uint64 - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc - event *common.Event - logger common.Logger + dMoment time.Time + nodeSetCache *utils.NodeSetCache + round uint64 + roundToNotify uint64 + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc + event *common.Event + logger common.Logger + nonFinalizedBlockDelivered bool } // NewConsensus construct an Consensus instance. @@ -362,7 +384,7 @@ func NewConsensus( dMoment time.Time, app Application, gov Governance, - db blockdb.BlockDatabase, + db db.Database, network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { @@ -372,7 +394,10 @@ func NewConsensus( // Setup auth module. authModule := NewAuthenticator(prv) // Check if the application implement Debug interface. - debugApp, _ := app.(Debug) + var debugApp Debug + if a, ok := app.(Debug); ok { + debugApp = a + } // Get configuration for genesis round. var round uint64 logger.Debug("Calling Governance.Configuration", "round", round) @@ -407,6 +432,7 @@ func NewConsensus( ccModule: newCompactionChain(gov), lattice: lattice, app: newNonBlocking(app, debugApp), + debugApp: debugApp, gov: gov, db: db, network: network, @@ -438,7 +464,7 @@ func NewConsensusFromSyncer( initRoundBeginTime time.Time, app Application, gov Governance, - db blockdb.BlockDatabase, + db db.Database, networkModule Network, prv crypto.PrivateKey, latticeModule *Lattice, @@ -492,7 +518,6 @@ func NewConsensusFromSyncer( } // Dump all BA-confirmed blocks to the consensus instance. for _, b := range blocks { - con.app.BlockConfirmed(*b) con.ccModule.registerBlock(b) if err := con.processBlock(b); err != nil { return nil, err @@ -500,7 +525,7 @@ func NewConsensusFromSyncer( } // Dump all randomness result to the consensus instance. for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r); err != nil { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { con.logger.Error("failed to process randomness result when syncing", "result", r) continue @@ -605,21 +630,22 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) { } func (con *Consensus) runCRS(round uint64) { - con.logger.Debug("Calling Governance.CRS to check if already proposed", - "round", round+1) - if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Info("CRS already proposed", "round", round+1) - return - } - con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", - "round", round) - for !con.gov.IsDKGFinal(round) { + for { + con.logger.Debug("Calling Governance.CRS to check if already proposed", + "round", round+1) + if (con.gov.CRS(round+1) != common.Hash{}) { + con.logger.Info("CRS already proposed", "round", round+1) + return + } + con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", + "round", round) + if con.cfgModule.isDKGReady(round) { + break + } con.logger.Debug("DKG is not ready for running CRS. Retry later...", "round", round) time.Sleep(500 * time.Millisecond) } - // Wait some time for DKG to recover private share. - time.Sleep(100 * time.Millisecond) // Start running next round CRS. con.logger.Debug("Calling Governance.CRS", "round", round) psig, err := con.cfgModule.preparePartialSignature(round, con.gov.CRS(round)) @@ -794,26 +820,30 @@ MessageLoop: // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", + "block", val, "error", err) } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", + "block", val, "error", err) } } case *types.Vote: if err := con.ProcessVote(val); err != nil { con.logger.Error("Failed to process vote", + "vote", val, "error", err) } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { con.logger.Error("Failed to process agreement result", + "result", val, "error", err) } case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val); err != nil { + if err := con.ProcessBlockRandomnessResult(val, true); err != nil { con.logger.Error("Failed to process block randomness result", "hash", val.BlockHash.String()[:6], "position", &val.Position, @@ -884,9 +914,7 @@ func (con *Consensus) ProcessAgreementResult( if rand.Position.Round == 0 { return nil } - if !con.ccModule.blockRegistered(rand.BlockHash) { - return nil - } + // TODO(mission): find a way to avoid spamming by older agreement results. // Sanity check done. if !con.cfgModule.touchTSigHash(rand.BlockHash) { return nil @@ -914,13 +942,16 @@ func (con *Consensus) ProcessAgreementResult( con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", "proposer", psig.ProposerID, "round", psig.Round, - "hash", psig.Hash) + "hash", psig.Hash.String()[:6]) con.network.BroadcastDKGPartialSignature(psig) go func() { tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) if err != nil { if err != ErrTSigAlreadyRunning { - con.logger.Error("Faield to run TSIG", "error", err) + con.logger.Error("Failed to run TSIG", + "position", &rand.Position, + "hash", rand.BlockHash.String()[:6], + "error", err) } return } @@ -929,7 +960,7 @@ func (con *Consensus) ProcessAgreementResult( Position: rand.Position, Randomness: tsig.Signature, } - if err := con.ProcessBlockRandomnessResult(result); err != nil { + if err := con.ProcessBlockRandomnessResult(result, true); err != nil { con.logger.Error("Failed to process randomness result", "error", err) return @@ -940,27 +971,33 @@ func (con *Consensus) ProcessAgreementResult( // ProcessBlockRandomnessResult processes the randomness result. func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult) error { + rand *types.BlockRandomnessResult, needBroadcast bool) error { if rand.Position.Round == 0 { return nil } if err := con.ccModule.processBlockRandomnessResult(rand); err != nil { if err == ErrBlockNotRegistered { err = nil + } else { + return err } - return err } - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash, - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) - con.network.BroadcastRandomnessResult(rand) + if needBroadcast { + con.logger.Debug("Calling Network.BroadcastRandomnessResult", + "hash", rand.BlockHash.String()[:6], + "position", &rand.Position, + "randomness", hex.EncodeToString(rand.Randomness)) + con.network.BroadcastRandomnessResult(rand) + } return nil } // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { err = con.baMgr.processBlock(b) + if err == nil && con.debugApp != nil { + con.debugApp.BlockReceived(b.Hash) + } return } @@ -999,7 +1036,7 @@ func (con *Consensus) deliverBlock(b *types.Block) { // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { - if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists { + if err = con.db.PutBlock(*block); err != nil && err != db.ErrBlockExists { return } con.lock.Lock() @@ -1012,6 +1049,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } // Pass delivered blocks to compaction chain. for _, b := range deliveredBlocks { + if b.IsFinalized() { + if con.nonFinalizedBlockDelivered { + panic(fmt.Errorf("attempting to skip finalized block: %s", b)) + } + con.logger.Info("skip delivery of finalized block", + "block", b, + "finalization-height", b.Finalization.Height) + continue + } else { + // Mark that some non-finalized block delivered. After this flag + // turned on, it's not allowed to deliver finalized blocks anymore. + con.nonFinalizedBlockDelivered = true + } if err = con.ccModule.processBlock(b); err != nil { return } @@ -1022,11 +1072,14 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { "delivered", con.ccModule.lastDeliveredBlock(), "pending", con.ccModule.lastPendingBlock()) for _, b := range deliveredBlocks { - if err = con.db.Update(*b); err != nil { + if err = con.db.UpdateBlock(*b); err != nil { panic(err) } con.cfgModule.untouchTSigHash(b.Hash) con.deliverBlock(b) + if con.debugApp != nil { + con.debugApp.BlockReady(b.Hash) + } } if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil { return diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockdb/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/interfaces.go index c85630775..5e13dc604 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockdb/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/interfaces.go @@ -15,7 +15,7 @@ // along with the dexon-consensus library. If not, see // <http://www.gnu.org/licenses/>. -package blockdb +package db import ( "errors" @@ -40,8 +40,8 @@ var ( ErrNotImplemented = fmt.Errorf("not implemented") ) -// BlockDatabase is the interface for a BlockDatabase. -type BlockDatabase interface { +// Database is the interface for a Database. +type Database interface { Reader Writer @@ -52,19 +52,19 @@ type BlockDatabase interface { // Reader defines the interface for reading blocks into DB. type Reader interface { - Has(hash common.Hash) bool - Get(hash common.Hash) (types.Block, error) - GetAll() (BlockIterator, error) + HasBlock(hash common.Hash) bool + GetBlock(hash common.Hash) (types.Block, error) + GetAllBlocks() (BlockIterator, error) } // Writer defines the interface for writing blocks into DB. type Writer interface { - Update(block types.Block) error - Put(block types.Block) error + UpdateBlock(block types.Block) error + PutBlock(block types.Block) error } // BlockIterator defines an iterator on blocks hold // in a DB. type BlockIterator interface { - Next() (types.Block, error) + NextBlock() (types.Block, error) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockdb/level-db.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go index 76730fc9c..6983d3a5e 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockdb/level-db.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/level-db.go @@ -15,7 +15,7 @@ // along with the dexon-consensus library. If not, see // <http://www.gnu.org/licenses/>. -package blockdb +package db import ( "encoding/json" @@ -26,30 +26,30 @@ import ( "github.com/dexon-foundation/dexon-consensus/core/types" ) -// LevelDBBackedBlockDB is a leveldb backed BlockDB implementation. -type LevelDBBackedBlockDB struct { +// LevelDBBackedDB is a leveldb backed DB implementation. +type LevelDBBackedDB struct { db *leveldb.DB } -// NewLevelDBBackedBlockDB initialize a leveldb-backed block database. -func NewLevelDBBackedBlockDB( - path string) (lvl *LevelDBBackedBlockDB, err error) { +// NewLevelDBBackedDB initialize a leveldb-backed database. +func NewLevelDBBackedDB( + path string) (lvl *LevelDBBackedDB, err error) { - db, err := leveldb.OpenFile(path, nil) + dbInst, err := leveldb.OpenFile(path, nil) if err != nil { return } - lvl = &LevelDBBackedBlockDB{db: db} + lvl = &LevelDBBackedDB{db: dbInst} return } // Close implement Closer interface, which would release allocated resource. -func (lvl *LevelDBBackedBlockDB) Close() error { +func (lvl *LevelDBBackedDB) Close() error { return lvl.db.Close() } -// Has implements the Reader.Has method. -func (lvl *LevelDBBackedBlockDB) Has(hash common.Hash) bool { +// HasBlock implements the Reader.Has method. +func (lvl *LevelDBBackedDB) HasBlock(hash common.Hash) bool { exists, err := lvl.db.Has([]byte(hash[:]), nil) if err != nil { // TODO(missionliao): Modify the interface to return error. @@ -58,8 +58,8 @@ func (lvl *LevelDBBackedBlockDB) Has(hash common.Hash) bool { return exists } -// Get implements the Reader.Get method. -func (lvl *LevelDBBackedBlockDB) Get( +// GetBlock implements the Reader.GetBlock method. +func (lvl *LevelDBBackedDB) GetBlock( hash common.Hash) (block types.Block, err error) { queried, err := lvl.db.Get([]byte(hash[:]), nil) @@ -76,8 +76,8 @@ func (lvl *LevelDBBackedBlockDB) Get( return } -// Update implements the Writer.Update method. -func (lvl *LevelDBBackedBlockDB) Update(block types.Block) (err error) { +// UpdateBlock implements the Writer.UpdateBlock method. +func (lvl *LevelDBBackedDB) UpdateBlock(block types.Block) (err error) { // NOTE: we didn't handle changes of block hash (and it // should not happen). marshaled, err := json.Marshal(&block) @@ -85,7 +85,7 @@ func (lvl *LevelDBBackedBlockDB) Update(block types.Block) (err error) { return } - if !lvl.Has(block.Hash) { + if !lvl.HasBlock(block.Hash) { err = ErrBlockDoesNotExist return } @@ -99,13 +99,13 @@ func (lvl *LevelDBBackedBlockDB) Update(block types.Block) (err error) { return } -// Put implements the Writer.Put method. -func (lvl *LevelDBBackedBlockDB) Put(block types.Block) (err error) { +// PutBlock implements the Writer.PutBlock method. +func (lvl *LevelDBBackedDB) PutBlock(block types.Block) (err error) { marshaled, err := json.Marshal(&block) if err != nil { return } - if lvl.Has(block.Hash) { + if lvl.HasBlock(block.Hash) { err = ErrBlockExists return } @@ -119,9 +119,9 @@ func (lvl *LevelDBBackedBlockDB) Put(block types.Block) (err error) { return } -// GetAll implements Reader.GetAll method, which allows callers +// GetAllBlocks implements Reader.GetAllBlocks method, which allows callers // to retrieve all blocks in DB. -func (lvl *LevelDBBackedBlockDB) GetAll() (BlockIterator, error) { +func (lvl *LevelDBBackedDB) GetAllBlocks() (BlockIterator, error) { // TODO (mission): Implement this part via goleveldb's iterator. return nil, ErrNotImplemented } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockdb/memory.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/memory.go index b45af229b..4246e4fe1 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockdb/memory.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/db/memory.go @@ -15,7 +15,7 @@ // along with the dexon-consensus library. If not, see // <http://www.gnu.org/licenses/>. -package blockdb +package db import ( "encoding/json" @@ -27,36 +27,38 @@ import ( "github.com/dexon-foundation/dexon-consensus/core/types" ) -type seqIterator struct { +type blockSeqIterator struct { idx int - db *MemBackedBlockDB + db *MemBackedDB } -func (seq *seqIterator) Next() (types.Block, error) { +// NextBlock implemenets BlockIterator.NextBlock method. +func (seq *blockSeqIterator) NextBlock() (types.Block, error) { curIdx := seq.idx seq.idx++ - return seq.db.getByIndex(curIdx) + return seq.db.getBlockByIndex(curIdx) } -// MemBackedBlockDB is a memory backed BlockDB implementation. -type MemBackedBlockDB struct { +// MemBackedDB is a memory backed DB implementation. +type MemBackedDB struct { blocksMutex sync.RWMutex blockHashSequence common.Hashes blocksByHash map[common.Hash]*types.Block persistantFilePath string } -// NewMemBackedBlockDB initialize a memory-backed block database. -func NewMemBackedBlockDB(persistantFilePath ...string) (db *MemBackedBlockDB, err error) { - db = &MemBackedBlockDB{ +// NewMemBackedDB initialize a memory-backed database. +func NewMemBackedDB(persistantFilePath ...string) ( + dbInst *MemBackedDB, err error) { + dbInst = &MemBackedDB{ blockHashSequence: common.Hashes{}, blocksByHash: make(map[common.Hash]*types.Block), } if len(persistantFilePath) == 0 || len(persistantFilePath[0]) == 0 { return } - db.persistantFilePath = persistantFilePath[0] - buf, err := ioutil.ReadFile(db.persistantFilePath) + dbInst.persistantFilePath = persistantFilePath[0] + buf, err := ioutil.ReadFile(dbInst.persistantFilePath) if err != nil { if !os.IsNotExist(err) { // Something unexpected happened. @@ -78,13 +80,13 @@ func NewMemBackedBlockDB(persistantFilePath ...string) (db *MemBackedBlockDB, er if err != nil { return } - db.blockHashSequence = toLoad.Sequence - db.blocksByHash = toLoad.ByHash + dbInst.blockHashSequence = toLoad.Sequence + dbInst.blocksByHash = toLoad.ByHash return } -// Has returns wheter or not the DB has a block identified with the hash. -func (m *MemBackedBlockDB) Has(hash common.Hash) bool { +// HasBlock returns wheter or not the DB has a block identified with the hash. +func (m *MemBackedDB) HasBlock(hash common.Hash) bool { m.blocksMutex.RLock() defer m.blocksMutex.RUnlock() @@ -92,15 +94,15 @@ func (m *MemBackedBlockDB) Has(hash common.Hash) bool { return ok } -// Get returns a block given a hash. -func (m *MemBackedBlockDB) Get(hash common.Hash) (types.Block, error) { +// GetBlock returns a block given a hash. +func (m *MemBackedDB) GetBlock(hash common.Hash) (types.Block, error) { m.blocksMutex.RLock() defer m.blocksMutex.RUnlock() - return m.internalGet(hash) + return m.internalGetBlock(hash) } -func (m *MemBackedBlockDB) internalGet(hash common.Hash) (types.Block, error) { +func (m *MemBackedDB) internalGetBlock(hash common.Hash) (types.Block, error) { b, ok := m.blocksByHash[hash] if !ok { return types.Block{}, ErrBlockDoesNotExist @@ -108,9 +110,9 @@ func (m *MemBackedBlockDB) internalGet(hash common.Hash) (types.Block, error) { return *b, nil } -// Put inserts a new block into the database. -func (m *MemBackedBlockDB) Put(block types.Block) error { - if m.Has(block.Hash) { +// PutBlock inserts a new block into the database. +func (m *MemBackedDB) PutBlock(block types.Block) error { + if m.HasBlock(block.Hash) { return ErrBlockExists } @@ -122,9 +124,9 @@ func (m *MemBackedBlockDB) Put(block types.Block) error { return nil } -// Update updates a block in the database. -func (m *MemBackedBlockDB) Update(block types.Block) error { - if !m.Has(block.Hash) { +// UpdateBlock updates a block in the database. +func (m *MemBackedDB) UpdateBlock(block types.Block) error { + if !m.HasBlock(block.Hash) { return ErrBlockDoesNotExist } @@ -136,7 +138,7 @@ func (m *MemBackedBlockDB) Update(block types.Block) error { } // Close implement Closer interface, which would release allocated resource. -func (m *MemBackedBlockDB) Close() (err error) { +func (m *MemBackedDB) Close() (err error) { // Save internal state to a pretty-print json file. It's a temporary way // to dump private file via JSON encoding. if len(m.persistantFilePath) == 0 { @@ -164,7 +166,7 @@ func (m *MemBackedBlockDB) Close() (err error) { return } -func (m *MemBackedBlockDB) getByIndex(idx int) (types.Block, error) { +func (m *MemBackedDB) getBlockByIndex(idx int) (types.Block, error) { m.blocksMutex.RLock() defer m.blocksMutex.RUnlock() @@ -173,11 +175,11 @@ func (m *MemBackedBlockDB) getByIndex(idx int) (types.Block, error) { } hash := m.blockHashSequence[idx] - return m.internalGet(hash) + return m.internalGetBlock(hash) } -// GetAll implement Reader.GetAll method, which allows caller +// GetAllBlocks implement Reader.GetAllBlocks method, which allows caller // to retrieve all blocks in DB. -func (m *MemBackedBlockDB) GetAll() (BlockIterator, error) { - return &seqIterator{db: m}, nil +func (m *MemBackedDB) GetAllBlocks() (BlockIterator, error) { + return &blockSeqIterator{db: m}, nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index 69798540f..2ebfe8621 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -41,7 +41,7 @@ type Application interface { // BlockConfirmed is called when a block is confirmed and added to lattice. BlockConfirmed(block types.Block) - // BlockDelivered is called when a block is add to the compaction chain. + // BlockDelivered is called when a block is added to the compaction chain. BlockDelivered(blockHash common.Hash, blockPosition types.Position, result types.FinalizationResult) } @@ -49,9 +49,13 @@ type Application interface { // Debug describes the application interface that requires // more detailed consensus execution. type Debug interface { + // BlockReceived is called when the block received in agreement. + BlockReceived(common.Hash) // TotalOrderingDelivered is called when the total ordering algorithm deliver // a set of block. TotalOrderingDelivered(common.Hashes, uint32) + // BlockReady is called when the block's randomness is ready. + BlockReady(common.Hash) } // Network describs the network interface that interacts with DEXON consensus diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go index f1ab2de6a..e55c0dbfc 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go @@ -24,7 +24,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/blockdb" + "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/utils" ) @@ -105,8 +105,8 @@ func newLatticeDataConfig( // latticeData is a module for storing lattice. type latticeData struct { - // BlockDB for getting blocks purged in memory. - db blockdb.Reader + // DB for getting blocks purged in memory. + db db.Reader // chains stores chains' blocks and other info. chains []*chainStatus // blockByHash stores blocks, indexed by block hash. @@ -117,7 +117,7 @@ type latticeData struct { // newLatticeData creates a new latticeData instance. func newLatticeData( - db blockdb.Reader, + db db.Reader, dMoment time.Time, round uint64, config *types.Config) (data *latticeData) { @@ -146,7 +146,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error { for _, hash := range b.Acks { bAck, err := data.findBlock(hash) if err != nil { - if err == blockdb.ErrBlockDoesNotExist { + if err == db.ErrBlockDoesNotExist { return &ErrAckingBlockNotExists{hash} } return err @@ -276,7 +276,7 @@ func (data *latticeData) addBlock( // Update lastAckPos. for _, ack := range block.Acks { if bAck, err = data.findBlock(ack); err != nil { - if err == blockdb.ErrBlockDoesNotExist { + if err == db.ErrBlockDoesNotExist { err = nil continue } @@ -298,7 +298,7 @@ func (data *latticeData) addBlock( allAckingBlockDelivered := true for _, ack := range tip.Acks { if bAck, err = data.findBlock(ack); err != nil { - if err == blockdb.ErrBlockDoesNotExist { + if err == db.ErrBlockDoesNotExist { err = nil allAckingBlockDelivered = false break @@ -525,7 +525,7 @@ func (data *latticeData) findBlock(h common.Hash) (b *types.Block, err error) { return } var tmpB types.Block - if tmpB, err = data.db.Get(h); err != nil { + if tmpB, err = data.db.GetBlock(h); err != nil { return } b = &tmpB @@ -632,7 +632,7 @@ func (s *chainStatus) addBlock(b *types.Block) { } // purgeBlock purges a block from cache, make sure this block is already saved -// in blockdb. +// in db. func (s *chainStatus) purgeBlock(b *types.Block) error { if b.Hash != s.blocks[0].Hash || s.nextOutputIndex <= 0 { return ErrPurgeNotDeliveredBlock diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go index e578e3f4f..8780bbaec 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go @@ -23,7 +23,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/blockdb" + "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" ) @@ -53,7 +53,7 @@ func NewLattice( authModule *Authenticator, app Application, debug Debug, - db blockdb.BlockDatabase, + db db.Database, logger common.Logger) *Lattice { // Create genesis latticeDataConfig. @@ -304,21 +304,21 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { } // ProcessFinalizedBlock is used for syncing lattice data. -func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { +func (l *Lattice) ProcessFinalizedBlock(b *types.Block) ([]*types.Block, error) { l.lock.Lock() defer l.lock.Unlock() // Syncing state for core.latticeData module. if err := l.data.addFinalizedBlock(b); err != nil { - panic(err) + return nil, err } l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) // Syncing state for core.totalOrdering module. toDelivered, deliveredMode, err := l.toModule.processBlock(b) if err != nil { - panic(err) + return nil, err } if len(toDelivered) == 0 { - return + return nil, nil } hashes := make(common.Hashes, len(toDelivered)) for idx := range toDelivered { @@ -329,7 +329,7 @@ func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { } // Sync core.consensusTimestamp module. if err = l.ctModule.processBlocks(toDelivered); err != nil { - panic(err) + return nil, err } - return + return toDelivered, nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go index f94d3c631..56c42fec6 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/nonblocking.go @@ -44,7 +44,7 @@ type blockDeliveredEvent struct { // them that makes the methods to be non-blocking. // - Application // - Debug -// - It also provides nonblockig for blockdb update. +// - It also provides nonblockig for db update. type nonBlocking struct { app Application debug Debug @@ -75,7 +75,7 @@ func (nb *nonBlocking) addEvent(event interface{}) { func (nb *nonBlocking) run() { // This go routine consume the first event from events and call the - // corresponding methods of Application/Debug/blockdb. + // corresponding methods of Application/Debug/db. for { var event interface{} func() { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go index 52f927005..3bf6946ae 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/total-ordering.go @@ -535,11 +535,13 @@ type totalOrderingGlobalVector struct { cachedCandidateInfo *totalOrderingCandidateInfo } -func newTotalOrderingGlobalVector(numChains uint32) *totalOrderingGlobalVector { +func newTotalOrderingGlobalVector( + initRound uint64, numChains uint32) *totalOrderingGlobalVector { return &totalOrderingGlobalVector{ blocks: make([][]*types.Block, numChains), tips: make([]*types.Block, numChains), breakpoints: make([][]*totalOrderingBreakpoint, numChains), + curRound: initRound, } } @@ -792,14 +794,14 @@ type totalOrdering struct { } // newTotalOrdering constructs an totalOrdering instance. -func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering { +func newTotalOrdering( + dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering { config := &totalOrderingConfig{} config.fromConfig(round, cfg) config.setRoundBeginTime(dMoment) candidates := make([]*totalOrderingCandidateInfo, config.numChains) to := &totalOrdering{ pendings: make(map[common.Hash]*types.Block), - globalVector: newTotalOrderingGlobalVector(config.numChains), dirtyChainIDs: make([]int, 0, config.numChains), acked: make(map[common.Hash]map[common.Hash]struct{}), objCache: newTotalOrderingObjectCache(config.numChains), @@ -807,6 +809,8 @@ func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *total candidates: candidates, candidateChainIDs: make([]uint32, 0, config.numChains), curRound: config.roundID, + globalVector: newTotalOrderingGlobalVector( + config.roundID, config.numChains), } to.configs = []*totalOrderingConfig{config} return to @@ -898,7 +902,8 @@ func (to *totalOrdering) clean(b *types.Block) { } // updateVectors is a helper function to update all cached vectors. -func (to *totalOrdering) updateVectors(b *types.Block) (isOldest bool, err error) { +func (to *totalOrdering) updateVectors( + b *types.Block) (isOldest bool, err error) { var ( candidateHash common.Hash chainID uint32 diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go index bc5e33636..2ef243757 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go @@ -219,3 +219,15 @@ func DiffUint64(a, b uint64) uint64 { } return b - a } + +func isCI() bool { + return os.Getenv("CI") != "" +} + +func isCircleCI() bool { + return isCI() && os.Getenv("CIRCLECI") == "true" +} + +func isTravisCI() bool { + return isCI() && os.Getenv("TRAVIS") == "true" +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go index a8f8fe58f..6d4f7b0ba 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/nodeset-cache.go @@ -27,8 +27,12 @@ import ( ) var ( - // ErrRoundNotReady means we got nil config. - ErrRoundNotReady = errors.New("round is not ready") + // ErrNodeSetNotReady means we got nil empty node set. + ErrNodeSetNotReady = errors.New("node set is not ready") + // ErrCRSNotReady means we got empty CRS. + ErrCRSNotReady = errors.New("crs is not ready") + // ErrConfigurationNotReady means we go nil configuration. + ErrConfigurationNotReady = errors.New("configuration is not ready") // ErrInvalidChainID means the chain ID is unexpected. ErrInvalidChainID = errors.New("invalid chain id") ) @@ -172,16 +176,15 @@ func (cache *NodeSetCache) update( cache.lock.Lock() defer cache.lock.Unlock() - // Get the requested round. + // Get information for the requested round. keySet := cache.nsIntf.NodeSet(round) if keySet == nil { - // That round is not ready yet. - err = ErrRoundNotReady + err = ErrNodeSetNotReady return } crs := cache.nsIntf.CRS(round) if (crs == common.Hash{}) { - err = ErrRoundNotReady + err = ErrCRSNotReady return } // Cache new round. @@ -199,6 +202,10 @@ func (cache *NodeSetCache) update( } } cfg := cache.nsIntf.Configuration(round) + if cfg == nil { + err = ErrConfigurationNotReady + return + } nIDs = &sets{ nodeSet: nodeSet, notarySet: make([]map[types.NodeID]struct{}, cfg.NumChains), |