aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go332
1 files changed, 127 insertions, 205 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index f46501038..f4c0a372d 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -57,10 +57,9 @@ var (
// consensusBAReceiver implements agreementReceiver.
type consensusBAReceiver struct {
- // TODO(mission): consensus would be replaced by lattice and network.
+ // TODO(mission): consensus would be replaced by blockChain and network.
consensus *Consensus
agreementModule *agreement
- chainID uint32
changeNotaryTime time.Time
roundValue *atomic.Value
isNotary bool
@@ -96,9 +95,9 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
if !recv.isNotary {
return common.Hash{}
}
- block := recv.consensus.proposeBlock(recv.chainID, recv.round())
- if block == nil {
- recv.consensus.logger.Error("unable to propose block")
+ block, err := recv.consensus.proposeBlock(recv.agreementModule.agreementID())
+ if err != nil || block == nil {
+ recv.consensus.logger.Error("unable to propose block", "error", err)
return types.NullBlockHash
}
go func() {
@@ -115,25 +114,29 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
func (recv *consensusBAReceiver) ConfirmBlock(
hash common.Hash, votes map[types.NodeID]*types.Vote) {
- var block *types.Block
+ var (
+ block *types.Block
+ aID = recv.agreementModule.agreementID()
+ )
isEmptyBlockConfirmed := hash == common.Hash{}
if isEmptyBlockConfirmed {
- aID := recv.agreementModule.agreementID()
- recv.consensus.logger.Info("Empty block is confirmed",
- "position", &aID)
+ recv.consensus.logger.Info("Empty block is confirmed", "position", aID)
var err error
- block, err = recv.consensus.proposeEmptyBlock(recv.round(), recv.chainID)
+ block, err = recv.consensus.bcModule.addEmptyBlock(aID)
if err != nil {
- recv.consensus.logger.Error("Propose empty block failed", "error", err)
+ recv.consensus.logger.Error("Add position for empty failed",
+ "error", err)
return
}
+ if block == nil {
+ panic(fmt.Errorf("empty block should be proposed directly: %s", aID))
+ }
} else {
var exist bool
block, exist = recv.agreementModule.findBlockNoLock(hash)
if !exist {
recv.consensus.logger.Error("Unknown block confirmed",
- "hash", hash.String()[:6],
- "chainID", recv.chainID)
+ "hash", hash.String()[:6])
ch := make(chan *types.Block)
func() {
recv.consensus.lock.Lock()
@@ -155,8 +158,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
recv.consensus.logger.Info("Receive unknown block",
"hash", hash.String()[:6],
- "position", &block.Position,
- "chainID", recv.chainID)
+ "position", block.Position)
recv.agreementModule.addCandidateBlock(block)
recv.agreementModule.lock.Lock()
defer recv.agreementModule.lock.Unlock()
@@ -165,15 +167,14 @@ func (recv *consensusBAReceiver) ConfirmBlock(
return
}
}
- recv.consensus.ccModule.registerBlock(block)
if block.Position.Height != 0 &&
- !recv.consensus.lattice.Exist(block.ParentHash) {
+ !recv.consensus.bcModule.confirmed(block.Position.Height-1) {
go func(hash common.Hash) {
parentHash := hash
for {
recv.consensus.logger.Warn("Parent block not confirmed",
"parent-hash", parentHash.String()[:6],
- "cur-position", &block.Position)
+ "cur-position", block.Position)
ch := make(chan *types.Block)
if !func() bool {
recv.consensus.lock.Lock()
@@ -200,13 +201,12 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
recv.consensus.logger.Info("Receive parent block",
"parent-hash", block.ParentHash.String()[:6],
- "cur-position", &block.Position,
- "chainID", recv.chainID)
- recv.consensus.ccModule.registerBlock(block)
+ "cur-position", block.Position)
recv.consensus.processBlockChan <- block
parentHash = block.ParentHash
if block.Position.Height == 0 ||
- recv.consensus.lattice.Exist(parentHash) {
+ recv.consensus.bcModule.confirmed(
+ block.Position.Height-1) {
return
}
}
@@ -372,11 +372,6 @@ type Consensus struct {
dkgReady *sync.Cond
cfgModule *configurationChain
- // Dexon consensus v1's modules.
- lattice *Lattice
- ccModule *compactionChain
- toSyncer *totalOrderingSyncer
-
// Interfaces.
db db.Database
app Application
@@ -385,21 +380,21 @@ type Consensus struct {
network Network
// Misc.
- dMoment time.Time
- nodeSetCache *utils.NodeSetCache
- round uint64
- roundForNewConfig uint64
- lock sync.RWMutex
- ctx context.Context
- ctxCancel context.CancelFunc
- event *common.Event
- logger common.Logger
- nonFinalizedBlockDelivered bool
- resetRandomnessTicker chan struct{}
- resetDeliveryGuardTicker chan struct{}
- msgChan chan interface{}
- waitGroup sync.WaitGroup
- processBlockChan chan *types.Block
+ bcModule *blockChain
+ dMoment time.Time
+ nodeSetCache *utils.NodeSetCache
+ round uint64
+ roundForNewConfig uint64
+ lock sync.RWMutex
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ event *common.Event
+ logger common.Logger
+ resetRandomnessTicker chan struct{}
+ resetDeliveryGuardTicker chan struct{}
+ msgChan chan interface{}
+ waitGroup sync.WaitGroup
+ processBlockChan chan *types.Block
// Context of Dummy receiver during switching from syncer.
dummyCancel context.CancelFunc
@@ -417,7 +412,7 @@ func NewConsensus(
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
return newConsensusForRound(
- &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true)
+ nil, dMoment, app, gov, db, network, prv, logger, true)
}
// NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -431,7 +426,7 @@ func NewConsensusForSimulation(
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
return newConsensusForRound(
- &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false)
+ nil, dMoment, app, gov, db, network, prv, logger, false)
}
// NewConsensusFromSyncer constructs an Consensus instance from information
@@ -451,14 +446,13 @@ func NewConsensusFromSyncer(
db db.Database,
networkModule Network,
prv crypto.PrivateKey,
- latticeModule *Lattice,
- confirmedBlocks [][]*types.Block,
+ confirmedBlocks []*types.Block,
randomnessResults []*types.BlockRandomnessResult,
cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
- networkModule, prv, logger, latticeModule, true)
+ networkModule, prv, logger, true)
// Launch a dummy receiver before we start receiving from network module.
con.dummyMsgBuffer = cachedMessages
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
@@ -467,29 +461,18 @@ func NewConsensusFromSyncer(
})
// Dump all BA-confirmed blocks to the consensus instance, make sure these
// added blocks forming a DAG.
- for {
- updated := false
- for idx, bs := range confirmedBlocks {
- for bIdx, b := range bs {
- // Only when its parent block is already added to lattice, we can
- // then add this block. If not, our pulling mechanism would stop at
- // the block we added, and lost its parent block forever.
- if !latticeModule.Exist(b.ParentHash) {
- logger.Debug("Skip discontinuous confirmed block",
- "from", b,
- "until", bs[len(bs)-1])
- confirmedBlocks[idx] = bs[bIdx:]
- break
- }
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
- }
- }
- }
- if !updated {
+ refBlock := initBlock
+ for _, b := range confirmedBlocks {
+ // Only when its parent block is already added to lattice, we can
+ // then add this block. If not, our pulling mechanism would stop at
+ // the block we added, and lost its parent block forever.
+ if b.Position.Height != refBlock.Position.Height+1 {
break
}
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ refBlock = b
}
// Dump all randomness result to the consensus instance.
for _, r := range randomnessResults {
@@ -502,7 +485,7 @@ func NewConsensusFromSyncer(
return con, nil
}
-// newConsensus creates a Consensus instance.
+// newConsensusForRound creates a Consensus instance.
func newConsensusForRound(
initBlock *types.Block,
initRoundBeginTime time.Time,
@@ -512,9 +495,7 @@ func newConsensusForRound(
network Network,
prv crypto.PrivateKey,
logger common.Logger,
- latticeModule *Lattice,
usingNonBlocking bool) *Consensus {
-
// TODO(w): load latest blockHeight from DB, and use config at that height.
nodeSetCache := utils.NewNodeSetCache(gov)
// Setup signer module.
@@ -525,13 +506,11 @@ func newConsensusForRound(
debugApp = a
}
// Get configuration for bootstrap round.
- initRound := initBlock.Position.Round
- initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
- // Init lattice.
- if latticeModule == nil {
- latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig,
- signer, app, debugApp, db, logger)
+ initRound := uint64(0)
+ if initBlock != nil {
+ initRound = initBlock.Position.Round
}
+ initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
// Init configuration chain.
ID := types.NewNodeID(prv.PublicKey())
recv := &consensusDKGReceiver{
@@ -548,11 +527,14 @@ func newConsensusForRound(
if usingNonBlocking {
appModule = newNonBlocking(app, debugApp)
}
+ bcConfig := blockChainConfig{}
+ bcConfig.fromConfig(initRound, initConfig)
+ bcConfig.setRoundBeginTime(initRoundBeginTime)
+ bcModule := newBlockChain(ID, initBlock, bcConfig, appModule,
+ NewTSigVerifierCache(gov, 7), signer, logger)
// Construct Consensus instance.
con := &Consensus{
ID: ID,
- ccModule: newCompactionChain(gov),
- lattice: latticeModule,
app: appModule,
debugApp: debugApp,
gov: gov,
@@ -561,6 +543,7 @@ func newConsensusForRound(
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
+ bcModule: bcModule,
dMoment: initRoundBeginTime,
nodeSetCache: nodeSetCache,
signer: signer,
@@ -581,52 +564,56 @@ func newConsensusForRound(
// prepare the Consensus instance to be ready for blocks after 'initBlock'.
// 'initBlock' could be either:
-// - an empty block
+// - nil
// - the last finalized block
-func (con *Consensus) prepare(initBlock *types.Block) error {
+func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// The block past from full node should be delivered already or known by
// full node. We don't have to notify it.
- con.roundForNewConfig = initBlock.Position.Round + 1
- initRound := initBlock.Position.Round
+ initRound := uint64(0)
+ if initBlock != nil {
+ initRound = initBlock.Position.Round
+ }
+ con.roundForNewConfig = initRound + 1
initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger)
// Setup context.
- con.ccModule.init(initBlock)
con.logger.Debug("Calling Governance.CRS", "round", initRound)
initCRS := con.gov.CRS(initRound)
if (initCRS == common.Hash{}) {
- return ErrCRSNotReady
+ err = ErrCRSNotReady
+ return
}
- if err := con.baMgr.appendConfig(initRound, initConfig, initCRS); err != nil {
- return err
+ if err = con.baMgr.appendConfig(initRound, initConfig, initCRS); err != nil {
+ return
}
- // Setup lattice module.
+ // Setup blockChain module.
initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger)
- if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil {
- if err == ErrRoundNotIncreasing {
- err = nil
- } else {
+ if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil {
+ return
+ }
+ if initRound == 0 {
+ dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
+ if err != nil {
return err
}
+ if _, exist := dkgSet[con.ID]; exist {
+ con.logger.Info("Selected as DKG set", "round", initRound)
+ go func() {
+ // Sleep until dMoment come.
+ time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Network is not stable upon starting. Wait some time to ensure first
+ // DKG would success. Three is a magic number.
+ time.Sleep(initConfig.MinBlockInterval * 3)
+ con.cfgModule.registerDKG(initRound, getDKGThreshold(initConfig))
+ con.event.RegisterTime(con.dMoment.Add(initConfig.RoundInterval/4),
+ func(time.Time) {
+ con.runDKG(initRound, initConfig)
+ })
+ }()
+ }
}
// Register events.
- dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
- if err != nil {
- return err
- }
- if _, exist := dkgSet[con.ID]; exist {
- con.logger.Info("Selected as DKG set", "round", initRound)
- go func() {
- // Sleep until dMoment come.
- time.Sleep(con.dMoment.Sub(time.Now().UTC()))
- con.cfgModule.registerDKG(initRound, getDKGThreshold(initConfig))
- con.event.RegisterTime(con.dMoment.Add(initConfig.RoundInterval/4),
- func(time.Time) {
- con.runDKG(initRound, initConfig)
- })
- }()
- }
con.initialRound(con.dMoment, initRound, initConfig)
- return nil
+ return
}
// Run starts running DEXON Consensus.
@@ -847,7 +834,7 @@ func (con *Consensus) initialRound(
})
}(round + 1)
})
- // Prepare lattice module for next round and next "initialRound" routine.
+ // Prepare blockChain module for next round and next "initialRound" routine.
con.event.RegisterTime(startTime.Add(config.RoundInterval),
func(time.Time) {
// Change round.
@@ -917,7 +904,7 @@ MessageLoop:
ch, e := con.baConfirmedBlock[val.Hash]
return ch, e
}(); exist {
- if err := con.lattice.SanityCheck(val, false); err != nil {
+ if err := con.bcModule.sanityCheck(val); err != nil {
if err == ErrRetrySanityCheckLater {
err = nil
} else {
@@ -965,7 +952,7 @@ MessageLoop:
if err := con.ProcessBlockRandomnessResult(val, true); err != nil {
con.logger.Error("Failed to process block randomness result",
"hash", val.BlockHash.String()[:6],
- "position", &val.Position,
+ "position", val.Position,
"error", err)
}
case *typesDKG.PrivateShare:
@@ -983,34 +970,6 @@ MessageLoop:
}
}
-func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block {
- block := &types.Block{
- Position: types.Position{
- ChainID: chainID,
- Round: round,
- },
- }
- if err := con.prepareBlock(block, time.Now().UTC()); err != nil {
- con.logger.Error("Failed to prepare block", "error", err)
- return nil
- }
- return block
-}
-
-func (con *Consensus) proposeEmptyBlock(
- round uint64, chainID uint32) (*types.Block, error) {
- block := &types.Block{
- Position: types.Position{
- Round: round,
- ChainID: chainID,
- },
- }
- if err := con.lattice.PrepareEmptyBlock(block); err != nil {
- return nil, err
- }
- return block, nil
-}
-
// ProcessVote is the entry point to submit ont vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
v := vote.Clone()
@@ -1024,14 +983,11 @@ func (con *Consensus) ProcessAgreementResult(
if !con.baMgr.touchAgreementResult(rand) {
return nil
}
-
// Sanity Check.
if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
con.baMgr.untouchAgreementResult(rand)
return err
}
- con.lattice.AddShallowBlock(rand.BlockHash, rand.Position)
-
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
return err
@@ -1089,7 +1045,7 @@ func (con *Consensus) ProcessAgreementResult(
if err != nil {
if err != ErrTSigAlreadyRunning {
con.logger.Error("Failed to run TSIG",
- "position", &rand.Position,
+ "position", rand.Position,
"hash", rand.BlockHash.String()[:6],
"error", err)
}
@@ -1113,15 +1069,8 @@ func (con *Consensus) ProcessBlockRandomnessResult(
if rand.Position.Round == 0 {
return nil
}
- if !con.ccModule.touchBlockRandomnessResult(rand) {
- return nil
- }
- if err := con.ccModule.processBlockRandomnessResult(rand); err != nil {
- if err == ErrBlockNotRegistered {
- err = nil
- } else {
- return err
- }
+ if err := con.bcModule.addRandomness(rand); err != nil {
+ return err
}
if needBroadcast {
con.logger.Debug("Calling Network.BroadcastRandomnessResult",
@@ -1154,7 +1103,7 @@ func (con *Consensus) pullRandomness() {
case <-con.resetRandomnessTicker:
case <-time.After(1500 * time.Millisecond):
// TODO(jimmy): pulling period should be related to lambdaBA.
- hashes := con.ccModule.pendingBlocksWithoutRandomness()
+ hashes := con.bcModule.pendingBlocksWithoutRandomness()
if len(hashes) > 0 {
con.logger.Debug(
"Calling Network.PullRandomness", "blocks", hashes)
@@ -1196,7 +1145,8 @@ func (con *Consensus) deliverBlock(b *types.Block) {
case con.resetDeliveryGuardTicker <- struct{}{}:
default:
}
- if err := con.db.UpdateBlock(*b); err != nil {
+ // TODO(mission): do we need to put block when confirmed now?
+ if err := con.db.PutBlock(*b); err != nil {
panic(err)
}
if err := con.db.PutCompactionChainTipInfo(
@@ -1209,13 +1159,13 @@ func (con *Consensus) deliverBlock(b *types.Block) {
if b.Position.Round == con.roundForNewConfig {
// Get configuration for the round next to next round. Configuration
// for that round should be ready at this moment and is required for
- // lattice module. This logic is related to:
+ // blockChain module. This logic is related to:
// - roundShift
// - notifyGenesisRound
futureRound := con.roundForNewConfig + 1
futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger)
con.logger.Debug("Append Config", "round", futureRound)
- if err := con.lattice.AppendConfig(
+ if err := con.bcModule.appendConfig(
futureRound, futureConfig); err != nil {
con.logger.Debug("Unable to append config",
"round", futureRound,
@@ -1238,14 +1188,14 @@ func (con *Consensus) deliverFinalizedBlocks() error {
}
func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
- deliveredBlocks := con.ccModule.extractBlocks()
+ deliveredBlocks := con.bcModule.extractBlocks()
con.logger.Debug("Last blocks in compaction chain",
- "delivered", con.ccModule.lastDeliveredBlock(),
- "pending", con.ccModule.lastPendingBlock())
+ "delivered", con.bcModule.lastDeliveredBlock(),
+ "pending", con.bcModule.lastPendingBlock())
for _, b := range deliveredBlocks {
con.deliverBlock(b)
+ go con.event.NotifyTime(b.Finalization.Timestamp)
}
- err = con.lattice.PurgeBlocks(deliveredBlocks)
return
}
@@ -1271,34 +1221,14 @@ func (con *Consensus) processBlockLoop() {
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
+ // Block processed by blockChain can be out-of-order. But the output from
+ // blockChain (deliveredBlocks) cannot, thus we need to protect the part
+ // below with writer lock.
con.lock.Lock()
defer con.lock.Unlock()
- // Block processed by lattice can be out-of-order. But the output of lattice
- // (deliveredBlocks) cannot.
- deliveredBlocks, err := con.lattice.ProcessBlock(block)
- if err != nil {
+ if err = con.bcModule.addBlock(block); err != nil {
return
}
- // Pass delivered blocks to compaction chain.
- for _, b := range deliveredBlocks {
- if b.IsFinalized() {
- if con.nonFinalizedBlockDelivered {
- panic(fmt.Errorf("attempting to skip finalized block: %s", b))
- }
- con.logger.Debug("skip delivery of finalized block",
- "block", b,
- "finalization-height", b.Finalization.Height)
- continue
- } else {
- // Mark that some non-finalized block delivered. After this flag
- // turned on, it's not allowed to deliver finalized blocks anymore.
- con.nonFinalizedBlockDelivered = true
- }
- if err = con.ccModule.processBlock(b); err != nil {
- return
- }
- go con.event.NotifyTime(b.Finalization.Timestamp)
- }
if err = con.deliverFinalizedBlocksWithoutLock(); err != nil {
return
}
@@ -1307,36 +1237,28 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
// processFinalizedBlock is the entry point for handling finalized blocks.
func (con *Consensus) processFinalizedBlock(block *types.Block) error {
- return con.ccModule.processFinalizedBlock(block)
+ return con.bcModule.processFinalizedBlock(block)
}
// PrepareBlock would setup header fields of block based on its ProposerID.
-func (con *Consensus) prepareBlock(b *types.Block,
- proposeTime time.Time) (err error) {
- if err = con.lattice.PrepareBlock(b, proposeTime); err != nil {
- return
+func (con *Consensus) proposeBlock(position types.Position) (
+ *types.Block, error) {
+ b, err := con.bcModule.proposeBlock(position, time.Now().UTC())
+ if err != nil {
+ return nil, err
}
con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round)
crs := con.gov.CRS(b.Position.Round)
if crs.Equal(common.Hash{}) {
con.logger.Error("CRS for round is not ready, unable to prepare block",
"position", &b.Position)
- err = ErrCRSNotReady
- return
+ return nil, ErrCRSNotReady
}
- err = con.signer.SignCRS(b, crs)
- return
-}
-
-// PrepareGenesisBlock would setup header fields for genesis block.
-func (con *Consensus) PrepareGenesisBlock(b *types.Block,
- proposeTime time.Time) (err error) {
- if err = con.prepareBlock(b, proposeTime); err != nil {
- return
+ if err = con.signer.SignCRS(b, crs); err != nil {
+ return nil, err
}
- if len(b.Payload) != 0 {
- err = ErrGenesisBlockNotEmpty
- return
+ if b.IsGenesis() && len(b.Payload) != 0 {
+ return nil, ErrGenesisBlockNotEmpty
}
- return
+ return b, nil
}