aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go533
1 files changed, 342 insertions, 191 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index d74a4a290..4a95eac6f 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
+ cryptoDKG "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg"
"github.com/dexon-foundation/dexon-consensus/core/db"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
@@ -51,6 +52,10 @@ var (
"CRS not ready")
ErrConfigurationNotReady = fmt.Errorf(
"Configuration not ready")
+ ErrIncorrectBlockRandomness = fmt.Errorf(
+ "randomness of block is incorrect")
+ ErrCannotVerifyBlockRandomness = fmt.Errorf(
+ "cannot verify block randomness")
)
// consensusBAReceiver implements agreementReceiver.
@@ -60,8 +65,11 @@ type consensusBAReceiver struct {
agreementModule *agreement
changeNotaryHeightValue *atomic.Value
roundValue *atomic.Value
+ emptyBlockHashMap *sync.Map
isNotary bool
restartNotary chan types.Position
+ npks *typesDKG.NodePublicKeys
+ psigSigner *dkgShareSecret
}
func (recv *consensusBAReceiver) round() uint64 {
@@ -72,10 +80,85 @@ func (recv *consensusBAReceiver) changeNotaryHeight() uint64 {
return recv.changeNotaryHeightValue.Load().(uint64)
}
+func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) (
+ common.Hash, error) {
+ hashVal, ok := recv.emptyBlockHashMap.Load(pos)
+ if ok {
+ return hashVal.(common.Hash), nil
+ }
+ emptyBlock, err := recv.consensus.bcModule.prepareBlock(
+ pos, time.Time{}, true)
+ if err != nil {
+ return common.Hash{}, err
+ }
+ hash, err := utils.HashBlock(emptyBlock)
+ if err != nil {
+ return common.Hash{}, err
+ }
+ recv.emptyBlockHashMap.Store(pos, hash)
+ return hash, nil
+}
+
+func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool {
+ if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash {
+ if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
+ if recv.npks == nil || recv.npks.Round != vote.Position.Round {
+ var err error
+ recv.npks, _, err =
+ recv.consensus.cfgModule.getDKGInfo(vote.Position.Round, true)
+ if err != nil || recv.npks == nil {
+ recv.consensus.logger.Warn("cannot get npks",
+ "round", vote.Position.Round, "error", err)
+ return false
+ }
+ }
+ pubKey, exist := recv.npks.PublicKeys[vote.ProposerID]
+ if !exist {
+ return false
+ }
+ blockHash := vote.BlockHash
+ if blockHash == types.NullBlockHash {
+ var err error
+ blockHash, err = recv.emptyBlockHash(vote.Position)
+ if err != nil {
+ recv.consensus.logger.Error(
+ "Failed to verify vote for empty block",
+ "position", vote.Position,
+ "error", err)
+ return false
+ }
+ }
+ return pubKey.VerifySignature(
+ vote.BlockHash, crypto.Signature(vote.PartialSignature))
+ }
+ }
+ return len(vote.PartialSignature.Signature) == 0
+}
+
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
}
+ if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash {
+ if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
+ if recv.psigSigner == nil {
+ return
+ }
+ if vote.BlockHash == types.NullBlockHash {
+ hash, err := recv.emptyBlockHash(vote.Position)
+ if err != nil {
+ recv.consensus.logger.Error(
+ "Failed to propose vote for empty block",
+ "position", vote.Position,
+ "error", err)
+ return
+ }
+ vote.PartialSignature = recv.psigSigner.sign(hash)
+ } else {
+ vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash)
+ }
+ }
+ }
if err := recv.agreementModule.prepareVote(vote); err != nil {
recv.consensus.logger.Error("Failed to prepare vote", "error", err)
return
@@ -120,6 +203,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block *types.Block
aID = recv.agreementModule.agreementID()
)
+
isEmptyBlockConfirmed := hash == common.Hash{}
if isEmptyBlockConfirmed {
recv.consensus.logger.Info("Empty block is confirmed", "position", aID)
@@ -177,6 +261,72 @@ func (recv *consensusBAReceiver) ConfirmBlock(
return
}
}
+
+ // It's a workaround, the height for application is one-based.
+ block.Finalization.Height = block.Position.Height + 1
+
+ if len(votes) == 0 && len(block.Finalization.Randomness) == 0 {
+ recv.consensus.logger.Error("No votes to recover randomness",
+ "block", block)
+ } else if votes != nil {
+ voteList := make([]types.Vote, 0, len(votes))
+ IDs := make(cryptoDKG.IDs, 0, len(votes))
+ psigs := make([]cryptoDKG.PartialSignature, 0, len(votes))
+ for _, vote := range votes {
+ if vote.BlockHash != hash {
+ continue
+ }
+ if recv.round() >= DKGDelayRound {
+ ID, exist := recv.npks.IDMap[vote.ProposerID]
+ if !exist {
+ continue
+ }
+ IDs = append(IDs, ID)
+ psigs = append(psigs, vote.PartialSignature)
+ }
+ voteList = append(voteList, *vote)
+ }
+ if recv.round() >= DKGDelayRound {
+ rand, err := cryptoDKG.RecoverSignature(psigs, IDs)
+ if err != nil {
+ recv.consensus.logger.Warn("Unable to recover randomness",
+ "block", block,
+ "error", err)
+ } else {
+ block.Finalization.Randomness = rand.Signature[:]
+ }
+ }
+
+ if recv.isNotary {
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ FinalizationHeight: block.Finalization.Height,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ Randomness: block.Finalization.Randomness,
+ }
+ recv.consensus.logger.Debug("Broadcast AgreementResult",
+ "result", result)
+ recv.consensus.network.BroadcastAgreementResult(result)
+ if block.IsEmpty() {
+ if err :=
+ recv.consensus.bcModule.processAgreementResult(
+ result); err != nil {
+ recv.consensus.logger.Warn(
+ "Failed to process agreement result",
+ "result", result)
+ }
+ }
+ if block.Position.Round >= DKGDelayRound {
+ recv.consensus.logger.Debug(
+ "Broadcast finalized block",
+ "block", block)
+ recv.consensus.network.BroadcastBlock(block)
+ }
+ }
+ }
+
if block.Position.Height != 0 &&
!recv.consensus.bcModule.confirmed(block.Position.Height-1) {
go func(hash common.Hash) {
@@ -212,6 +362,11 @@ func (recv *consensusBAReceiver) ConfirmBlock(
recv.consensus.logger.Info("Receive parent block",
"parent-hash", block.ParentHash.String()[:6],
"cur-position", block.Position)
+ if block.Finalization.Height == 0 {
+ // TODO(jimmy): use a seperate message to pull finalized
+ // block. Here, we pull it again as workaround.
+ continue
+ }
recv.consensus.processBlockChan <- block
parentHash = block.ParentHash
if block.Position.Height == 0 ||
@@ -222,25 +377,9 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}(block.ParentHash)
}
- if recv.isNotary {
- voteList := make([]types.Vote, 0, len(votes))
- for _, vote := range votes {
- if vote.BlockHash != hash {
- continue
- }
- voteList = append(voteList, *vote)
- }
- result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
- IsEmptyBlock: isEmptyBlockConfirmed,
- }
- recv.consensus.logger.Debug("Propose AgreementResult",
- "result", result)
- recv.consensus.network.BroadcastAgreementResult(result)
+ if !block.IsEmpty() {
+ recv.consensus.processBlockChan <- block
}
- recv.consensus.processBlockChan <- block
// Clean the restartNotary channel so BA will not stuck by deadlock.
CleanChannelLoop:
for {
@@ -253,14 +392,14 @@ CleanChannelLoop:
newPos := block.Position
if block.Position.Height+1 == recv.changeNotaryHeight() {
newPos.Round++
- recv.roundValue.Store(newPos.Round)
+ recv.updateRound(newPos.Round)
}
currentRound := recv.round()
changeNotaryHeight := recv.changeNotaryHeight()
if block.Position.Height > changeNotaryHeight &&
block.Position.Round <= currentRound {
panic(fmt.Errorf(
- "round not switch when confirmig: %s, %d, should switch at %d, %s",
+ "round not switch when confirming: %s, %d, should switch at %d, %s",
block, currentRound, changeNotaryHeight, newPos))
}
recv.restartNotary <- newPos
@@ -282,6 +421,18 @@ func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) {
recv.consensus.gov.ReportForkBlock(b1, b2)
}
+func (recv *consensusBAReceiver) updateRound(round uint64) {
+ recv.roundValue.Store(round)
+ var err error
+ _, recv.psigSigner, err =
+ recv.consensus.cfgModule.getDKGInfo(round, false)
+ if err != nil {
+ recv.consensus.logger.Warn("cannot get dkg info",
+ "round", round, "error", err)
+ recv.psigSigner = nil
+ }
+}
+
// consensusDKGReceiver implements dkgReceiver.
type consensusDKGReceiver struct {
ID types.NodeID
@@ -401,13 +552,13 @@ type Consensus struct {
bcModule *blockChain
dMoment time.Time
nodeSetCache *utils.NodeSetCache
+ tsigVerifierCache *TSigVerifierCache
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
event *common.Event
roundEvent *utils.RoundEvent
logger common.Logger
- resetRandomnessTicker chan struct{}
resetDeliveryGuardTicker chan struct{}
msgChan chan interface{}
waitGroup sync.WaitGroup
@@ -465,7 +616,6 @@ func NewConsensusFromSyncer(
networkModule Network,
prv crypto.PrivateKey,
confirmedBlocks []*types.Block,
- randomnessResults []*types.BlockRandomnessResult,
cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
@@ -492,30 +642,13 @@ func NewConsensusFromSyncer(
}
refBlock = b
}
- // Dump all randomness result to the consensus instance.
- for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
- con.logger.Error("failed to process randomness result when syncing",
- "result", r)
- continue
- }
- }
if startWithEmpty {
pos := initBlock.Position
pos.Height++
- block, err := con.bcModule.addEmptyBlock(pos)
+ _, err := con.bcModule.addEmptyBlock(pos)
if err != nil {
panic(err)
}
- con.processBlockChan <- block
- if pos.Round >= DKGDelayRound {
- rand := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- IsEmptyBlock: true,
- }
- go con.prepareRandomnessResult(rand)
- }
}
return con, nil
}
@@ -566,8 +699,9 @@ func newConsensusForRound(
if usingNonBlocking {
appModule = newNonBlocking(app, debugApp)
}
+ tsigVerifierCache := NewTSigVerifierCache(gov, 7)
bcModule := newBlockChain(ID, dMoment, initBlock, appModule,
- NewTSigVerifierCache(gov, 7), signer, logger)
+ tsigVerifierCache, signer, logger)
// Construct Consensus instance.
con := &Consensus{
ID: ID,
@@ -582,10 +716,10 @@ func newConsensusForRound(
bcModule: bcModule,
dMoment: dMoment,
nodeSetCache: nodeSetCache,
+ tsigVerifierCache: tsigVerifierCache,
signer: signer,
event: common.NewEvent(),
logger: logger,
- resetRandomnessTicker: make(chan struct{}),
resetDeliveryGuardTicker: make(chan struct{}),
msgChan: make(chan interface{}, 1024),
processBlockChan: make(chan *types.Block, 1024),
@@ -600,7 +734,7 @@ func newConsensusForRound(
baConfig := agreementMgrConfig{}
baConfig.from(initRound, initConfig, initCRS)
baConfig.SetRoundBeginHeight(gov.GetRoundHeight(initRound))
- con.baMgr, err = newAgreementMgr(con, initRound, baConfig)
+ con.baMgr, err = newAgreementMgr(con, baConfig)
if err != nil {
panic(err)
}
@@ -690,14 +824,14 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
if nextRound < DKGDelayRound {
return
}
- curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round)
+ curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round)
if err != nil {
- con.logger.Error("Error getting DKG set when proposing CRS",
+ con.logger.Error("Error getting notary set when proposing CRS",
"round", e.Round,
"error", err)
return
}
- if _, exist := curDKGSet[con.ID]; !exist {
+ if _, exist := curNotarySet[con.ID]; !exist {
return
}
isDKGValid := func() bool {
@@ -733,18 +867,18 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// Register round event handler to propose new CRS.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
// We don't have to propose new CRS during DKG reset, the reset of DKG
- // would be done by the DKG set in previous round.
+ // would be done by the notary set in previous round.
e := evts[len(evts)-1]
defer elapse("propose-CRS", e)()
if e.Reset != 0 || e.Round < DKGDelayRound {
return
}
- if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil {
- con.logger.Error("Error getting DKG set when proposing CRS",
+ if curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round); err != nil {
+ con.logger.Error("Error getting notary set when proposing CRS",
"round", e.Round,
"error", err)
} else {
- if _, exist := curDkgSet[con.ID]; !exist {
+ if _, exist := curNotarySet[con.ID]; !exist {
return
}
con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) {
@@ -809,26 +943,26 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// of unexpected network fluctuation and ensure the robustness.
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for DKG set",
+ con.logger.Debug("unable to prepare CRS for notary set",
"round", nextRound,
"reset", e.Reset)
return
}
- nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+ nextNotarySet, err := con.nodeSetCache.GetNotarySet(nextRound)
if err != nil {
- con.logger.Error("Error getting DKG set for next round",
+ con.logger.Error("Error getting notary set for next round",
"round", nextRound,
"reset", e.Reset,
"error", err)
return
}
- if _, exist := nextDkgSet[con.ID]; !exist {
- con.logger.Info("Not selected as DKG set",
+ if _, exist := nextNotarySet[con.ID]; !exist {
+ con.logger.Info("Not selected as notary set",
"round", nextRound,
"reset", e.Reset)
return
}
- con.logger.Info("Selected as DKG set",
+ con.logger.Info("Selected as notary set",
"round", nextRound,
"reset", e.Reset)
nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
@@ -851,11 +985,14 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
if initBlock != nil {
con.event.NotifyHeight(initBlock.Finalization.Height)
}
+ con.baMgr.prepare()
return
}
// Run starts running DEXON Consensus.
func (con *Consensus) Run() {
+ // There may have emptys block in blockchain added by force sync.
+ blocksWithoutRandomness := con.bcModule.pendingBlocksWithoutRandomness()
// Launch BA routines.
con.baMgr.run()
// Launch network handler.
@@ -865,12 +1002,6 @@ func (con *Consensus) Run() {
con.waitGroup.Add(1)
go con.processMsg()
go con.processBlockLoop()
- // Sleep until dMoment come.
- time.Sleep(con.dMoment.Sub(time.Now().UTC()))
- // Take some time to bootstrap.
- time.Sleep(3 * time.Second)
- con.waitGroup.Add(1)
- go con.pullRandomness()
// Stop dummy receiver if launched.
if con.dummyCancel != nil {
con.logger.Trace("Stop dummy receiver")
@@ -893,6 +1024,11 @@ func (con *Consensus) Run() {
}
con.logger.Trace("Finish dumping cached messages")
}
+ con.generateBlockRandomness(blocksWithoutRandomness)
+ // Sleep until dMoment come.
+ time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Take some time to bootstrap.
+ time.Sleep(3 * time.Second)
con.waitGroup.Add(1)
go con.deliveryGuard()
// Block until done.
@@ -901,6 +1037,76 @@ func (con *Consensus) Run() {
}
}
+func (con *Consensus) generateBlockRandomness(blocks []*types.Block) {
+ con.logger.Debug("Start generating block randomness", "blocks", blocks)
+ isNotarySet := make(map[uint64]bool)
+ for _, block := range blocks {
+ if block.Position.Round < DKGDelayRound {
+ continue
+ }
+ doRun, exist := isNotarySet[block.Position.Round]
+ if !exist {
+ curNotarySet, err := con.nodeSetCache.GetNotarySet(block.Position.Round)
+ if err != nil {
+ con.logger.Error("Error getting notary set when generate block tsig",
+ "round", block.Position.Round,
+ "error", err)
+ continue
+ }
+ _, exist := curNotarySet[con.ID]
+ isNotarySet[block.Position.Round] = exist
+ doRun = exist
+ }
+ if !doRun {
+ continue
+ }
+ go func(block *types.Block) {
+ psig, err := con.cfgModule.preparePartialSignature(
+ block.Position.Round, block.Hash)
+ if err != nil {
+ con.logger.Error("Failed to prepare partial signature",
+ "block", block,
+ "error", err)
+ } else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to sign DKG partial signature",
+ "block", block,
+ "error", err)
+ } else if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to process partial signature",
+ "block", block,
+ "error", err)
+ } else {
+ con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+ "proposer", psig.ProposerID,
+ "block", block)
+ con.network.BroadcastDKGPartialSignature(psig)
+ sig, err := con.cfgModule.runTSig(block.Position.Round, block.Hash)
+ if err != nil {
+ con.logger.Error("Failed to run Block Tsig",
+ "block", block,
+ "error", err)
+ return
+ }
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Randomness: sig.Signature[:],
+ }
+ if err := con.bcModule.processAgreementResult(result); err != nil {
+ con.logger.Error("Failed to process BlockRandomness",
+ "result", result,
+ "error", err)
+ return
+ }
+ con.logger.Debug("Broadcast BlockRandomness",
+ "block", block,
+ "result", result)
+ con.network.BroadcastAgreementResult(result)
+ }
+ }(block)
+ }
+}
+
// runDKG starts running DKG protocol.
func (con *Consensus) runDKG(round, reset uint64, config *types.Config) {
con.dkgReady.L.Lock()
@@ -964,7 +1170,6 @@ func (con *Consensus) Stop() {
con.event.Reset()
con.waitGroup.Wait()
if nbApp, ok := con.app.(*nonBlocking); ok {
- fmt.Println("Stopping nonBlocking App")
nbApp.wait()
}
}
@@ -1019,11 +1224,47 @@ MessageLoop:
ch, e := con.baConfirmedBlock[val.Hash]
return ch, e
}(); exist {
- if err := utils.VerifyBlockSignature(val); err != nil {
- con.logger.Error("VerifyBlockSignature failed",
- "block", val,
- "error", err)
- continue MessageLoop
+ if val.IsEmpty() {
+ hash, err := utils.HashBlock(val)
+ if err != nil {
+ con.logger.Error("error verifying empty block hash",
+ "block", val,
+ "error, err")
+ continue MessageLoop
+ }
+ if hash != val.Hash {
+ con.logger.Error("incorrect confirmed empty block hash",
+ "block", val,
+ "hash", hash)
+ continue MessageLoop
+ }
+ if _, err := con.bcModule.proposeBlock(
+ val.Position, time.Time{}, true); err != nil {
+ con.logger.Error("error adding empty block",
+ "block", val,
+ "error", err)
+ continue MessageLoop
+ }
+ } else {
+ ok, err := con.bcModule.verifyRandomness(
+ val.Hash, val.Position.Round, val.Finalization.Randomness)
+ if err != nil {
+ con.logger.Error("error verifying confirmed block randomness",
+ "block", val,
+ "error", err)
+ continue MessageLoop
+ }
+ if !ok {
+ con.logger.Error("incorrect confirmed block randomness",
+ "block", val)
+ continue MessageLoop
+ }
+ if err := utils.VerifyBlockSignature(val); err != nil {
+ con.logger.Error("VerifyBlockSignature failed",
+ "block", val,
+ "error", err)
+ continue MessageLoop
+ }
}
func() {
con.lock.Lock()
@@ -1036,7 +1277,6 @@ MessageLoop:
ch <- val
}()
} else if val.IsFinalized() {
- // For sync mode.
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
"block", val,
@@ -1061,13 +1301,6 @@ MessageLoop:
"result", val,
"error", err)
}
- case *types.BlockRandomnessResult:
- if err := con.ProcessBlockRandomnessResult(val, true); err != nil {
- con.logger.Error("Failed to process block randomness result",
- "hash", val.BlockHash.String()[:6],
- "position", val.Position,
- "error", err)
- }
case *typesDKG.PrivateShare:
if err := con.cfgModule.processPrivateShare(val); err != nil {
con.logger.Error("Failed to process private share",
@@ -1101,139 +1334,67 @@ func (con *Consensus) ProcessAgreementResult(
con.baMgr.untouchAgreementResult(rand)
return err
}
+ if err := con.bcModule.processAgreementResult(rand); err != nil {
+ con.baMgr.untouchAgreementResult(rand)
+ return err
+ }
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
+ con.baMgr.untouchAgreementResult(rand)
return err
}
- // Calculating randomness.
- if rand.Position.Round == 0 {
- return nil
- }
- // TODO(mission): find a way to avoid spamming by older agreement results.
- // Sanity check done.
- if !con.cfgModule.touchTSigHash(rand.BlockHash) {
- return nil
- }
con.logger.Debug("Rebroadcast AgreementResult",
"result", rand)
con.network.BroadcastAgreementResult(rand)
- go con.prepareRandomnessResult(rand)
- return nil
+
+ return con.deliverFinalizedBlocks()
}
-func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) {
- dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
- if err != nil {
- con.logger.Error("Failed to get dkg set",
- "round", rand.Position.Round, "error", err)
- return
+// preProcessBlock performs Byzantine Agreement on the block.
+func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
+ err = con.baMgr.processBlock(b)
+ if err == nil && con.debugApp != nil {
+ con.debugApp.BlockReceived(b.Hash)
}
- if _, exist := dkgSet[con.ID]; !exist {
+ return
+}
+
+func (con *Consensus) processFinalizedBlock(b *types.Block) (err error) {
+ if b.Position.Round < DKGDelayRound {
return
}
- con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash)
- psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
- if err != nil {
- con.logger.Error("Failed to prepare psig",
- "round", rand.Position.Round,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
+ if err = utils.VerifyBlockSignature(b); err != nil {
return
}
- if err = con.signer.SignDKGPartialSignature(psig); err != nil {
- con.logger.Error("Failed to sign psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
+ verifier, ok, err := con.tsigVerifierCache.UpdateAndGet(b.Position.Round)
+ if err != nil {
return
}
- if err = con.cfgModule.processPartialSignature(psig); err != nil {
- con.logger.Error("Failed process psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
+ if !ok {
+ err = ErrCannotVerifyBlockRandomness
return
}
- con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
- "proposer", psig.ProposerID,
- "round", psig.Round,
- "hash", psig.Hash.String()[:6])
- con.network.BroadcastDKGPartialSignature(psig)
- tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
- if err != nil {
- if err != ErrTSigAlreadyRunning {
- con.logger.Error("Failed to run TSIG",
- "position", rand.Position,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- }
+ if !verifier.VerifySignature(b.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: b.Finalization.Randomness,
+ }) {
+ err = ErrIncorrectBlockRandomness
return
}
- result := &types.BlockRandomnessResult{
- BlockHash: rand.BlockHash,
- Position: rand.Position,
- Randomness: tsig.Signature,
- }
- // ProcessBlockRandomnessResult is not thread-safe so we put the result in
- // the message channnel to be processed in the main thread.
- con.msgChan <- result
-}
-
-// ProcessBlockRandomnessResult processes the randomness result.
-func (con *Consensus) ProcessBlockRandomnessResult(
- rand *types.BlockRandomnessResult, needBroadcast bool) error {
- if rand.Position.Round == 0 {
- return nil
- }
- if !con.bcModule.shouldAddRandomness(rand) {
- return nil
- }
- if err := con.bcModule.addRandomness(rand); err != nil {
- return err
- }
- if needBroadcast {
- con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "randomness", rand)
- con.network.BroadcastRandomnessResult(rand)
- }
- return con.deliverFinalizedBlocks()
-}
-
-// preProcessBlock performs Byzantine Agreement on the block.
-func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- err = con.baMgr.processBlock(b)
+ err = con.baMgr.processFinalizedBlock(b)
if err == nil && con.debugApp != nil {
con.debugApp.BlockReceived(b.Hash)
}
return
}
-func (con *Consensus) pullRandomness() {
- defer con.waitGroup.Done()
- for {
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- select {
- case <-con.ctx.Done():
- return
- case <-con.resetRandomnessTicker:
- case <-time.After(1500 * time.Millisecond):
- // TODO(jimmy): pulling period should be related to lambdaBA.
- hashes := con.bcModule.pendingBlocksWithoutRandomness()
- if len(hashes) > 0 {
- con.logger.Debug(
- "Calling Network.PullRandomness", "blocks", hashes)
- con.network.PullRandomness(hashes)
- }
- }
- }
-}
-
func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
- time.Sleep(con.dMoment.Sub(time.Now()))
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(con.dMoment.Sub(time.Now())):
+ }
// Node takes time to start.
select {
case <-con.ctx.Done():
@@ -1259,10 +1420,6 @@ func (con *Consensus) deliveryGuard() {
// deliverBlock deliver a block to application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
select {
- case con.resetRandomnessTicker <- struct{}{}:
- default:
- }
- select {
case con.resetDeliveryGuardTicker <- struct{}{}:
default:
}
@@ -1274,7 +1431,6 @@ func (con *Consensus) deliverBlock(b *types.Block) {
b.Hash, b.Finalization.Height); err != nil {
panic(err)
}
- con.cfgModule.untouchTSigHash(b.Hash)
con.logger.Debug("Calling Application.BlockDelivered", "block", b)
con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone())
if con.debugApp != nil {
@@ -1338,15 +1494,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
return
}
-// processFinalizedBlock is the entry point for handling finalized blocks.
-func (con *Consensus) processFinalizedBlock(block *types.Block) error {
- return con.bcModule.processFinalizedBlock(block)
-}
-
// PrepareBlock would setup header fields of block based on its ProposerID.
func (con *Consensus) proposeBlock(position types.Position) (
*types.Block, error) {
- b, err := con.bcModule.proposeBlock(position, time.Now().UTC())
+ b, err := con.bcModule.proposeBlock(position, time.Now().UTC(), false)
if err != nil {
return nil, err
}