From a69fb3e4c59fab52b6e10993c67400084879b1a8 Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Mon, 29 Oct 2018 09:31:36 +0800 Subject: vendor: sync consensus core and fix conflict --- .../dexon-consensus-core/core/agreement.go | 61 ++++++- .../dexon-consensus-core/core/compaction-chain.go | 32 ++-- .../core/configuration-chain.go | 8 +- .../dexon-consensus-core/core/consensus.go | 185 +++++++++++++++------ .../dexon-consensus-core/core/interfaces.go | 3 + .../dexon-consensus-core/core/lattice-data.go | 76 +++++++-- .../dexon-consensus-core/core/lattice.go | 126 +++++++++----- .../dexon-consensus-core/core/leader-selector.go | 42 ++++- .../dexon-consensus-core/core/types/block.go | 1 + .../dexon-consensus-core/core/types/config.go | 16 ++ .../dexon-consensus-core/core/types/position.go | 23 ++- 11 files changed, 440 insertions(+), 133 deletions(-) (limited to 'vendor/github.com') diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go index 8618b5ff0..8c2218be0 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go @@ -21,7 +21,6 @@ import ( "fmt" "math" "sync" - "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus-core/common" @@ -69,6 +68,7 @@ type agreementReceiver interface { ProposeVote(vote *types.Vote) ProposeBlock() common.Hash ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote) + PullBlocks(common.Hashes) } type pendingBlock struct { @@ -101,7 +101,7 @@ type agreementData struct { type agreement struct { state agreementState data *agreementData - aID *atomic.Value + aID types.Position notarySet map[types.NodeID]struct{} hasOutput bool lock sync.RWMutex @@ -125,7 +125,6 @@ func newAgreement( ID: ID, leader: leader, }, - aID: &atomic.Value{}, candidateBlock: make(map[common.Hash]*types.Block), fastForward: make(chan uint64, 1), authModule: authModule, @@ -158,9 +157,13 @@ func (a *agreement) restart( a.state = newInitialState(a.data) a.notarySet = notarySet a.candidateBlock = make(map[common.Hash]*types.Block) - a.aID.Store(aID) + a.aID = *aID.Clone() }() + if isStop(aID) { + return + } + expireTime := time.Now().Add(-10 * time.Second) replayBlock := make([]*types.Block, 0) func() { @@ -168,7 +171,9 @@ func (a *agreement) restart( defer a.lock.Unlock() newPendingBlock := make([]pendingBlock, 0) for _, pending := range a.pendingBlock { - if pending.block.Position == aID { + if aID.Newer(&pending.block.Position) { + continue + } else if pending.block.Position == aID { replayBlock = append(replayBlock, pending.block) } else if pending.receivedTime.After(expireTime) { newPendingBlock = append(newPendingBlock, pending) @@ -183,7 +188,9 @@ func (a *agreement) restart( defer a.lock.Unlock() newPendingVote := make([]pendingVote, 0) for _, pending := range a.pendingVote { - if pending.vote.Position == aID { + if aID.Newer(&pending.vote.Position) { + continue + } else if pending.vote.Position == aID { replayVote = append(replayVote, pending.vote) } else if pending.receivedTime.After(expireTime) { newPendingVote = append(newPendingVote, pending) @@ -207,6 +214,10 @@ func (a *agreement) stop() { }) } +func isStop(aID types.Position) bool { + return aID.ChainID == math.MaxUint32 +} + // clocks returns how many time this state is required. func (a *agreement) clocks() int { return a.state.clocks() @@ -214,7 +225,9 @@ func (a *agreement) clocks() int { // agreementID returns the current agreementID. func (a *agreement) agreementID() types.Position { - return a.aID.Load().(types.Position) + a.lock.RLock() + defer a.lock.RUnlock() + return a.aID } // nextState is called at the specific clock time. @@ -272,7 +285,14 @@ func (a *agreement) processVote(vote *types.Vote) error { if err := a.sanityCheck(vote); err != nil { return err } - if vote.Position != a.agreementID() { + aID := a.agreementID() + if vote.Position != aID { + // Agreement module has stopped. + if !isStop(aID) { + if aID.Newer(&vote.Position) { + return nil + } + } a.lock.Lock() defer a.lock.Unlock() a.pendingVote = append(a.pendingVote, pendingVote{ @@ -327,6 +347,21 @@ func (a *agreement) processVote(vote *types.Vote) error { // Condition 3. if vote.Type == types.VoteCom && vote.Period >= a.data.period && len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote { + hashes := common.Hashes{} + addPullBlocks := func(voteType types.VoteType) { + for _, vote := range a.data.votes[vote.Period][voteType] { + if vote.BlockHash == nullBlockHash || vote.BlockHash == skipBlockHash { + continue + } + if _, found := a.findCandidateBlock(vote.BlockHash); !found { + hashes = append(hashes, vote.BlockHash) + } + } + } + addPullBlocks(types.VoteInit) + addPullBlocks(types.VotePreCom) + addPullBlocks(types.VoteCom) + a.data.recv.PullBlocks(hashes) a.fastForward <- vote.Period + 1 return nil } @@ -356,7 +391,15 @@ func (a *agreement) done() <-chan struct{} { func (a *agreement) processBlock(block *types.Block) error { a.data.blocksLock.Lock() defer a.data.blocksLock.Unlock() - if block.Position != a.agreementID() { + + aID := a.agreementID() + if block.Position != aID { + // Agreement module has stopped. + if !isStop(aID) { + if aID.Newer(&block.Position) { + return nil + } + } a.pendingBlock = append(a.pendingBlock, pendingBlock{ block: block, receivedTime: time.Now().UTC(), diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go index 451cb1355..50056a9d8 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go @@ -148,6 +148,11 @@ func (cc *compactionChain) processFinalizedBlock(block *types.Block) { return } + // Block of round 0 should not have randomness. + if block.Position.Round == 0 && len(block.Finalization.Randomness) != 0 { + return + } + cc.lock.Lock() defer cc.lock.Unlock() heap.Push(cc.pendingFinalizedBlocks, block) @@ -191,18 +196,21 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block { continue } round := b.Position.Round - v, ok, err := cc.tsigVerifier.UpdateAndGet(round) - if err != nil { - continue - } - if !ok { - toPending = append(toPending, b) - continue - } - if ok := v.VerifySignature(b.Hash, crypto.Signature{ - Type: "bls", - Signature: b.Finalization.Randomness}); !ok { - continue + if round != 0 { + // Randomness is not available at round 0. + v, ok, err := cc.tsigVerifier.UpdateAndGet(round) + if err != nil { + continue + } + if !ok { + toPending = append(toPending, b) + continue + } + if ok := v.VerifySignature(b.Hash, crypto.Signature{ + Type: "bls", + Signature: b.Finalization.Randomness}); !ok { + continue + } } // Fork resolution: choose block with smaller hash. if prevBlock.Finalization.Height == b.Finalization.Height { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go index 559eac0b7..bf24c31dc 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go @@ -205,6 +205,12 @@ func (cc *configurationChain) touchTSigHash(hash common.Hash) (first bool) { return !exist } +func (cc *configurationChain) untouchTSigHash(hash common.Hash) { + cc.tsigReady.L.Lock() + defer cc.tsigReady.L.Unlock() + delete(cc.tsigTouched, hash) +} + func (cc *configurationChain) runTSig( round uint64, hash common.Hash) ( crypto.Signature, error) { @@ -240,10 +246,10 @@ func (cc *configurationChain) runTSig( signature, err = cc.tsig[hash].signature() return err == ErrNotEnoughtPartialSignatures }() { + // TODO(jimmy-dexon): add a timeout here. cc.tsigReady.Wait() } delete(cc.tsig, hash) - delete(cc.tsigTouched, hash) if err != nil { return crypto.Signature{}, err } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go index e20b4e79d..938337f1c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go @@ -47,6 +47,12 @@ var ( "incorrect agreement result position") ErrNotEnoughVotes = fmt.Errorf( "not enought votes") + ErrIncorrectVoteBlockHash = fmt.Errorf( + "incorrect vote block hash") + ErrIncorrectVoteType = fmt.Errorf( + "incorrect vote type") + ErrIncorrectVotePosition = fmt.Errorf( + "incorrect vote position") ErrIncorrectVoteProposer = fmt.Errorf( "incorrect vote proposer") ErrIncorrectBlockRandomnessResult = fmt.Errorf( @@ -113,8 +119,20 @@ func (recv *consensusBAReceiver) ConfirmBlock( block, exist = recv.consensus.baModules[recv.chainID]. findCandidateBlock(hash) if !exist { - recv.consensus.logger.Error("Unknown block confirmed", "hash", hash) - return + recv.consensus.logger.Error("Unknown block confirmed", + "hash", hash, + "chainID", recv.chainID) + ch := make(chan *types.Block) + func() { + recv.consensus.lock.Lock() + defer recv.consensus.lock.Unlock() + recv.consensus.baConfirmedBlock[hash] = ch + }() + recv.consensus.network.PullBlocks(common.Hashes{hash}) + block = <-ch + recv.consensus.logger.Info("Receive unknown block", + "hash", hash, + "chainID", recv.chainID) } } recv.consensus.ccModule.registerBlock(block) @@ -145,6 +163,11 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } +func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { + recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes) + recv.consensus.network.PullBlocks(hashes) +} + // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { ID types.NodeID @@ -236,8 +259,9 @@ type Consensus struct { currentConfig *types.Config // BA. - baModules []*agreement - receivers []*consensusBAReceiver + baModules []*agreement + receivers []*consensusBAReceiver + baConfirmedBlock map[common.Hash]chan<- *types.Block // DKG. dkgRunning int32 @@ -318,23 +342,28 @@ func NewConsensus( recv.cfgModule = cfgModule // Construct Consensus instance. con := &Consensus{ - ID: ID, - currentConfig: config, - ccModule: newCompactionChain(gov), - lattice: lattice, - app: app, - gov: gov, - db: db, - network: network, - tickerObj: newTicker(gov, round, TickerBA), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, - dMoment: dMoment, - nodeSetCache: nodeSetCache, - authModule: authModule, - event: common.NewEvent(), - logger: logger, - roundToNotify: roundToNotify, + ID: ID, + currentConfig: config, + ccModule: newCompactionChain(gov), + lattice: lattice, + app: app, + gov: gov, + db: db, + network: network, + tickerObj: newTicker(gov, round, TickerBA), + baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), + dkgReady: sync.NewCond(&sync.Mutex{}), + cfgModule: cfgModule, + dMoment: dMoment, + nodeSetCache: nodeSetCache, + authModule: authModule, + event: common.NewEvent(), + logger: logger, + roundToNotify: roundToNotify, + } + + validLeader := func(block *types.Block) bool { + return lattice.SanityCheck(block) == nil } con.baModules = make([]*agreement, config.NumChains) @@ -350,7 +379,7 @@ func NewConsensus( con.ID, recv, nodes.IDs, - newLeaderSelector(crs), + newLeaderSelector(crs, validLeader), con.authModule, ) // Hacky way to make agreement module self contained. @@ -608,6 +637,7 @@ func (con *Consensus) Stop() { } func (con *Consensus) processMsg(msgChan <-chan interface{}) { +MessageLoop: for { var msg interface{} select { @@ -618,8 +648,30 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { switch val := msg.(type) { case *types.Block: - // For sync mode. - if val.IsFinalized() { + if ch, exist := func() (chan<- *types.Block, bool) { + con.lock.RLock() + defer con.lock.RUnlock() + ch, e := con.baConfirmedBlock[val.Hash] + return ch, e + }(); exist { + if err := con.lattice.SanityCheck(val); err != nil { + if err == ErrRetrySanityCheckLater { + err = nil + } else { + con.logger.Error("SanityCheck failed", "error", err) + continue MessageLoop + } + } + con.lock.Lock() + defer con.lock.Unlock() + // In case of multiple delivered block. + if _, exist := con.baConfirmedBlock[val.Hash]; !exist { + continue MessageLoop + } + delete(con.baConfirmedBlock, val.Hash) + ch <- val + } else if val.IsFinalized() { + // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", "error", err) @@ -697,27 +749,28 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { // ProcessAgreementResult processes the randomness request. func (con *Consensus) ProcessAgreementResult( rand *types.AgreementResult) error { - if rand.Position.Round == 0 { - return nil - } - if !con.ccModule.blockRegistered(rand.BlockHash) { - return nil - } - if DiffUint64(con.round, rand.Position.Round) > 1 { - return nil - } - if len(rand.Votes) <= int(con.currentConfig.NotarySetSize/3*2) { - return ErrNotEnoughVotes - } - if rand.Position.ChainID >= con.currentConfig.NumChains { - return ErrIncorrectAgreementResultPosition - } + // Sanity Check. notarySet, err := con.nodeSetCache.GetNotarySet( rand.Position.Round, rand.Position.ChainID) if err != nil { return err } + if len(rand.Votes) < len(notarySet)/3*2+1 { + return ErrNotEnoughVotes + } + if len(rand.Votes) > len(notarySet) { + return ErrIncorrectVoteProposer + } for _, vote := range rand.Votes { + if vote.BlockHash != rand.BlockHash { + return ErrIncorrectVoteBlockHash + } + if vote.Type != types.VoteCom { + return ErrIncorrectVoteType + } + if vote.Position != rand.Position { + return ErrIncorrectVotePosition + } if _, exist := notarySet[vote.ProposerID]; !exist { return ErrIncorrectVoteProposer } @@ -729,6 +782,37 @@ func (con *Consensus) ProcessAgreementResult( return ErrIncorrectVoteSignature } } + // Syncing BA Module. + agreement := con.baModules[rand.Position.ChainID] + aID := agreement.agreementID() + if rand.Position.Newer(&aID) { + con.logger.Info("Syncing BA", "position", rand.Position) + nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round) + if err != nil { + return err + } + con.logger.Debug("Calling Network.PullBlocks for syncing BA", + "hash", rand.BlockHash) + con.network.PullBlocks(common.Hashes{rand.BlockHash}) + nIDs := nodes.GetSubSet( + int(con.gov.Configuration(rand.Position.Round).NotarySetSize), + types.NewNotarySetTarget( + con.gov.CRS(rand.Position.Round), rand.Position.ChainID)) + for _, vote := range rand.Votes { + agreement.processVote(&vote) + } + agreement.restart(nIDs, rand.Position) + } + // Calculating randomness. + if rand.Position.Round == 0 { + return nil + } + if !con.ccModule.blockRegistered(rand.BlockHash) { + return nil + } + if DiffUint64(con.round, rand.Position.Round) > 1 { + return nil + } // Sanity check done. if !con.cfgModule.touchTSigHash(rand.BlockHash) { return nil @@ -816,7 +900,9 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { if err = con.lattice.SanityCheck(b); err != nil { - return + if err != ErrRetrySanityCheckLater { + return + } } if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { return err @@ -826,16 +912,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { - verifiedBlocks, deliveredBlocks, err := con.lattice.ProcessBlock(block) - if err != nil { + if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists { return } - // Pass verified blocks (pass sanity check) back to BA module. - for _, b := range verifiedBlocks { - if err := - con.baModules[b.Position.ChainID].processBlock(b); err != nil { - return err - } + con.lock.Lock() + defer con.lock.Unlock() + // Block processed by lattice can be out-of-order. But the output of lattice + // (deliveredBlocks) cannot. + deliveredBlocks, err := con.lattice.ProcessBlock(block) + if err != nil { + return } // Pass delivered blocks to compaction chain. for _, b := range deliveredBlocks { @@ -846,9 +932,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } deliveredBlocks = con.ccModule.extractBlocks() for _, b := range deliveredBlocks { - if err = con.db.Put(*b); err != nil { + if err = con.db.Update(*b); err != nil { panic(err) } + con.cfgModule.untouchTSigHash(b.Hash) // TODO(mission): clone types.FinalizationResult con.app.BlockDelivered(b.Hash, b.Finalization) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go index 01e909667..4f6ad45a2 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go @@ -59,6 +59,9 @@ type Debug interface { // Network describs the network interface that interacts with DEXON consensus // core. type Network interface { + // PullBlocks tries to pull blocks from the DEXON network. + PullBlocks(hashes common.Hashes) + // BroadcastVote broadcasts vote to all nodes in DEXON network. BroadcastVote(vote *types.Vote) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go index b0fe9cfdd..2a3ec299e 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go @@ -30,7 +30,6 @@ import ( // Errors for sanity check error. var ( - ErrAckingBlockNotExists = fmt.Errorf("acking block not exists") ErrDuplicatedAckOnOneChain = fmt.Errorf("duplicated ack on one chain") ErrInvalidChainID = fmt.Errorf("invalid chain id") ErrInvalidProposerID = fmt.Errorf("invalid proposer id") @@ -50,6 +49,15 @@ var ( ErrUnexpectedGenesisBlock = fmt.Errorf("unexpected genesis block") ) +// ErrAckingBlockNotExists is for sanity check error. +type ErrAckingBlockNotExists struct { + hash common.Hash +} + +func (e ErrAckingBlockNotExists) Error() string { + return fmt.Sprintf("acking block %s not exists", e.hash) +} + // Errors for method usage var ( ErrRoundNotIncreasing = errors.New("round not increasing") @@ -144,7 +152,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error { bAck, err := data.findBlock(hash) if err != nil { if err == blockdb.ErrBlockDoesNotExist { - return ErrAckingBlockNotExists + return &ErrAckingBlockNotExists{hash} } return err } @@ -185,7 +193,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { chainTip := chain.tip if chainTip == nil { if !b.ParentHash.Equal(common.Hash{}) { - return ErrAckingBlockNotExists + return &ErrAckingBlockNotExists{b.ParentHash} } if !b.IsGenesis() { return ErrNotGenesisBlock @@ -198,7 +206,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { // Check parent block if parent hash is specified. if !b.ParentHash.Equal(common.Hash{}) { if !b.ParentHash.Equal(chainTip.Hash) { - return ErrAckingBlockNotExists + return &ErrAckingBlockNotExists{b.ParentHash} } if !b.IsAcking(b.ParentHash) { return ErrNotAckParent @@ -268,18 +276,21 @@ func (data *latticeData) addBlock( bAck *types.Block updated bool ) - if err = data.chains[block.Position.ChainID].addBlock(block); err != nil { - return - } + data.chains[block.Position.ChainID].addBlock(block) data.blockByHash[block.Hash] = block // Update lastAckPos. for _, ack := range block.Acks { if bAck, err = data.findBlock(ack); err != nil { + if err == blockdb.ErrBlockDoesNotExist { + err = nil + continue + } return } data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] = bAck.Position.Clone() } + // Extract blocks that deliverable to total ordering. // A block is deliverable to total ordering iff: // - All its acking blocks are delivered to total ordering. @@ -293,19 +304,37 @@ func (data *latticeData) addBlock( allAckingBlockDelivered := true for _, ack := range tip.Acks { if bAck, err = data.findBlock(ack); err != nil { + if err == blockdb.ErrBlockDoesNotExist { + err = nil + allAckingBlockDelivered = false + break + } return } // Check if this block is outputed or not. idx := data.chains[bAck.Position.ChainID].findBlock( &bAck.Position) - if idx == -1 || - idx < data.chains[bAck.Position.ChainID].nextOutputIndex { + var ok bool + if idx == -1 { + // Either the block is delivered or not added to chain yet. + if out := + data.chains[bAck.Position.ChainID].lastOutputPosition; out != nil { + ok = !out.Older(&bAck.Position) + } else if ackTip := + data.chains[bAck.Position.ChainID].tip; ackTip != nil { + ok = !ackTip.Position.Older(&bAck.Position) + } + } else { + ok = idx < data.chains[bAck.Position.ChainID].nextOutputIndex + } + if ok { continue } // This acked block exists and not delivered yet. allAckingBlockDelivered = false } if allAckingBlockDelivered { + status.lastOutputPosition = &tip.Position status.nextOutputIndex++ deliverable = append(deliverable, tip) updated = true @@ -318,6 +347,30 @@ func (data *latticeData) addBlock( return } +// addFinalizedBlock processes block for syncing internal data. +func (data *latticeData) addFinalizedBlock( + block *types.Block) (err error) { + var bAck *types.Block + chain := data.chains[block.Position.ChainID] + if chain.tip != nil && chain.tip.Position.Height >= + block.Position.Height { + return + } + chain.nextOutputIndex = 0 + chain.blocks = []*types.Block{} + chain.tip = block + chain.lastOutputPosition = nil + // Update lastAckPost. + for _, ack := range block.Acks { + if bAck, err = data.findBlock(ack); err != nil { + return + } + data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] = + bAck.Position.Clone() + } + return +} + // prepareBlock helps to setup fields of block based on its ChainID and Round, // including: // - Acks @@ -524,6 +577,8 @@ type chainStatus struct { lastAckPos []*types.Position // the index to be output next time. nextOutputIndex int + // the position of output last time. + lastOutputPosition *types.Position } // findBlock finds index of block in current pending blocks on this chain. @@ -551,10 +606,9 @@ func (s *chainStatus) getBlock(idx int) (b *types.Block) { } // addBlock adds a block to pending blocks on this chain. -func (s *chainStatus) addBlock(b *types.Block) error { +func (s *chainStatus) addBlock(b *types.Block) { s.blocks = append(s.blocks, b) s.tip = b - return nil } // TODO(mission): change back to nextHeight. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go index 3259f3540..68b05c2e6 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go @@ -18,6 +18,7 @@ package core import ( + "fmt" "sync" "time" @@ -26,6 +27,11 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +// Errors for sanity check error. +var ( + ErrRetrySanityCheckLater = fmt.Errorf("retry sanity check later") +) + // Lattice represents a unit to produce a global ordering from multiple chains. type Lattice struct { lock sync.RWMutex @@ -34,6 +40,7 @@ type Lattice struct { app Application debug Debug pool blockPool + retryAdd bool data *latticeData toModule *totalOrdering ctModule *consensusTimestamp @@ -137,10 +144,8 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { s.lock.RLock() defer s.lock.RUnlock() if err = s.data.sanityCheck(b); err != nil { - // Add to block pool, once the lattice updated, - // would be checked again. - if err == ErrAckingBlockNotExists { - s.pool.addBlock(b) + if _, ok := err.(*ErrAckingBlockNotExists); ok { + err = ErrRetrySanityCheckLater } s.logger.Error("Sanity Check failed", "error", err) return @@ -151,11 +156,67 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { } // Verify data in application layer. s.logger.Debug("Calling Application.VerifyBlock", "block", b) - // TODO(jimmy-dexon): handle types.VerifyRetryLater. - if s.app.VerifyBlock(b) == types.VerifyInvalidBlock { + switch s.app.VerifyBlock(b) { + case types.VerifyInvalidBlock: err = ErrInvalidBlock - return err + case types.VerifyRetryLater: + err = ErrRetrySanityCheckLater + } + return +} + +// addBlockToLattice adds a block into lattice, and deliver blocks with the acks +// already delivered. +// +// NOTE: assume the block passed sanity check. +func (s *Lattice) addBlockToLattice( + input *types.Block) (outputBlocks []*types.Block, err error) { + if tip := s.data.chains[input.Position.ChainID].tip; tip != nil { + if !input.Position.Newer(&tip.Position) { + return + } + } + s.pool.addBlock(input) + // Replay tips in pool to check their validity. + for { + hasOutput := false + for i := uint32(0); i < s.chainNum; i++ { + var tip *types.Block + if tip = s.pool.tip(i); tip == nil { + continue + } + err = s.data.sanityCheck(tip) + if err == nil { + var output []*types.Block + if output, err = s.data.addBlock(tip); err != nil { + s.logger.Error("Sanity Check failed", "error", err) + continue + } + hasOutput = true + outputBlocks = append(outputBlocks, output...) + } + if _, ok := err.(*ErrAckingBlockNotExists); ok { + err = nil + continue + } + s.pool.removeTip(i) + } + if !hasOutput { + break + } } + + for _, b := range outputBlocks { + // TODO(jimmy-dexon): change this name of classic DEXON algorithm. + if s.debug != nil { + s.debug.StronglyAcked(b.Hash) + } + s.logger.Debug("Calling Application.BlockConfirmed", "block", input) + s.app.BlockConfirmed(*b.Clone()) + // Purge blocks in pool with the same chainID and lower height. + s.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) + } + return } @@ -165,45 +226,25 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { // // NOTE: assume the block passed sanity check. func (s *Lattice) ProcessBlock( - input *types.Block) (verified, delivered []*types.Block, err error) { - + input *types.Block) (delivered []*types.Block, err error) { var ( - tip, b *types.Block - toDelivered []*types.Block + b *types.Block inLattice []*types.Block + toDelivered []*types.Block deliveredMode uint32 ) + s.lock.Lock() defer s.lock.Unlock() - if inLattice, err = s.data.addBlock(input); err != nil { - // TODO(mission): if sanity check failed with "acking block doesn't - // exists", we should keep it in a pool. - s.logger.Error("Sanity Check failed when adding blocks", "error", err) + + if inLattice, err = s.addBlockToLattice(input); err != nil { return } - // TODO(mission): remove this hack, BA related stuffs should not - // be done here. - if s.debug != nil { - s.debug.StronglyAcked(input.Hash) - } - s.logger.Debug("Calling Application.BlockConfirmed", "block", input) - s.app.BlockConfirmed(*input.Clone()) - // Purge blocks in pool with the same chainID and lower height. - s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) - // Replay tips in pool to check their validity. - for i := uint32(0); i < s.chainNum; i++ { - if tip = s.pool.tip(i); tip == nil { - continue - } - err = s.data.sanityCheck(tip) - if err == nil { - verified = append(verified, tip) - } - if err == ErrAckingBlockNotExists { - continue - } - s.pool.removeTip(i) + + if len(inLattice) == 0 { + return } + // Perform total ordering for each block added to lattice. for _, b = range inLattice { toDelivered, deliveredMode, err = s.toModule.processBlock(b) @@ -265,3 +306,14 @@ func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { } return } + +// ProcessFinalizedBlock is used for syncing lattice data. +func (s *Lattice) ProcessFinalizedBlock(input *types.Block) { + defer func() { s.retryAdd = true }() + s.lock.Lock() + defer s.lock.Unlock() + if err := s.data.addFinalizedBlock(input); err != nil { + panic(err) + } + s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go index 23b9bb12e..bfaa19c11 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go @@ -20,6 +20,7 @@ package core import ( "fmt" "math/big" + "sync" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/crypto" @@ -31,6 +32,8 @@ var ( ErrIncorrectCRSSignature = fmt.Errorf("incorrect CRS signature") ) +type validLeaderFn func(*types.Block) bool + // Some constant value. var ( maxHash *big.Int @@ -47,20 +50,24 @@ func init() { } type leaderSelector struct { - hashCRS common.Hash - numCRS *big.Int - minCRSBlock *big.Int - minBlockHash common.Hash + hashCRS common.Hash + numCRS *big.Int + minCRSBlock *big.Int + minBlockHash common.Hash + pendingBlocks []*types.Block + validLeader validLeaderFn + lock sync.Mutex } func newLeaderSelector( - crs common.Hash) *leaderSelector { + crs common.Hash, validLeader validLeaderFn) *leaderSelector { numCRS := big.NewInt(0) numCRS.SetBytes(crs[:]) return &leaderSelector{ numCRS: numCRS, hashCRS: crs, minCRSBlock: maxHash, + validLeader: validLeader, } } @@ -80,11 +87,25 @@ func (l *leaderSelector) probability(sig crypto.Signature) float64 { } func (l *leaderSelector) restart() { + l.lock.Lock() + defer l.lock.Unlock() l.minCRSBlock = maxHash l.minBlockHash = common.Hash{} + l.pendingBlocks = []*types.Block{} } func (l *leaderSelector) leaderBlockHash() common.Hash { + l.lock.Lock() + defer l.lock.Unlock() + newPendingBlocks := []*types.Block{} + for _, b := range l.pendingBlocks { + if l.validLeader(b) { + l.updateLeader(b) + } else { + newPendingBlocks = append(newPendingBlocks, b) + } + } + l.pendingBlocks = newPendingBlocks return l.minBlockHash } @@ -96,11 +117,20 @@ func (l *leaderSelector) processBlock(block *types.Block) error { if !ok { return ErrIncorrectCRSSignature } + l.lock.Lock() + defer l.lock.Unlock() + if !l.validLeader(block) { + l.pendingBlocks = append(l.pendingBlocks, block) + return nil + } + l.updateLeader(block) + return nil +} +func (l *leaderSelector) updateLeader(block *types.Block) { dist := l.distance(block.CRSSignature) cmp := l.minCRSBlock.Cmp(dist) if cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash)) { l.minCRSBlock = dist l.minBlockHash = block.Hash } - return nil } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go index e12e0d5c7..67226927f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go @@ -215,6 +215,7 @@ func (b *Block) Clone() (bcopy *Block) { bcopy.ProposerID = b.ProposerID bcopy.ParentHash = b.ParentHash bcopy.Hash = b.Hash + bcopy.Position.Round = b.Position.Round bcopy.Position.ChainID = b.Position.ChainID bcopy.Position.Height = b.Position.Height bcopy.Signature = b.Signature.Clone() diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go index 372ffb4da..df28b2055 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go @@ -46,6 +46,22 @@ type Config struct { MaxBlockInterval time.Duration } +// Clone return a copied configuration. +func (c *Config) Clone() *Config { + return &Config{ + NumChains: c.NumChains, + LambdaBA: c.LambdaBA, + LambdaDKG: c.LambdaDKG, + K: c.K, + PhiRatio: c.PhiRatio, + NotarySetSize: c.NotarySetSize, + DKGSetSize: c.DKGSetSize, + RoundInterval: c.RoundInterval, + MinBlockInterval: c.MinBlockInterval, + MaxBlockInterval: c.MaxBlockInterval, + } +} + // Bytes returns []byte representation of Config. func (c *Config) Bytes() []byte { binaryNumChains := make([]byte, 4) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go index f41be324e..8e7e85298 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go @@ -18,15 +18,9 @@ package types import ( - "errors" "fmt" ) -// ErrComparePositionOnDifferentChains raised when attempting to -// compare two positions with different chain ID. -var ErrComparePositionOnDifferentChains = errors.New( - "position on different chain") - // Position describes the position in the block lattice of an entity. type Position struct { ChainID uint32 `json:"chain_id"` @@ -42,7 +36,8 @@ func (pos *Position) String() string { // are different. func (pos *Position) Equal(other *Position) bool { if pos.ChainID != other.ChainID { - panic(ErrComparePositionOnDifferentChains) + panic(fmt.Errorf("unexpected chainID %d, should be %d", + other.ChainID, pos.ChainID)) } return pos.Round == other.Round && pos.Height == other.Height } @@ -51,12 +46,24 @@ func (pos *Position) Equal(other *Position) bool { // If two blocks on different chain compared by this function, it would panic. func (pos *Position) Newer(other *Position) bool { if pos.ChainID != other.ChainID { - panic(ErrComparePositionOnDifferentChains) + panic(fmt.Errorf("unexpected chainID %d, should be %d", + other.ChainID, pos.ChainID)) } return pos.Round > other.Round || (pos.Round == other.Round && pos.Height > other.Height) } +// Older checks if one block is older than another one on the same chain. +// If two blocks on different chain compared by this function, it would panic. +func (pos *Position) Older(other *Position) bool { + if pos.ChainID != other.ChainID { + panic(fmt.Errorf("unexpected chainID %d, should be %d", + other.ChainID, pos.ChainID)) + } + return pos.Round < other.Round || + (pos.Round == other.Round && pos.Height < other.Height) +} + // Clone a position instance. func (pos *Position) Clone() *Position { return &Position{ -- cgit v1.2.3