aboutsummaryrefslogtreecommitdiffstats
path: root/core/consensus.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-15 18:17:53 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-15 18:17:53 +0800
commitb02fa5ee430cff9dafc9d9c399099a88d554a083 (patch)
tree31f962f1e40e18a656693ebf38e8176b6e09c9f6 /core/consensus.go
parent6a127c42323b9b5cdde1cdb17e385d22ef9dfd10 (diff)
downloaddexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar
dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.gz
dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.bz2
dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.lz
dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.xz
dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.zst
dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.zip
core/syncer: add force sync (#468)
* core: Add Recovery Interface * core/syncer: modify recovery interface * core: fix Recovery interface * core/syncer: rename terminator to watchcat (#491) * core/syncer: rename terminator to watchcat * Add error log * Rename Pat to Feed * core/syncer: add force sync * run prepareRandomness if round >= DKGDelayRound * Add test for Forcsync
Diffstat (limited to 'core/consensus.go')
-rw-r--r--core/consensus.go141
1 files changed, 80 insertions, 61 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 4201cbc..5ee64c2 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -453,6 +453,7 @@ func NewConsensusForSimulation(
func NewConsensusFromSyncer(
initBlock *types.Block,
initRoundBeginHeight uint64,
+ startWithEmpty bool,
dMoment time.Time,
app Application,
gov Governance,
@@ -495,6 +496,23 @@ func NewConsensusFromSyncer(
continue
}
}
+ if startWithEmpty {
+ pos := initBlock.Position
+ pos.Height++
+ block, err := con.bcModule.addEmptyBlock(pos)
+ if err != nil {
+ panic(err)
+ }
+ con.processBlockChan <- block
+ if pos.Round >= DKGDelayRound {
+ rand := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ IsEmptyBlock: true,
+ }
+ go con.prepareRandomnessResult(rand)
+ }
+ }
return con, nil
}
@@ -863,6 +881,10 @@ func (con *Consensus) Stop() {
con.baMgr.stop()
con.event.Reset()
con.waitGroup.Wait()
+ if nbApp, ok := con.app.(*nonBlocking); ok {
+ fmt.Println("Stopping nonBlocking App")
+ nbApp.wait()
+ }
}
func (con *Consensus) deliverNetworkMsg() {
@@ -1014,62 +1036,64 @@ func (con *Consensus) ProcessAgreementResult(
con.logger.Debug("Rebroadcast AgreementResult",
"result", rand)
con.network.BroadcastAgreementResult(rand)
+ go con.prepareRandomnessResult(rand)
+ return nil
+}
- go func() {
- dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
- if err != nil {
- con.logger.Error("Failed to get dkg set",
- "round", rand.Position.Round, "error", err)
- return
- }
- if _, exist := dkgSet[con.ID]; !exist {
- return
- }
- psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
- if err != nil {
- con.logger.Error("Failed to prepare psig",
- "round", rand.Position.Round,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.signer.SignDKGPartialSignature(psig); err != nil {
- con.logger.Error("Failed to sign psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.cfgModule.processPartialSignature(psig); err != nil {
- con.logger.Error("Failed process psig",
+func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) {
+ dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
+ if err != nil {
+ con.logger.Error("Failed to get dkg set",
+ "round", rand.Position.Round, "error", err)
+ return
+ }
+ if _, exist := dkgSet[con.ID]; !exist {
+ return
+ }
+ con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash)
+ psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
+ if err != nil {
+ con.logger.Error("Failed to prepare psig",
+ "round", rand.Position.Round,
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to sign psig",
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ con.logger.Error("Failed process psig",
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+ "proposer", psig.ProposerID,
+ "round", psig.Round,
+ "hash", psig.Hash.String()[:6])
+ con.network.BroadcastDKGPartialSignature(psig)
+ tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
+ if err != nil {
+ if err != ErrTSigAlreadyRunning {
+ con.logger.Error("Failed to run TSIG",
+ "position", rand.Position,
"hash", rand.BlockHash.String()[:6],
"error", err)
- return
- }
- con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
- "proposer", psig.ProposerID,
- "round", psig.Round,
- "hash", psig.Hash.String()[:6])
- con.network.BroadcastDKGPartialSignature(psig)
- tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
- if err != nil {
- if err != ErrTSigAlreadyRunning {
- con.logger.Error("Failed to run TSIG",
- "position", rand.Position,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- }
- return
}
- result := &types.BlockRandomnessResult{
- BlockHash: rand.BlockHash,
- Position: rand.Position,
- Randomness: tsig.Signature,
- }
- // ProcessBlockRandomnessResult is not thread-safe so we put the result in
- // the message channnel to be processed in the main thread.
- con.msgChan <- result
- }()
- return nil
+ return
+ }
+ result := &types.BlockRandomnessResult{
+ BlockHash: rand.BlockHash,
+ Position: rand.Position,
+ Randomness: tsig.Signature,
+ }
+ // ProcessBlockRandomnessResult is not thread-safe so we put the result in
+ // the message channnel to be processed in the main thread.
+ con.msgChan <- result
}
// ProcessBlockRandomnessResult processes the randomness result.
@@ -1094,14 +1118,6 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- var exist bool
- exist, err = con.nodeSetCache.Exists(b.Position.Round, b.ProposerID)
- if err != nil {
- return
- }
- if !exist {
- return ErrProposerNotInNodeSet
- }
err = con.baMgr.processBlock(b)
if err == nil && con.debugApp != nil {
con.debugApp.BlockReceived(b.Hash)
@@ -1137,7 +1153,10 @@ func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
time.Sleep(con.dMoment.Sub(time.Now()))
// Node takes time to start.
- time.Sleep(60 * time.Second)
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(60 * time.Second):
+ }
for {
select {
case <-con.ctx.Done():