aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com
diff options
context:
space:
mode:
authorWei-Ning Huang <w@dexon.org>2018-10-29 09:31:36 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:27:17 +0800
commita69fb3e4c59fab52b6e10993c67400084879b1a8 (patch)
tree8e849faadee1e1c43068a4b0392a02050d271e3e /vendor/github.com
parent3f320c9048198d14bc44413861efcbc5665324b1 (diff)
downloadgo-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.tar
go-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.tar.gz
go-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.tar.bz2
go-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.tar.lz
go-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.tar.xz
go-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.tar.zst
go-tangerine-a69fb3e4c59fab52b6e10993c67400084879b1a8.zip
vendor: sync consensus core and fix conflict
Diffstat (limited to 'vendor/github.com')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go61
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go32
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go8
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go185
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go3
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go76
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go126
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go42
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go1
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go16
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go23
11 files changed, 440 insertions, 133 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go
index 8618b5ff0..8c2218be0 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go
@@ -21,7 +21,6 @@ import (
"fmt"
"math"
"sync"
- "sync/atomic"
"time"
"github.com/dexon-foundation/dexon-consensus-core/common"
@@ -69,6 +68,7 @@ type agreementReceiver interface {
ProposeVote(vote *types.Vote)
ProposeBlock() common.Hash
ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote)
+ PullBlocks(common.Hashes)
}
type pendingBlock struct {
@@ -101,7 +101,7 @@ type agreementData struct {
type agreement struct {
state agreementState
data *agreementData
- aID *atomic.Value
+ aID types.Position
notarySet map[types.NodeID]struct{}
hasOutput bool
lock sync.RWMutex
@@ -125,7 +125,6 @@ func newAgreement(
ID: ID,
leader: leader,
},
- aID: &atomic.Value{},
candidateBlock: make(map[common.Hash]*types.Block),
fastForward: make(chan uint64, 1),
authModule: authModule,
@@ -158,9 +157,13 @@ func (a *agreement) restart(
a.state = newInitialState(a.data)
a.notarySet = notarySet
a.candidateBlock = make(map[common.Hash]*types.Block)
- a.aID.Store(aID)
+ a.aID = *aID.Clone()
}()
+ if isStop(aID) {
+ return
+ }
+
expireTime := time.Now().Add(-10 * time.Second)
replayBlock := make([]*types.Block, 0)
func() {
@@ -168,7 +171,9 @@ func (a *agreement) restart(
defer a.lock.Unlock()
newPendingBlock := make([]pendingBlock, 0)
for _, pending := range a.pendingBlock {
- if pending.block.Position == aID {
+ if aID.Newer(&pending.block.Position) {
+ continue
+ } else if pending.block.Position == aID {
replayBlock = append(replayBlock, pending.block)
} else if pending.receivedTime.After(expireTime) {
newPendingBlock = append(newPendingBlock, pending)
@@ -183,7 +188,9 @@ func (a *agreement) restart(
defer a.lock.Unlock()
newPendingVote := make([]pendingVote, 0)
for _, pending := range a.pendingVote {
- if pending.vote.Position == aID {
+ if aID.Newer(&pending.vote.Position) {
+ continue
+ } else if pending.vote.Position == aID {
replayVote = append(replayVote, pending.vote)
} else if pending.receivedTime.After(expireTime) {
newPendingVote = append(newPendingVote, pending)
@@ -207,6 +214,10 @@ func (a *agreement) stop() {
})
}
+func isStop(aID types.Position) bool {
+ return aID.ChainID == math.MaxUint32
+}
+
// clocks returns how many time this state is required.
func (a *agreement) clocks() int {
return a.state.clocks()
@@ -214,7 +225,9 @@ func (a *agreement) clocks() int {
// agreementID returns the current agreementID.
func (a *agreement) agreementID() types.Position {
- return a.aID.Load().(types.Position)
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+ return a.aID
}
// nextState is called at the specific clock time.
@@ -272,7 +285,14 @@ func (a *agreement) processVote(vote *types.Vote) error {
if err := a.sanityCheck(vote); err != nil {
return err
}
- if vote.Position != a.agreementID() {
+ aID := a.agreementID()
+ if vote.Position != aID {
+ // Agreement module has stopped.
+ if !isStop(aID) {
+ if aID.Newer(&vote.Position) {
+ return nil
+ }
+ }
a.lock.Lock()
defer a.lock.Unlock()
a.pendingVote = append(a.pendingVote, pendingVote{
@@ -327,6 +347,21 @@ func (a *agreement) processVote(vote *types.Vote) error {
// Condition 3.
if vote.Type == types.VoteCom && vote.Period >= a.data.period &&
len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote {
+ hashes := common.Hashes{}
+ addPullBlocks := func(voteType types.VoteType) {
+ for _, vote := range a.data.votes[vote.Period][voteType] {
+ if vote.BlockHash == nullBlockHash || vote.BlockHash == skipBlockHash {
+ continue
+ }
+ if _, found := a.findCandidateBlock(vote.BlockHash); !found {
+ hashes = append(hashes, vote.BlockHash)
+ }
+ }
+ }
+ addPullBlocks(types.VoteInit)
+ addPullBlocks(types.VotePreCom)
+ addPullBlocks(types.VoteCom)
+ a.data.recv.PullBlocks(hashes)
a.fastForward <- vote.Period + 1
return nil
}
@@ -356,7 +391,15 @@ func (a *agreement) done() <-chan struct{} {
func (a *agreement) processBlock(block *types.Block) error {
a.data.blocksLock.Lock()
defer a.data.blocksLock.Unlock()
- if block.Position != a.agreementID() {
+
+ aID := a.agreementID()
+ if block.Position != aID {
+ // Agreement module has stopped.
+ if !isStop(aID) {
+ if aID.Newer(&block.Position) {
+ return nil
+ }
+ }
a.pendingBlock = append(a.pendingBlock, pendingBlock{
block: block,
receivedTime: time.Now().UTC(),
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go
index 451cb1355..50056a9d8 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/compaction-chain.go
@@ -148,6 +148,11 @@ func (cc *compactionChain) processFinalizedBlock(block *types.Block) {
return
}
+ // Block of round 0 should not have randomness.
+ if block.Position.Round == 0 && len(block.Finalization.Randomness) != 0 {
+ return
+ }
+
cc.lock.Lock()
defer cc.lock.Unlock()
heap.Push(cc.pendingFinalizedBlocks, block)
@@ -191,18 +196,21 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block {
continue
}
round := b.Position.Round
- v, ok, err := cc.tsigVerifier.UpdateAndGet(round)
- if err != nil {
- continue
- }
- if !ok {
- toPending = append(toPending, b)
- continue
- }
- if ok := v.VerifySignature(b.Hash, crypto.Signature{
- Type: "bls",
- Signature: b.Finalization.Randomness}); !ok {
- continue
+ if round != 0 {
+ // Randomness is not available at round 0.
+ v, ok, err := cc.tsigVerifier.UpdateAndGet(round)
+ if err != nil {
+ continue
+ }
+ if !ok {
+ toPending = append(toPending, b)
+ continue
+ }
+ if ok := v.VerifySignature(b.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: b.Finalization.Randomness}); !ok {
+ continue
+ }
}
// Fork resolution: choose block with smaller hash.
if prevBlock.Finalization.Height == b.Finalization.Height {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go
index 559eac0b7..bf24c31dc 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/configuration-chain.go
@@ -205,6 +205,12 @@ func (cc *configurationChain) touchTSigHash(hash common.Hash) (first bool) {
return !exist
}
+func (cc *configurationChain) untouchTSigHash(hash common.Hash) {
+ cc.tsigReady.L.Lock()
+ defer cc.tsigReady.L.Unlock()
+ delete(cc.tsigTouched, hash)
+}
+
func (cc *configurationChain) runTSig(
round uint64, hash common.Hash) (
crypto.Signature, error) {
@@ -240,10 +246,10 @@ func (cc *configurationChain) runTSig(
signature, err = cc.tsig[hash].signature()
return err == ErrNotEnoughtPartialSignatures
}() {
+ // TODO(jimmy-dexon): add a timeout here.
cc.tsigReady.Wait()
}
delete(cc.tsig, hash)
- delete(cc.tsigTouched, hash)
if err != nil {
return crypto.Signature{}, err
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go
index e20b4e79d..938337f1c 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go
@@ -47,6 +47,12 @@ var (
"incorrect agreement result position")
ErrNotEnoughVotes = fmt.Errorf(
"not enought votes")
+ ErrIncorrectVoteBlockHash = fmt.Errorf(
+ "incorrect vote block hash")
+ ErrIncorrectVoteType = fmt.Errorf(
+ "incorrect vote type")
+ ErrIncorrectVotePosition = fmt.Errorf(
+ "incorrect vote position")
ErrIncorrectVoteProposer = fmt.Errorf(
"incorrect vote proposer")
ErrIncorrectBlockRandomnessResult = fmt.Errorf(
@@ -113,8 +119,20 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block, exist = recv.consensus.baModules[recv.chainID].
findCandidateBlock(hash)
if !exist {
- recv.consensus.logger.Error("Unknown block confirmed", "hash", hash)
- return
+ recv.consensus.logger.Error("Unknown block confirmed",
+ "hash", hash,
+ "chainID", recv.chainID)
+ ch := make(chan *types.Block)
+ func() {
+ recv.consensus.lock.Lock()
+ defer recv.consensus.lock.Unlock()
+ recv.consensus.baConfirmedBlock[hash] = ch
+ }()
+ recv.consensus.network.PullBlocks(common.Hashes{hash})
+ block = <-ch
+ recv.consensus.logger.Info("Receive unknown block",
+ "hash", hash,
+ "chainID", recv.chainID)
}
}
recv.consensus.ccModule.registerBlock(block)
@@ -145,6 +163,11 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
+func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
+ recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes)
+ recv.consensus.network.PullBlocks(hashes)
+}
+
// consensusDKGReceiver implements dkgReceiver.
type consensusDKGReceiver struct {
ID types.NodeID
@@ -236,8 +259,9 @@ type Consensus struct {
currentConfig *types.Config
// BA.
- baModules []*agreement
- receivers []*consensusBAReceiver
+ baModules []*agreement
+ receivers []*consensusBAReceiver
+ baConfirmedBlock map[common.Hash]chan<- *types.Block
// DKG.
dkgRunning int32
@@ -318,23 +342,28 @@ func NewConsensus(
recv.cfgModule = cfgModule
// Construct Consensus instance.
con := &Consensus{
- ID: ID,
- currentConfig: config,
- ccModule: newCompactionChain(gov),
- lattice: lattice,
- app: app,
- gov: gov,
- db: db,
- network: network,
- tickerObj: newTicker(gov, round, TickerBA),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
- dMoment: dMoment,
- nodeSetCache: nodeSetCache,
- authModule: authModule,
- event: common.NewEvent(),
- logger: logger,
- roundToNotify: roundToNotify,
+ ID: ID,
+ currentConfig: config,
+ ccModule: newCompactionChain(gov),
+ lattice: lattice,
+ app: app,
+ gov: gov,
+ db: db,
+ network: network,
+ tickerObj: newTicker(gov, round, TickerBA),
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ dMoment: dMoment,
+ nodeSetCache: nodeSetCache,
+ authModule: authModule,
+ event: common.NewEvent(),
+ logger: logger,
+ roundToNotify: roundToNotify,
+ }
+
+ validLeader := func(block *types.Block) bool {
+ return lattice.SanityCheck(block) == nil
}
con.baModules = make([]*agreement, config.NumChains)
@@ -350,7 +379,7 @@ func NewConsensus(
con.ID,
recv,
nodes.IDs,
- newLeaderSelector(crs),
+ newLeaderSelector(crs, validLeader),
con.authModule,
)
// Hacky way to make agreement module self contained.
@@ -608,6 +637,7 @@ func (con *Consensus) Stop() {
}
func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+MessageLoop:
for {
var msg interface{}
select {
@@ -618,8 +648,30 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
switch val := msg.(type) {
case *types.Block:
- // For sync mode.
- if val.IsFinalized() {
+ if ch, exist := func() (chan<- *types.Block, bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ ch, e := con.baConfirmedBlock[val.Hash]
+ return ch, e
+ }(); exist {
+ if err := con.lattice.SanityCheck(val); err != nil {
+ if err == ErrRetrySanityCheckLater {
+ err = nil
+ } else {
+ con.logger.Error("SanityCheck failed", "error", err)
+ continue MessageLoop
+ }
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // In case of multiple delivered block.
+ if _, exist := con.baConfirmedBlock[val.Hash]; !exist {
+ continue MessageLoop
+ }
+ delete(con.baConfirmedBlock, val.Hash)
+ ch <- val
+ } else if val.IsFinalized() {
+ // For sync mode.
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
"error", err)
@@ -697,27 +749,28 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
// ProcessAgreementResult processes the randomness request.
func (con *Consensus) ProcessAgreementResult(
rand *types.AgreementResult) error {
- if rand.Position.Round == 0 {
- return nil
- }
- if !con.ccModule.blockRegistered(rand.BlockHash) {
- return nil
- }
- if DiffUint64(con.round, rand.Position.Round) > 1 {
- return nil
- }
- if len(rand.Votes) <= int(con.currentConfig.NotarySetSize/3*2) {
- return ErrNotEnoughVotes
- }
- if rand.Position.ChainID >= con.currentConfig.NumChains {
- return ErrIncorrectAgreementResultPosition
- }
+ // Sanity Check.
notarySet, err := con.nodeSetCache.GetNotarySet(
rand.Position.Round, rand.Position.ChainID)
if err != nil {
return err
}
+ if len(rand.Votes) < len(notarySet)/3*2+1 {
+ return ErrNotEnoughVotes
+ }
+ if len(rand.Votes) > len(notarySet) {
+ return ErrIncorrectVoteProposer
+ }
for _, vote := range rand.Votes {
+ if vote.BlockHash != rand.BlockHash {
+ return ErrIncorrectVoteBlockHash
+ }
+ if vote.Type != types.VoteCom {
+ return ErrIncorrectVoteType
+ }
+ if vote.Position != rand.Position {
+ return ErrIncorrectVotePosition
+ }
if _, exist := notarySet[vote.ProposerID]; !exist {
return ErrIncorrectVoteProposer
}
@@ -729,6 +782,37 @@ func (con *Consensus) ProcessAgreementResult(
return ErrIncorrectVoteSignature
}
}
+ // Syncing BA Module.
+ agreement := con.baModules[rand.Position.ChainID]
+ aID := agreement.agreementID()
+ if rand.Position.Newer(&aID) {
+ con.logger.Info("Syncing BA", "position", rand.Position)
+ nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round)
+ if err != nil {
+ return err
+ }
+ con.logger.Debug("Calling Network.PullBlocks for syncing BA",
+ "hash", rand.BlockHash)
+ con.network.PullBlocks(common.Hashes{rand.BlockHash})
+ nIDs := nodes.GetSubSet(
+ int(con.gov.Configuration(rand.Position.Round).NotarySetSize),
+ types.NewNotarySetTarget(
+ con.gov.CRS(rand.Position.Round), rand.Position.ChainID))
+ for _, vote := range rand.Votes {
+ agreement.processVote(&vote)
+ }
+ agreement.restart(nIDs, rand.Position)
+ }
+ // Calculating randomness.
+ if rand.Position.Round == 0 {
+ return nil
+ }
+ if !con.ccModule.blockRegistered(rand.BlockHash) {
+ return nil
+ }
+ if DiffUint64(con.round, rand.Position.Round) > 1 {
+ return nil
+ }
// Sanity check done.
if !con.cfgModule.touchTSigHash(rand.BlockHash) {
return nil
@@ -816,7 +900,9 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
if err = con.lattice.SanityCheck(b); err != nil {
- return
+ if err != ErrRetrySanityCheckLater {
+ return
+ }
}
if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil {
return err
@@ -826,16 +912,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
- verifiedBlocks, deliveredBlocks, err := con.lattice.ProcessBlock(block)
- if err != nil {
+ if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists {
return
}
- // Pass verified blocks (pass sanity check) back to BA module.
- for _, b := range verifiedBlocks {
- if err :=
- con.baModules[b.Position.ChainID].processBlock(b); err != nil {
- return err
- }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // Block processed by lattice can be out-of-order. But the output of lattice
+ // (deliveredBlocks) cannot.
+ deliveredBlocks, err := con.lattice.ProcessBlock(block)
+ if err != nil {
+ return
}
// Pass delivered blocks to compaction chain.
for _, b := range deliveredBlocks {
@@ -846,9 +932,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
}
deliveredBlocks = con.ccModule.extractBlocks()
for _, b := range deliveredBlocks {
- if err = con.db.Put(*b); err != nil {
+ if err = con.db.Update(*b); err != nil {
panic(err)
}
+ con.cfgModule.untouchTSigHash(b.Hash)
// TODO(mission): clone types.FinalizationResult
con.app.BlockDelivered(b.Hash, b.Finalization)
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go
index 01e909667..4f6ad45a2 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/interfaces.go
@@ -59,6 +59,9 @@ type Debug interface {
// Network describs the network interface that interacts with DEXON consensus
// core.
type Network interface {
+ // PullBlocks tries to pull blocks from the DEXON network.
+ PullBlocks(hashes common.Hashes)
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
BroadcastVote(vote *types.Vote)
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go
index b0fe9cfdd..2a3ec299e 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice-data.go
@@ -30,7 +30,6 @@ import (
// Errors for sanity check error.
var (
- ErrAckingBlockNotExists = fmt.Errorf("acking block not exists")
ErrDuplicatedAckOnOneChain = fmt.Errorf("duplicated ack on one chain")
ErrInvalidChainID = fmt.Errorf("invalid chain id")
ErrInvalidProposerID = fmt.Errorf("invalid proposer id")
@@ -50,6 +49,15 @@ var (
ErrUnexpectedGenesisBlock = fmt.Errorf("unexpected genesis block")
)
+// ErrAckingBlockNotExists is for sanity check error.
+type ErrAckingBlockNotExists struct {
+ hash common.Hash
+}
+
+func (e ErrAckingBlockNotExists) Error() string {
+ return fmt.Sprintf("acking block %s not exists", e.hash)
+}
+
// Errors for method usage
var (
ErrRoundNotIncreasing = errors.New("round not increasing")
@@ -144,7 +152,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error {
bAck, err := data.findBlock(hash)
if err != nil {
if err == blockdb.ErrBlockDoesNotExist {
- return ErrAckingBlockNotExists
+ return &ErrAckingBlockNotExists{hash}
}
return err
}
@@ -185,7 +193,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
chainTip := chain.tip
if chainTip == nil {
if !b.ParentHash.Equal(common.Hash{}) {
- return ErrAckingBlockNotExists
+ return &ErrAckingBlockNotExists{b.ParentHash}
}
if !b.IsGenesis() {
return ErrNotGenesisBlock
@@ -198,7 +206,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
// Check parent block if parent hash is specified.
if !b.ParentHash.Equal(common.Hash{}) {
if !b.ParentHash.Equal(chainTip.Hash) {
- return ErrAckingBlockNotExists
+ return &ErrAckingBlockNotExists{b.ParentHash}
}
if !b.IsAcking(b.ParentHash) {
return ErrNotAckParent
@@ -268,18 +276,21 @@ func (data *latticeData) addBlock(
bAck *types.Block
updated bool
)
- if err = data.chains[block.Position.ChainID].addBlock(block); err != nil {
- return
- }
+ data.chains[block.Position.ChainID].addBlock(block)
data.blockByHash[block.Hash] = block
// Update lastAckPos.
for _, ack := range block.Acks {
if bAck, err = data.findBlock(ack); err != nil {
+ if err == blockdb.ErrBlockDoesNotExist {
+ err = nil
+ continue
+ }
return
}
data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] =
bAck.Position.Clone()
}
+
// Extract blocks that deliverable to total ordering.
// A block is deliverable to total ordering iff:
// - All its acking blocks are delivered to total ordering.
@@ -293,19 +304,37 @@ func (data *latticeData) addBlock(
allAckingBlockDelivered := true
for _, ack := range tip.Acks {
if bAck, err = data.findBlock(ack); err != nil {
+ if err == blockdb.ErrBlockDoesNotExist {
+ err = nil
+ allAckingBlockDelivered = false
+ break
+ }
return
}
// Check if this block is outputed or not.
idx := data.chains[bAck.Position.ChainID].findBlock(
&bAck.Position)
- if idx == -1 ||
- idx < data.chains[bAck.Position.ChainID].nextOutputIndex {
+ var ok bool
+ if idx == -1 {
+ // Either the block is delivered or not added to chain yet.
+ if out :=
+ data.chains[bAck.Position.ChainID].lastOutputPosition; out != nil {
+ ok = !out.Older(&bAck.Position)
+ } else if ackTip :=
+ data.chains[bAck.Position.ChainID].tip; ackTip != nil {
+ ok = !ackTip.Position.Older(&bAck.Position)
+ }
+ } else {
+ ok = idx < data.chains[bAck.Position.ChainID].nextOutputIndex
+ }
+ if ok {
continue
}
// This acked block exists and not delivered yet.
allAckingBlockDelivered = false
}
if allAckingBlockDelivered {
+ status.lastOutputPosition = &tip.Position
status.nextOutputIndex++
deliverable = append(deliverable, tip)
updated = true
@@ -318,6 +347,30 @@ func (data *latticeData) addBlock(
return
}
+// addFinalizedBlock processes block for syncing internal data.
+func (data *latticeData) addFinalizedBlock(
+ block *types.Block) (err error) {
+ var bAck *types.Block
+ chain := data.chains[block.Position.ChainID]
+ if chain.tip != nil && chain.tip.Position.Height >=
+ block.Position.Height {
+ return
+ }
+ chain.nextOutputIndex = 0
+ chain.blocks = []*types.Block{}
+ chain.tip = block
+ chain.lastOutputPosition = nil
+ // Update lastAckPost.
+ for _, ack := range block.Acks {
+ if bAck, err = data.findBlock(ack); err != nil {
+ return
+ }
+ data.chains[bAck.Position.ChainID].lastAckPos[block.Position.ChainID] =
+ bAck.Position.Clone()
+ }
+ return
+}
+
// prepareBlock helps to setup fields of block based on its ChainID and Round,
// including:
// - Acks
@@ -524,6 +577,8 @@ type chainStatus struct {
lastAckPos []*types.Position
// the index to be output next time.
nextOutputIndex int
+ // the position of output last time.
+ lastOutputPosition *types.Position
}
// findBlock finds index of block in current pending blocks on this chain.
@@ -551,10 +606,9 @@ func (s *chainStatus) getBlock(idx int) (b *types.Block) {
}
// addBlock adds a block to pending blocks on this chain.
-func (s *chainStatus) addBlock(b *types.Block) error {
+func (s *chainStatus) addBlock(b *types.Block) {
s.blocks = append(s.blocks, b)
s.tip = b
- return nil
}
// TODO(mission): change back to nextHeight.
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go
index 3259f3540..68b05c2e6 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go
@@ -18,6 +18,7 @@
package core
import (
+ "fmt"
"sync"
"time"
@@ -26,6 +27,11 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
+// Errors for sanity check error.
+var (
+ ErrRetrySanityCheckLater = fmt.Errorf("retry sanity check later")
+)
+
// Lattice represents a unit to produce a global ordering from multiple chains.
type Lattice struct {
lock sync.RWMutex
@@ -34,6 +40,7 @@ type Lattice struct {
app Application
debug Debug
pool blockPool
+ retryAdd bool
data *latticeData
toModule *totalOrdering
ctModule *consensusTimestamp
@@ -137,10 +144,8 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) {
s.lock.RLock()
defer s.lock.RUnlock()
if err = s.data.sanityCheck(b); err != nil {
- // Add to block pool, once the lattice updated,
- // would be checked again.
- if err == ErrAckingBlockNotExists {
- s.pool.addBlock(b)
+ if _, ok := err.(*ErrAckingBlockNotExists); ok {
+ err = ErrRetrySanityCheckLater
}
s.logger.Error("Sanity Check failed", "error", err)
return
@@ -151,11 +156,67 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) {
}
// Verify data in application layer.
s.logger.Debug("Calling Application.VerifyBlock", "block", b)
- // TODO(jimmy-dexon): handle types.VerifyRetryLater.
- if s.app.VerifyBlock(b) == types.VerifyInvalidBlock {
+ switch s.app.VerifyBlock(b) {
+ case types.VerifyInvalidBlock:
err = ErrInvalidBlock
- return err
+ case types.VerifyRetryLater:
+ err = ErrRetrySanityCheckLater
+ }
+ return
+}
+
+// addBlockToLattice adds a block into lattice, and deliver blocks with the acks
+// already delivered.
+//
+// NOTE: assume the block passed sanity check.
+func (s *Lattice) addBlockToLattice(
+ input *types.Block) (outputBlocks []*types.Block, err error) {
+ if tip := s.data.chains[input.Position.ChainID].tip; tip != nil {
+ if !input.Position.Newer(&tip.Position) {
+ return
+ }
+ }
+ s.pool.addBlock(input)
+ // Replay tips in pool to check their validity.
+ for {
+ hasOutput := false
+ for i := uint32(0); i < s.chainNum; i++ {
+ var tip *types.Block
+ if tip = s.pool.tip(i); tip == nil {
+ continue
+ }
+ err = s.data.sanityCheck(tip)
+ if err == nil {
+ var output []*types.Block
+ if output, err = s.data.addBlock(tip); err != nil {
+ s.logger.Error("Sanity Check failed", "error", err)
+ continue
+ }
+ hasOutput = true
+ outputBlocks = append(outputBlocks, output...)
+ }
+ if _, ok := err.(*ErrAckingBlockNotExists); ok {
+ err = nil
+ continue
+ }
+ s.pool.removeTip(i)
+ }
+ if !hasOutput {
+ break
+ }
}
+
+ for _, b := range outputBlocks {
+ // TODO(jimmy-dexon): change this name of classic DEXON algorithm.
+ if s.debug != nil {
+ s.debug.StronglyAcked(b.Hash)
+ }
+ s.logger.Debug("Calling Application.BlockConfirmed", "block", input)
+ s.app.BlockConfirmed(*b.Clone())
+ // Purge blocks in pool with the same chainID and lower height.
+ s.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
+ }
+
return
}
@@ -165,45 +226,25 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) {
//
// NOTE: assume the block passed sanity check.
func (s *Lattice) ProcessBlock(
- input *types.Block) (verified, delivered []*types.Block, err error) {
-
+ input *types.Block) (delivered []*types.Block, err error) {
var (
- tip, b *types.Block
- toDelivered []*types.Block
+ b *types.Block
inLattice []*types.Block
+ toDelivered []*types.Block
deliveredMode uint32
)
+
s.lock.Lock()
defer s.lock.Unlock()
- if inLattice, err = s.data.addBlock(input); err != nil {
- // TODO(mission): if sanity check failed with "acking block doesn't
- // exists", we should keep it in a pool.
- s.logger.Error("Sanity Check failed when adding blocks", "error", err)
+
+ if inLattice, err = s.addBlockToLattice(input); err != nil {
return
}
- // TODO(mission): remove this hack, BA related stuffs should not
- // be done here.
- if s.debug != nil {
- s.debug.StronglyAcked(input.Hash)
- }
- s.logger.Debug("Calling Application.BlockConfirmed", "block", input)
- s.app.BlockConfirmed(*input.Clone())
- // Purge blocks in pool with the same chainID and lower height.
- s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height)
- // Replay tips in pool to check their validity.
- for i := uint32(0); i < s.chainNum; i++ {
- if tip = s.pool.tip(i); tip == nil {
- continue
- }
- err = s.data.sanityCheck(tip)
- if err == nil {
- verified = append(verified, tip)
- }
- if err == ErrAckingBlockNotExists {
- continue
- }
- s.pool.removeTip(i)
+
+ if len(inLattice) == 0 {
+ return
}
+
// Perform total ordering for each block added to lattice.
for _, b = range inLattice {
toDelivered, deliveredMode, err = s.toModule.processBlock(b)
@@ -265,3 +306,14 @@ func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
}
return
}
+
+// ProcessFinalizedBlock is used for syncing lattice data.
+func (s *Lattice) ProcessFinalizedBlock(input *types.Block) {
+ defer func() { s.retryAdd = true }()
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if err := s.data.addFinalizedBlock(input); err != nil {
+ panic(err)
+ }
+ s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height)
+}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go
index 23b9bb12e..bfaa19c11 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/leader-selector.go
@@ -20,6 +20,7 @@ package core
import (
"fmt"
"math/big"
+ "sync"
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/crypto"
@@ -31,6 +32,8 @@ var (
ErrIncorrectCRSSignature = fmt.Errorf("incorrect CRS signature")
)
+type validLeaderFn func(*types.Block) bool
+
// Some constant value.
var (
maxHash *big.Int
@@ -47,20 +50,24 @@ func init() {
}
type leaderSelector struct {
- hashCRS common.Hash
- numCRS *big.Int
- minCRSBlock *big.Int
- minBlockHash common.Hash
+ hashCRS common.Hash
+ numCRS *big.Int
+ minCRSBlock *big.Int
+ minBlockHash common.Hash
+ pendingBlocks []*types.Block
+ validLeader validLeaderFn
+ lock sync.Mutex
}
func newLeaderSelector(
- crs common.Hash) *leaderSelector {
+ crs common.Hash, validLeader validLeaderFn) *leaderSelector {
numCRS := big.NewInt(0)
numCRS.SetBytes(crs[:])
return &leaderSelector{
numCRS: numCRS,
hashCRS: crs,
minCRSBlock: maxHash,
+ validLeader: validLeader,
}
}
@@ -80,11 +87,25 @@ func (l *leaderSelector) probability(sig crypto.Signature) float64 {
}
func (l *leaderSelector) restart() {
+ l.lock.Lock()
+ defer l.lock.Unlock()
l.minCRSBlock = maxHash
l.minBlockHash = common.Hash{}
+ l.pendingBlocks = []*types.Block{}
}
func (l *leaderSelector) leaderBlockHash() common.Hash {
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ newPendingBlocks := []*types.Block{}
+ for _, b := range l.pendingBlocks {
+ if l.validLeader(b) {
+ l.updateLeader(b)
+ } else {
+ newPendingBlocks = append(newPendingBlocks, b)
+ }
+ }
+ l.pendingBlocks = newPendingBlocks
return l.minBlockHash
}
@@ -96,11 +117,20 @@ func (l *leaderSelector) processBlock(block *types.Block) error {
if !ok {
return ErrIncorrectCRSSignature
}
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ if !l.validLeader(block) {
+ l.pendingBlocks = append(l.pendingBlocks, block)
+ return nil
+ }
+ l.updateLeader(block)
+ return nil
+}
+func (l *leaderSelector) updateLeader(block *types.Block) {
dist := l.distance(block.CRSSignature)
cmp := l.minCRSBlock.Cmp(dist)
if cmp > 0 || (cmp == 0 && block.Hash.Less(l.minBlockHash)) {
l.minCRSBlock = dist
l.minBlockHash = block.Hash
}
- return nil
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go
index e12e0d5c7..67226927f 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/block.go
@@ -215,6 +215,7 @@ func (b *Block) Clone() (bcopy *Block) {
bcopy.ProposerID = b.ProposerID
bcopy.ParentHash = b.ParentHash
bcopy.Hash = b.Hash
+ bcopy.Position.Round = b.Position.Round
bcopy.Position.ChainID = b.Position.ChainID
bcopy.Position.Height = b.Position.Height
bcopy.Signature = b.Signature.Clone()
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go
index 372ffb4da..df28b2055 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/config.go
@@ -46,6 +46,22 @@ type Config struct {
MaxBlockInterval time.Duration
}
+// Clone return a copied configuration.
+func (c *Config) Clone() *Config {
+ return &Config{
+ NumChains: c.NumChains,
+ LambdaBA: c.LambdaBA,
+ LambdaDKG: c.LambdaDKG,
+ K: c.K,
+ PhiRatio: c.PhiRatio,
+ NotarySetSize: c.NotarySetSize,
+ DKGSetSize: c.DKGSetSize,
+ RoundInterval: c.RoundInterval,
+ MinBlockInterval: c.MinBlockInterval,
+ MaxBlockInterval: c.MaxBlockInterval,
+ }
+}
+
// Bytes returns []byte representation of Config.
func (c *Config) Bytes() []byte {
binaryNumChains := make([]byte, 4)
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go
index f41be324e..8e7e85298 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/types/position.go
@@ -18,15 +18,9 @@
package types
import (
- "errors"
"fmt"
)
-// ErrComparePositionOnDifferentChains raised when attempting to
-// compare two positions with different chain ID.
-var ErrComparePositionOnDifferentChains = errors.New(
- "position on different chain")
-
// Position describes the position in the block lattice of an entity.
type Position struct {
ChainID uint32 `json:"chain_id"`
@@ -42,7 +36,8 @@ func (pos *Position) String() string {
// are different.
func (pos *Position) Equal(other *Position) bool {
if pos.ChainID != other.ChainID {
- panic(ErrComparePositionOnDifferentChains)
+ panic(fmt.Errorf("unexpected chainID %d, should be %d",
+ other.ChainID, pos.ChainID))
}
return pos.Round == other.Round && pos.Height == other.Height
}
@@ -51,12 +46,24 @@ func (pos *Position) Equal(other *Position) bool {
// If two blocks on different chain compared by this function, it would panic.
func (pos *Position) Newer(other *Position) bool {
if pos.ChainID != other.ChainID {
- panic(ErrComparePositionOnDifferentChains)
+ panic(fmt.Errorf("unexpected chainID %d, should be %d",
+ other.ChainID, pos.ChainID))
}
return pos.Round > other.Round ||
(pos.Round == other.Round && pos.Height > other.Height)
}
+// Older checks if one block is older than another one on the same chain.
+// If two blocks on different chain compared by this function, it would panic.
+func (pos *Position) Older(other *Position) bool {
+ if pos.ChainID != other.ChainID {
+ panic(fmt.Errorf("unexpected chainID %d, should be %d",
+ other.ChainID, pos.ChainID))
+ }
+ return pos.Round < other.Round ||
+ (pos.Round == other.Round && pos.Height < other.Height)
+}
+
// Clone a position instance.
func (pos *Position) Clone() *Position {
return &Position{