diff options
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 247 |
1 files changed, 220 insertions, 27 deletions
diff --git a/core/consensus.go b/core/consensus.go index 2618b54..5b85fcb 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -18,7 +18,9 @@ package core import ( + "context" "fmt" + "log" "sort" "sync" "time" @@ -53,24 +55,79 @@ var ( "signature of block is incorrect") ErrGenesisBlockNotEmpty = fmt.Errorf( "genesis block should be empty") + ErrUnknownBlockProposed = fmt.Errorf( + "unknown block is proposed") + ErrUnknownBlockConfirmed = fmt.Errorf( + "unknown block is confirmed") + ErrIncorrectBlockPosition = fmt.Errorf( + "position of block is incorrect") ) +// consensusReceiver implements agreementReceiver. +type consensusReceiver struct { + consensus *Consensus + chainID uint32 + restart chan struct{} +} + +func (recv *consensusReceiver) proposeVote(vote *types.Vote) { + // TODO(jimmy-dexon): move prepareVote() into agreement. + if err := recv.consensus.prepareVote(recv.chainID, vote); err != nil { + fmt.Println(err) + return + } + go func() { + if err := recv.consensus.ProcessVote(vote); err != nil { + fmt.Println(err) + return + } + recv.consensus.network.BroadcastVote(vote) + }() +} +func (recv *consensusReceiver) proposeBlock(hash common.Hash) { + block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash) + if !exist { + fmt.Println(ErrUnknownBlockProposed) + fmt.Println(hash) + return + } + if err := recv.consensus.PreProcessBlock(block); err != nil { + fmt.Println(err) + return + } + recv.consensus.network.BroadcastBlock(block) +} +func (recv *consensusReceiver) confirmBlock(hash common.Hash) { + block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash) + if !exist { + fmt.Println(ErrUnknownBlockConfirmed, hash) + return + } + recv.restart <- struct{}{} + if err := recv.consensus.ProcessBlock(block); err != nil { + fmt.Println(err) + return + } +} + // Consensus implements DEXON Consensus algorithm. type Consensus struct { - ID types.ValidatorID - app Application - gov Governance - rbModule *reliableBroadcast - toModule *totalOrdering - ctModule *consensusTimestamp - ccModule *compactionChain - db blockdb.BlockDatabase - network Network - tick *time.Ticker - prvKey crypto.PrivateKey - sigToPub SigToPubFn - lock sync.RWMutex - stopChan chan struct{} + ID types.ValidatorID + app Application + gov Governance + baModules []*agreement + receivers []*consensusReceiver + rbModule *reliableBroadcast + toModule *totalOrdering + ctModule *consensusTimestamp + ccModule *compactionChain + db blockdb.BlockDatabase + network Network + tick *time.Ticker + prvKey crypto.PrivateKey + sigToPub SigToPubFn + lock sync.RWMutex + stopChan chan struct{} } // NewConsensus construct an Consensus instance. @@ -86,7 +143,7 @@ func NewConsensus( // Setup acking by information returned from Governace. rb := newReliableBroadcast() - rb.setChainNum(len(validatorSet)) + rb.setChainNum(gov.GetChainNumber()) for vID := range validatorSet { rb.addValidator(vID) } @@ -101,7 +158,7 @@ func NewConsensus( uint64(float32(len(validatorSet)-1)*gov.GetPhiRatio()+1), validators) - return &Consensus{ + con := &Consensus{ ID: types.NewValidatorID(prv.PublicKey()), rbModule: rb, toModule: to, @@ -116,11 +173,104 @@ func NewConsensus( sigToPub: sigToPub, stopChan: make(chan struct{}), } + + con.baModules = make([]*agreement, con.gov.GetChainNumber()) + con.receivers = make([]*consensusReceiver, con.gov.GetChainNumber()) + for i := uint32(0); i < con.gov.GetChainNumber(); i++ { + chainID := i + con.receivers[chainID] = &consensusReceiver{ + consensus: con, + chainID: chainID, + restart: make(chan struct{}, 1), + } + blockProposer := func() *types.Block { + block := con.proposeBlock(chainID) + con.baModules[chainID].addCandidateBlock(block) + return block + } + con.baModules[chainID] = newAgreement( + con.ID, + con.receivers[chainID], + validators, + newGenesisLeaderSelector(con.gov.GetGenesisCRS(), con.sigToPub), + con.sigToPub, + blockProposer, + ) + } + + return con } -// Run starts running Consensus core. +// Run starts running DEXON Consensus. func (con *Consensus) Run() { - go con.processMsg(con.network.ReceiveChan()) + ctx, cancel := context.WithCancel(context.Background()) + ticks := make([]chan struct{}, 0, con.gov.GetChainNumber()) + for i := uint32(0); i < con.gov.GetChainNumber(); i++ { + tick := make(chan struct{}) + ticks = append(ticks, tick) + go con.runBA(ctx, i, tick) + } + go func() { + <-con.stopChan + cancel() + }() + go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock) + // Reset ticker. + <-con.tick.C + <-con.tick.C + for { + <-con.tick.C + for _, tick := range ticks { + go func(tick chan struct{}) { tick <- struct{}{} }(tick) + } + } +} + +func (con *Consensus) runBA( + ctx context.Context, chainID uint32, tick <-chan struct{}) { + // TODO(jimmy-dexon): move this function inside agreement. + validatorSet := con.gov.GetValidatorSet() + validators := make(types.ValidatorIDs, 0, len(validatorSet)) + for vID := range validatorSet { + validators = append(validators, vID) + } + agreement := con.baModules[chainID] + recv := con.receivers[chainID] + recv.restart <- struct{}{} + // Reset ticker + <-tick +BALoop: + for { + select { + case <-ctx.Done(): + break BALoop + default: + } + for i := 0; i < agreement.clocks(); i++ { + <-tick + } + select { + case <-recv.restart: + // TODO(jimmy-dexon): handling change of validator set. + aID := types.Position{ + ShardID: 0, + ChainID: chainID, + Height: con.rbModule.nextHeight(chainID), + } + agreement.restart(validators, aID) + default: + } + err := agreement.nextState() + if err != nil { + log.Printf("[%s] %s\n", con.ID.String(), err) + break BALoop + } + } +} + +// RunLegacy starts running Legacy DEXON Consensus. +func (con *Consensus) RunLegacy() { + go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock) chainID := uint32(0) hashes := make(common.Hashes, 0, len(con.gov.GetValidatorSet())) @@ -134,6 +284,7 @@ func (con *Consensus) Run() { break } } + con.rbModule.setChainNum(uint32(len(hashes))) genesisBlock := &types.Block{ ProposerID: con.ID, @@ -178,7 +329,9 @@ func (con *Consensus) Stop() { con.stopChan <- struct{}{} } -func (con *Consensus) processMsg(msgChan <-chan interface{}) { +func (con *Consensus) processMsg( + msgChan <-chan interface{}, + blockProcesser func(*types.Block) error) { for { var msg interface{} select { @@ -189,10 +342,10 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { switch val := msg.(type) { case *types.Block: - if err := con.ProcessBlock(val); err != nil { + if err := blockProcesser(val); err != nil { fmt.Println(err) } - types.RecycleBlock(val) + //types.RecycleBlock(val) case *types.NotaryAck: if err := con.ProcessNotaryAck(val); err != nil { fmt.Println(err) @@ -205,13 +358,43 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { } } +func (con *Consensus) proposeBlock(chainID uint32) *types.Block { + block := &types.Block{ + ProposerID: con.ID, + Position: types.Position{ + ChainID: chainID, + Height: con.rbModule.nextHeight(chainID), + }, + } + if err := con.PrepareBlock(block, time.Now().UTC()); err != nil { + fmt.Println(err) + return nil + } + if err := con.baModules[chainID].prepareBlock(block, con.prvKey); err != nil { + fmt.Println(err) + return nil + } + return block +} + // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - return + v := vote.Clone() + err = con.baModules[v.Position.ChainID].processVote(v) + return err +} + +// prepareVote prepares a vote. +func (con *Consensus) prepareVote(chainID uint32, vote *types.Vote) error { + return con.baModules[chainID].prepareVote(vote, con.prvKey) } // sanityCheck checks if the block is a valid block func (con *Consensus) sanityCheck(b *types.Block) (err error) { + // Check block.Position. + if b.Position.ShardID != 0 || b.Position.ChainID >= con.rbModule.chainNum() { + return ErrIncorrectBlockPosition + } // Check the hash of block. hash, err := hashBlock(b) if err != nil || hash != b.Hash { @@ -229,19 +412,29 @@ func (con *Consensus) sanityCheck(b *types.Block) (err error) { return nil } -// ProcessBlock is the entry point to submit one block to a Consensus instance. -func (con *Consensus) ProcessBlock(b *types.Block) (err error) { - // TODO(jimmy-dexon): BlockConverter.Block() is called twice in this method. +// PreProcessBlock performs Byzantine Agreement on the block. +func (con *Consensus) PreProcessBlock(b *types.Block) (err error) { if err := con.sanityCheck(b); err != nil { return err } + if err := con.baModules[b.Position.ChainID].processBlock(b); err != nil { + return err + } + return +} + +// ProcessBlock is the entry point to submit one block to a Consensus instance. +func (con *Consensus) ProcessBlock(block *types.Block) (err error) { + if err := con.sanityCheck(block); err != nil { + return err + } var ( deliveredBlocks []*types.Block earlyDelivered bool ) // To avoid application layer modify the content of block during // processing, we should always operate based on the cloned one. - b = b.Clone() + b := block.Clone() con.lock.Lock() defer con.lock.Unlock() @@ -249,7 +442,7 @@ func (con *Consensus) ProcessBlock(b *types.Block) (err error) { if err = con.rbModule.processBlock(b); err != nil { return err } - con.app.BlockConfirmed(b.Clone()) + con.app.BlockConfirmed(block) for _, b := range con.rbModule.extractBlocks() { // Notify application layer that some block is strongly acked. con.app.StronglyAcked(b.Hash) |