path: root/core/consensus.go
Diffstat (limited to 'core/consensus.go')
-rw-r--r--  core/consensus.go  383
1 files changed, 181 insertions, 202 deletions
diff --git a/core/consensus.go b/core/consensus.go
index d74a4a2..2b0d5a4 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
+ cryptoDKG "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg"
"github.com/dexon-foundation/dexon-consensus/core/db"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
@@ -51,6 +52,8 @@ var (
"CRS not ready")
ErrConfigurationNotReady = fmt.Errorf(
"Configuration not ready")
+ ErrIncorrectBlockRandomness = fmt.Errorf(
+ "randomness of block is incorrect")
)
// consensusBAReceiver implements agreementReceiver.
@@ -62,6 +65,8 @@ type consensusBAReceiver struct {
roundValue *atomic.Value
isNotary bool
restartNotary chan types.Position
+ npks *typesDKG.NodePublicKeys
+ psigSigner *dkgShareSecret
}
func (recv *consensusBAReceiver) round() uint64 {
@@ -72,10 +77,51 @@ func (recv *consensusBAReceiver) changeNotaryHeight() uint64 {
return recv.changeNotaryHeightValue.Load().(uint64)
}
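+// VerifyPartialSignature checks the DKG partial signature carried by a commit
+// or fast-commit vote against the proposer's public key for the vote's round;
+// votes that need no randomness must carry an empty partial signature.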
+func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool {
+ if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash {
+ if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
+ if recv.npks == nil || recv.npks.Round != vote.Position.Round {
+ var err error
+ recv.npks, _, err =
+ recv.consensus.cfgModule.getDKGInfo(vote.Position.Round, true)
+ if err != nil || recv.npks == nil {
+ recv.consensus.logger.Warn("cannot get npks",
+ "round", vote.Position.Round, "error", err)
+ return false
+ }
+ }
+ pubKey, exist := recv.npks.PublicKeys[vote.ProposerID]
+ if !exist {
+ return false
+ }
+ blockHash := vote.BlockHash
+ if blockHash == types.NullBlockHash {
+ blockHash = utils.HashPosition(vote.Position)
+ }
+ return pubKey.VerifySignature(
+ blockHash, crypto.Signature(vote.PartialSignature))
+ }
+ }
+ return len(vote.PartialSignature.Signature) == 0
+}
+
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
}
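+ // After DKGDelayRound, commit and fast-commit votes carry a partial
+ // signature over the block hash (or the hashed position for null blocks),
+ // so the confirming votes can later be combined into block randomness.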
+ if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash {
+ if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
+ if recv.psigSigner == nil {
+ return
+ }
+ if vote.BlockHash == types.NullBlockHash {
+ vote.PartialSignature = recv.psigSigner.sign(
+ utils.HashPosition(vote.Position))
+ } else {
+ vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash)
+ }
+ }
+ }
if err := recv.agreementModule.prepareVote(vote); err != nil {
recv.consensus.logger.Error("Failed to prepare vote", "error", err)
return
@@ -120,6 +166,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block *types.Block
aID = recv.agreementModule.agreementID()
)
+
isEmptyBlockConfirmed := hash == common.Hash{}
if isEmptyBlockConfirmed {
recv.consensus.logger.Info("Empty block is confirmed", "position", aID)
@@ -177,6 +224,47 @@ func (recv *consensusBAReceiver) ConfirmBlock(
return
}
}
+
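+ // Collect the partial signatures from votes for this block and recover
+ // the block randomness by threshold signature recovery.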
+ voteList := make([]types.Vote, 0, len(votes))
+ IDs := make(cryptoDKG.IDs, 0, len(votes))
+ psigs := make([]cryptoDKG.PartialSignature, 0, len(votes))
+ for _, vote := range votes {
+ if vote.BlockHash != hash {
+ continue
+ }
+ if recv.round() >= DKGDelayRound {
+ ID, exist := recv.npks.IDMap[vote.ProposerID]
+ if !exist {
+ continue
+ }
+ IDs = append(IDs, ID)
+ psigs = append(psigs, vote.PartialSignature)
+ }
+ voteList = append(voteList, *vote)
+ }
+ if recv.round() >= DKGDelayRound {
+ rand, err := cryptoDKG.RecoverSignature(psigs, IDs)
+ if err != nil {
+ recv.consensus.logger.Warn("Unable to recover randomness",
+ "block", block,
+ "error", err)
+ } else {
+ block.Finalization.Randomness = rand.Signature[:]
+ }
+ }
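+ // Notary nodes propose the agreement result, including the recovered
+ // randomness, through the message channel.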
+ if recv.isNotary {
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ Randomness: block.Finalization.Randomness,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ }
+ recv.consensus.logger.Debug("Propose AgreementResult",
+ "result", result)
+ recv.consensus.msgChan <- result
+ }
+
if block.Position.Height != 0 &&
!recv.consensus.bcModule.confirmed(block.Position.Height-1) {
go func(hash common.Hash) {
@@ -222,25 +310,9 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}(block.ParentHash)
}
- if recv.isNotary {
- voteList := make([]types.Vote, 0, len(votes))
- for _, vote := range votes {
- if vote.BlockHash != hash {
- continue
- }
- voteList = append(voteList, *vote)
- }
- result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
- IsEmptyBlock: isEmptyBlockConfirmed,
- }
- recv.consensus.logger.Debug("Propose AgreementResult",
- "result", result)
- recv.consensus.network.BroadcastAgreementResult(result)
+ if !block.IsEmpty() {
+ recv.consensus.processBlockChan <- block
}
- recv.consensus.processBlockChan <- block
// Clean the restartNotary channel so BA will not get stuck in a deadlock.
CleanChannelLoop:
for {
@@ -253,7 +325,7 @@ CleanChannelLoop:
newPos := block.Position
if block.Position.Height+1 == recv.changeNotaryHeight() {
newPos.Round++
- recv.roundValue.Store(newPos.Round)
+ recv.updateRound(newPos.Round)
}
currentRound := recv.round()
changeNotaryHeight := recv.changeNotaryHeight()
@@ -282,6 +354,18 @@ func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) {
recv.consensus.gov.ReportForkBlock(b1, b2)
}
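+// updateRound stores the new round and refreshes the DKG share secret used to
+// sign partial signatures; the signer is cleared if the DKG info is not ready.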
+func (recv *consensusBAReceiver) updateRound(round uint64) {
+ recv.roundValue.Store(round)
+ var err error
+ _, recv.psigSigner, err =
+ recv.consensus.cfgModule.getDKGInfo(round, false)
+ if err != nil {
+ recv.consensus.logger.Warn("cannot get dkg info",
+ "round", round, "error", err)
+ recv.psigSigner = nil
+ }
+}
+
// consensusDKGReceiver implements dkgReceiver.
type consensusDKGReceiver struct {
ID types.NodeID
@@ -401,13 +485,13 @@ type Consensus struct {
bcModule *blockChain
dMoment time.Time
nodeSetCache *utils.NodeSetCache
+ tsigVerifierCache *TSigVerifierCache
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
event *common.Event
roundEvent *utils.RoundEvent
logger common.Logger
- resetRandomnessTicker chan struct{}
resetDeliveryGuardTicker chan struct{}
msgChan chan interface{}
waitGroup sync.WaitGroup
@@ -465,7 +549,6 @@ func NewConsensusFromSyncer(
networkModule Network,
prv crypto.PrivateKey,
confirmedBlocks []*types.Block,
- randomnessResults []*types.BlockRandomnessResult,
cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
@@ -492,30 +575,13 @@ func NewConsensusFromSyncer(
}
refBlock = b
}
- // Dump all randomness result to the consensus instance.
- for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
- con.logger.Error("failed to process randomness result when syncing",
- "result", r)
- continue
- }
- }
if startWithEmpty {
pos := initBlock.Position
pos.Height++
- block, err := con.bcModule.addEmptyBlock(pos)
+ _, err := con.bcModule.addEmptyBlock(pos)
if err != nil {
panic(err)
}
- con.processBlockChan <- block
- if pos.Round >= DKGDelayRound {
- rand := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- IsEmptyBlock: true,
- }
- go con.prepareRandomnessResult(rand)
- }
}
return con, nil
}
@@ -566,8 +632,9 @@ func newConsensusForRound(
if usingNonBlocking {
appModule = newNonBlocking(app, debugApp)
}
+ tsigVerifierCache := NewTSigVerifierCache(gov, 7)
bcModule := newBlockChain(ID, dMoment, initBlock, appModule,
- NewTSigVerifierCache(gov, 7), signer, logger)
+ tsigVerifierCache, signer, logger)
// Construct Consensus instance.
con := &Consensus{
ID: ID,
@@ -582,10 +649,10 @@ func newConsensusForRound(
bcModule: bcModule,
dMoment: dMoment,
nodeSetCache: nodeSetCache,
+ tsigVerifierCache: tsigVerifierCache,
signer: signer,
event: common.NewEvent(),
logger: logger,
- resetRandomnessTicker: make(chan struct{}),
resetDeliveryGuardTicker: make(chan struct{}),
msgChan: make(chan interface{}, 1024),
processBlockChan: make(chan *types.Block, 1024),
@@ -690,14 +757,14 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
if nextRound < DKGDelayRound {
return
}
- curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round)
+ curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round)
if err != nil {
- con.logger.Error("Error getting DKG set when proposing CRS",
+ con.logger.Error("Error getting notary set when proposing CRS",
"round", e.Round,
"error", err)
return
}
- if _, exist := curDKGSet[con.ID]; !exist {
+ if _, exist := curNotarySet[con.ID]; !exist {
return
}
isDKGValid := func() bool {
@@ -733,18 +800,18 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// Register round event handler to propose new CRS.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
// We don't have to propose new CRS during DKG reset, the reset of DKG
- // would be done by the DKG set in previous round.
+ // would be done by the notary set in previous round.
e := evts[len(evts)-1]
defer elapse("propose-CRS", e)()
if e.Reset != 0 || e.Round < DKGDelayRound {
return
}
- if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil {
- con.logger.Error("Error getting DKG set when proposing CRS",
+ if curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round); err != nil {
+ con.logger.Error("Error getting notary set when proposing CRS",
"round", e.Round,
"error", err)
} else {
- if _, exist := curDkgSet[con.ID]; !exist {
+ if _, exist := curNotarySet[con.ID]; !exist {
return
}
con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) {
@@ -809,26 +876,26 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// of unexpected network fluctuation and ensure the robustness.
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for DKG set",
+ con.logger.Debug("unable to prepare CRS for notary set",
"round", nextRound,
"reset", e.Reset)
return
}
- nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+ nextNotarySet, err := con.nodeSetCache.GetNotarySet(nextRound)
if err != nil {
- con.logger.Error("Error getting DKG set for next round",
+ con.logger.Error("Error getting notary set for next round",
"round", nextRound,
"reset", e.Reset,
"error", err)
return
}
- if _, exist := nextDkgSet[con.ID]; !exist {
- con.logger.Info("Not selected as DKG set",
+ if _, exist := nextNotarySet[con.ID]; !exist {
+ con.logger.Info("Not selected as notary set",
"round", nextRound,
"reset", e.Reset)
return
}
- con.logger.Info("Selected as DKG set",
+ con.logger.Info("Selected as notary set",
"round", nextRound,
"reset", e.Reset)
nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
@@ -865,12 +932,6 @@ func (con *Consensus) Run() {
con.waitGroup.Add(1)
go con.processMsg()
go con.processBlockLoop()
- // Sleep until dMoment come.
- time.Sleep(con.dMoment.Sub(time.Now().UTC()))
- // Take some time to bootstrap.
- time.Sleep(3 * time.Second)
- con.waitGroup.Add(1)
- go con.pullRandomness()
// Stop dummy receiver if launched.
if con.dummyCancel != nil {
con.logger.Trace("Stop dummy receiver")
@@ -893,6 +954,10 @@ func (con *Consensus) Run() {
}
con.logger.Trace("Finish dumping cached messages")
}
+ // Sleep until dMoment comes.
+ time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Take some time to bootstrap.
+ time.Sleep(3 * time.Second)
con.waitGroup.Add(1)
go con.deliveryGuard()
// Block until done.
@@ -964,7 +1029,6 @@ func (con *Consensus) Stop() {
con.event.Reset()
con.waitGroup.Wait()
if nbApp, ok := con.app.(*nonBlocking); ok {
- fmt.Println("Stopping nonBlocking App")
nbApp.wait()
}
}
@@ -1019,11 +1083,47 @@ MessageLoop:
ch, e := con.baConfirmedBlock[val.Hash]
return ch, e
}(); exist {
- if err := utils.VerifyBlockSignature(val); err != nil {
- con.logger.Error("VerifyBlockSignature failed",
- "block", val,
- "error", err)
- continue MessageLoop
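+ // A confirmed empty block is re-proposed locally after its hash is
+ // verified; other confirmed blocks must carry valid randomness and a
+ // correct signature.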
+ if val.IsEmpty() {
+ hash, err := utils.HashBlock(val)
+ if err != nil {
+ con.logger.Error("error verifying empty block hash",
+ "block", val,
+ "error, err")
+ continue MessageLoop
+ }
+ if hash != val.Hash {
+ con.logger.Error("incorrect confirmed empty block hash",
+ "block", val,
+ "hash", hash)
+ continue MessageLoop
+ }
+ if _, err := con.bcModule.proposeBlock(
+ val.Position, time.Time{}, true); err != nil {
+ con.logger.Error("error adding empty block",
+ "block", val,
+ "error", err)
+ continue MessageLoop
+ }
+ } else {
+ ok, err := con.bcModule.verifyRandomness(
+ val.Hash, val.Position.Round, val.Finalization.Randomness)
+ if err != nil {
+ con.logger.Error("error verifying confirmed block randomness",
+ "block", val,
+ "error", err)
+ continue MessageLoop
+ }
+ if !ok {
+ con.logger.Error("incorrect confirmed block randomness",
+ "block", val)
+ continue MessageLoop
+ }
+ if err := utils.VerifyBlockSignature(val); err != nil {
+ con.logger.Error("VerifyBlockSignature failed",
+ "block", val,
+ "error", err)
+ continue MessageLoop
+ }
}
func() {
con.lock.Lock()
@@ -1035,13 +1135,6 @@ MessageLoop:
delete(con.baConfirmedBlock, val.Hash)
ch <- val
}()
- } else if val.IsFinalized() {
- // For sync mode.
- if err := con.processFinalizedBlock(val); err != nil {
- con.logger.Error("Failed to process finalized block",
- "block", val,
- "error", err)
- }
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
@@ -1061,13 +1154,6 @@ MessageLoop:
"result", val,
"error", err)
}
- case *types.BlockRandomnessResult:
- if err := con.ProcessBlockRandomnessResult(val, true); err != nil {
- con.logger.Error("Failed to process block randomness result",
- "hash", val.BlockHash.String()[:6],
- "position", val.Position,
- "error", err)
- }
case *typesDKG.PrivateShare:
if err := con.cfgModule.processPrivateShare(val); err != nil {
con.logger.Error("Failed to process private share",
@@ -1096,6 +1182,15 @@ func (con *Consensus) ProcessAgreementResult(
if !con.baMgr.touchAgreementResult(rand) {
return nil
}
+ // TODO(jimmy): merge tsig check to VerifyAgreementResult
+ ok, err := con.bcModule.verifyRandomness(
+ rand.BlockHash, rand.Position.Round, rand.Randomness)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return ErrIncorrectBlockRandomness
+ }
// Sanity Check.
if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
con.baMgr.untouchAgreementResult(rand)
@@ -1105,96 +1200,11 @@ func (con *Consensus) ProcessAgreementResult(
if err := con.baMgr.processAgreementResult(rand); err != nil {
return err
}
- // Calculating randomness.
- if rand.Position.Round == 0 {
- return nil
- }
- // TODO(mission): find a way to avoid spamming by older agreement results.
- // Sanity check done.
- if !con.cfgModule.touchTSigHash(rand.BlockHash) {
- return nil
- }
- con.logger.Debug("Rebroadcast AgreementResult",
+ con.logger.Debug("Broadcast AgreementResult",
"result", rand)
con.network.BroadcastAgreementResult(rand)
- go con.prepareRandomnessResult(rand)
- return nil
-}
-func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) {
- dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
- if err != nil {
- con.logger.Error("Failed to get dkg set",
- "round", rand.Position.Round, "error", err)
- return
- }
- if _, exist := dkgSet[con.ID]; !exist {
- return
- }
- con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash)
- psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
- if err != nil {
- con.logger.Error("Failed to prepare psig",
- "round", rand.Position.Round,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.signer.SignDKGPartialSignature(psig); err != nil {
- con.logger.Error("Failed to sign psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.cfgModule.processPartialSignature(psig); err != nil {
- con.logger.Error("Failed process psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
- "proposer", psig.ProposerID,
- "round", psig.Round,
- "hash", psig.Hash.String()[:6])
- con.network.BroadcastDKGPartialSignature(psig)
- tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
- if err != nil {
- if err != ErrTSigAlreadyRunning {
- con.logger.Error("Failed to run TSIG",
- "position", rand.Position,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- }
- return
- }
- result := &types.BlockRandomnessResult{
- BlockHash: rand.BlockHash,
- Position: rand.Position,
- Randomness: tsig.Signature,
- }
- // ProcessBlockRandomnessResult is not thread-safe so we put the result in
- // the message channnel to be processed in the main thread.
- con.msgChan <- result
-}
-
-// ProcessBlockRandomnessResult processes the randomness result.
-func (con *Consensus) ProcessBlockRandomnessResult(
- rand *types.BlockRandomnessResult, needBroadcast bool) error {
- if rand.Position.Round == 0 {
- return nil
- }
- if !con.bcModule.shouldAddRandomness(rand) {
- return nil
- }
- if err := con.bcModule.addRandomness(rand); err != nil {
- return err
- }
- if needBroadcast {
- con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "randomness", rand)
- con.network.BroadcastRandomnessResult(rand)
- }
return con.deliverFinalizedBlocks()
}
@@ -1207,33 +1217,12 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
return
}
-func (con *Consensus) pullRandomness() {
- defer con.waitGroup.Done()
- for {
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- select {
- case <-con.ctx.Done():
- return
- case <-con.resetRandomnessTicker:
- case <-time.After(1500 * time.Millisecond):
- // TODO(jimmy): pulling period should be related to lambdaBA.
- hashes := con.bcModule.pendingBlocksWithoutRandomness()
- if len(hashes) > 0 {
- con.logger.Debug(
- "Calling Network.PullRandomness", "blocks", hashes)
- con.network.PullRandomness(hashes)
- }
- }
- }
-}
-
func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
- time.Sleep(con.dMoment.Sub(time.Now()))
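+ // Wait until dMoment, but stay cancellable through the consensus context.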
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(con.dMoment.Sub(time.Now())):
+ }
// Node takes time to start.
select {
case <-con.ctx.Done():
@@ -1259,10 +1248,6 @@ func (con *Consensus) deliveryGuard() {
// deliverBlock deliver a block to application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
select {
- case con.resetRandomnessTicker <- struct{}{}:
- default:
- }
- select {
case con.resetDeliveryGuardTicker <- struct{}{}:
default:
}
@@ -1274,7 +1259,6 @@ func (con *Consensus) deliverBlock(b *types.Block) {
b.Hash, b.Finalization.Height); err != nil {
panic(err)
}
- con.cfgModule.untouchTSigHash(b.Hash)
con.logger.Debug("Calling Application.BlockDelivered", "block", b)
con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone())
if con.debugApp != nil {
@@ -1338,15 +1322,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
return
}
-// processFinalizedBlock is the entry point for handling finalized blocks.
-func (con *Consensus) processFinalizedBlock(block *types.Block) error {
- return con.bcModule.processFinalizedBlock(block)
-}
-
// PrepareBlock would setup header fields of block based on its ProposerID.
func (con *Consensus) proposeBlock(position types.Position) (
*types.Block, error) {
- b, err := con.bcModule.proposeBlock(position, time.Now().UTC())
+ b, err := con.bcModule.proposeBlock(position, time.Now().UTC(), false)
if err != nil {
return nil, err
}