diff options
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 184 |
1 files changed, 163 insertions, 21 deletions
diff --git a/core/consensus.go b/core/consensus.go index 7700296..dc5bbba 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -111,26 +111,87 @@ func (recv *consensusReceiver) ConfirmBlock(hash common.Hash) { recv.restart <- struct{}{} } +// consensusDKGReceiver implements dkgReceiver. +type consensusDKGReceiver struct { + ID types.NodeID + gov Governance + prvKey crypto.PrivateKey + network Network +} + +// ProposeDKGComplaint proposes a DKGComplaint. +func (recv *consensusDKGReceiver) ProposeDKGComplaint( + complaint *types.DKGComplaint) { + var err error + complaint.Signature, err = recv.prvKey.Sign(hashDKGComplaint(complaint)) + if err != nil { + log.Println(err) + return + } + recv.gov.AddDKGComplaint(complaint) +} + +// ProposeDKGMasterPublicKey propose a DKGMasterPublicKey. +func (recv *consensusDKGReceiver) ProposeDKGMasterPublicKey( + mpk *types.DKGMasterPublicKey) { + var err error + mpk.Signature, err = recv.prvKey.Sign(hashDKGMasterPublicKey(mpk)) + if err != nil { + log.Println(err) + return + } + recv.gov.AddDKGMasterPublicKey(mpk) +} + +// ProposeDKGPrivateShare propose a DKGPrivateShare. +func (recv *consensusDKGReceiver) ProposeDKGPrivateShare( + prv *types.DKGPrivateShare) { + var err error + prv.Signature, err = recv.prvKey.Sign(hashDKGPrivateShare(prv)) + if err != nil { + log.Println(err) + return + } + recv.network.SendDKGPrivateShare(prv.ReceiverID, prv) +} + +// ProposeDKGAntiNackComplaint propose a DKGPrivateShare as an anti complaint. +func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint( + prv *types.DKGPrivateShare) { + if prv.ProposerID == recv.ID { + var err error + prv.Signature, err = recv.prvKey.Sign(hashDKGPrivateShare(prv)) + if err != nil { + log.Println(err) + return + } + } + recv.network.BroadcastDKGPrivateShare(prv) +} + // Consensus implements DEXON Consensus algorithm. type Consensus struct { - ID types.NodeID - app Application - gov Governance - config *types.Config - baModules []*agreement - receivers []*consensusReceiver - rbModule *reliableBroadcast - toModule *totalOrdering - ctModule *consensusTimestamp - ccModule *compactionChain - db blockdb.BlockDatabase - network Network - tickerObj Ticker - prvKey crypto.PrivateKey - sigToPub SigToPubFn - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc + ID types.NodeID + app Application + gov Governance + config *types.Config + baModules []*agreement + receivers []*consensusReceiver + rbModule *reliableBroadcast + toModule *totalOrdering + ctModule *consensusTimestamp + ccModule *compactionChain + db blockdb.BlockDatabase + network Network + tickerObj Ticker + prvKey crypto.PrivateKey + dkgRunning int32 + dkgReady *sync.Cond + dkgModule *dkgProtocol + sigToPub SigToPubFn + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc } // NewConsensus construct an Consensus instance. @@ -144,6 +205,7 @@ func NewConsensus( config := gov.GetConfiguration(0) notarySet := gov.GetNotarySet() + ID := types.NewNodeID(prv.PublicKey()) // Setup acking by information returned from Governace. rb := newReliableBroadcast() @@ -164,8 +226,21 @@ func NewConsensus( uint64(float32(len(notarySet)-1)*config.PhiRatio+1), config.NumChains) + // Setup DKG Protocol. + dkgModule := newDKGProtocol( + ID, + &consensusDKGReceiver{ + ID: ID, + gov: gov, + prvKey: prv, + network: network, + }, + 0, + len(gov.GetNotarySet())/3, + sigToPub) + con := &Consensus{ - ID: types.NewNodeID(prv.PublicKey()), + ID: ID, rbModule: rb, toModule: to, ctModule: newConsensusTimestamp(), @@ -175,8 +250,10 @@ func NewConsensus( config: config, db: db, network: network, - tickerObj: newTicker(gov), + tickerObj: newTicker(gov, TickerBA), prvKey: prv, + dkgReady: sync.NewCond(&sync.Mutex{}), + dkgModule: dkgModule, sigToPub: sigToPub, ctx: ctx, ctxCancel: ctxCancel, @@ -210,13 +287,19 @@ func NewConsensus( // Run starts running DEXON Consensus. func (con *Consensus) Run() { + go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock) + con.runDKG() + con.dkgReady.L.Lock() + defer con.dkgReady.L.Unlock() + for con.dkgRunning != 2 { + con.dkgReady.Wait() + } ticks := make([]chan struct{}, 0, con.config.NumChains) for i := uint32(0); i < con.config.NumChains; i++ { tick := make(chan struct{}) ticks = append(ticks, tick) go con.runBA(i, tick) } - go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock) go con.processWitnessData() // Reset ticker. @@ -271,6 +354,58 @@ BALoop: } } +// runDKG starts running DKG protocol. +func (con *Consensus) runDKG() { + con.dkgReady.L.Lock() + defer con.dkgReady.L.Unlock() + if con.dkgRunning != 0 { + return + } + con.dkgRunning = 1 + go func() { + defer func() { + con.dkgReady.L.Lock() + defer con.dkgReady.L.Unlock() + con.dkgReady.Broadcast() + con.dkgRunning = 2 + }() + ticker := newTicker(con.gov, TickerDKG) + round := con.dkgModule.round + <-ticker.Tick() + // Phase 2(T = 0): Exchange DKG secret key share. + con.dkgModule.processMasterPublicKeys(con.gov.DKGMasterPublicKeys(round)) + // Phase 3(T = 0~λ): Propose complaint. + // Propose complaint is done in `processMasterPublicKeys`. + <-ticker.Tick() + // Phase 4(T = λ): Propose nack complaints. + con.dkgModule.proposeNackComplaints() + <-ticker.Tick() + // Phase 5(T = 2λ): Propose Anti nack complaint. + con.dkgModule.processNackComplaints(con.gov.DKGComplaints(round)) + <-ticker.Tick() + // Phase 6(T = 3λ): Rebroadcast anti nack complaint. + // Rebroadcast is done in `processPrivateShare`. + <-ticker.Tick() + // Phase 7(T = 4λ): Enforce complaints and nack complaints. + con.dkgModule.enforceNackComplaints(con.gov.DKGComplaints(round)) + // Enforce complaint is done in `processPrivateShare`. + // Phase 8(T = 5λ): DKG is ready. + gpk, err := newDKGGroupPublicKey(round, + con.gov.DKGMasterPublicKeys(round), + con.gov.DKGComplaints(round), + con.dkgModule.threshold, con.sigToPub) + if err != nil { + panic(err) + } + qualifies := "" + for _, nID := range gpk.qualifyNodeIDs { + qualifies += fmt.Sprintf("%s ", nID.String()[:6]) + } + log.Printf("[%s] Qualify Nodes(%d): %s\n", + con.ID, len(gpk.qualifyIDs), qualifies) + }() +} + // RunLegacy starts running Legacy DEXON Consensus. func (con *Consensus) RunLegacy() { go con.processMsg(con.network.ReceiveChan(), con.processBlock) @@ -356,6 +491,13 @@ func (con *Consensus) processMsg( if err := con.ProcessVote(val); err != nil { log.Println(err) } + case *types.DKGPrivateShare: + if con.dkgRunning == 0 { + break + } + if err := con.dkgModule.processPrivateShare(val); err != nil { + log.Println(err) + } } } } |