aboutsummaryrefslogtreecommitdiffstats
path: root/core/consensus.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-09-04 17:39:05 +0800
committerGitHub <noreply@github.com>2018-09-04 17:39:05 +0800
commit04a63a22a24abaaa91b1d981e6d95260d80dadf4 (patch)
treed8e7335984a1b53f097f00f4e4956112d22aa673 /core/consensus.go
parent09393166791785ab6730b1c812b4a4fd07a92293 (diff)
downloadtangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar
tangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.gz
tangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.bz2
tangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.lz
tangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.xz
tangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.zst
tangerine-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.zip
core: BA-based consensus core. (#93)
Diffstat (limited to 'core/consensus.go')
-rw-r--r--core/consensus.go247
1 files changed, 220 insertions, 27 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 2618b54..5b85fcb 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -18,7 +18,9 @@
package core
import (
+ "context"
"fmt"
+ "log"
"sort"
"sync"
"time"
@@ -53,24 +55,79 @@ var (
"signature of block is incorrect")
ErrGenesisBlockNotEmpty = fmt.Errorf(
"genesis block should be empty")
+ ErrUnknownBlockProposed = fmt.Errorf(
+ "unknown block is proposed")
+ ErrUnknownBlockConfirmed = fmt.Errorf(
+ "unknown block is confirmed")
+ ErrIncorrectBlockPosition = fmt.Errorf(
+ "position of block is incorrect")
)
+// consensusReceiver implements agreementReceiver.
+type consensusReceiver struct {
+ consensus *Consensus
+ chainID uint32
+ restart chan struct{}
+}
+
+func (recv *consensusReceiver) proposeVote(vote *types.Vote) {
+ // TODO(jimmy-dexon): move prepareVote() into agreement.
+ if err := recv.consensus.prepareVote(recv.chainID, vote); err != nil {
+ fmt.Println(err)
+ return
+ }
+ go func() {
+ if err := recv.consensus.ProcessVote(vote); err != nil {
+ fmt.Println(err)
+ return
+ }
+ recv.consensus.network.BroadcastVote(vote)
+ }()
+}
+func (recv *consensusReceiver) proposeBlock(hash common.Hash) {
+ block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash)
+ if !exist {
+ fmt.Println(ErrUnknownBlockProposed)
+ fmt.Println(hash)
+ return
+ }
+ if err := recv.consensus.PreProcessBlock(block); err != nil {
+ fmt.Println(err)
+ return
+ }
+ recv.consensus.network.BroadcastBlock(block)
+}
+func (recv *consensusReceiver) confirmBlock(hash common.Hash) {
+ block, exist := recv.consensus.baModules[recv.chainID].findCandidateBlock(hash)
+ if !exist {
+ fmt.Println(ErrUnknownBlockConfirmed, hash)
+ return
+ }
+ recv.restart <- struct{}{}
+ if err := recv.consensus.ProcessBlock(block); err != nil {
+ fmt.Println(err)
+ return
+ }
+}
+
// Consensus implements DEXON Consensus algorithm.
type Consensus struct {
- ID types.ValidatorID
- app Application
- gov Governance
- rbModule *reliableBroadcast
- toModule *totalOrdering
- ctModule *consensusTimestamp
- ccModule *compactionChain
- db blockdb.BlockDatabase
- network Network
- tick *time.Ticker
- prvKey crypto.PrivateKey
- sigToPub SigToPubFn
- lock sync.RWMutex
- stopChan chan struct{}
+ ID types.ValidatorID
+ app Application
+ gov Governance
+ baModules []*agreement
+ receivers []*consensusReceiver
+ rbModule *reliableBroadcast
+ toModule *totalOrdering
+ ctModule *consensusTimestamp
+ ccModule *compactionChain
+ db blockdb.BlockDatabase
+ network Network
+ tick *time.Ticker
+ prvKey crypto.PrivateKey
+ sigToPub SigToPubFn
+ lock sync.RWMutex
+ stopChan chan struct{}
}
// NewConsensus construct an Consensus instance.
@@ -86,7 +143,7 @@ func NewConsensus(
// Setup acking by information returned from Governace.
rb := newReliableBroadcast()
- rb.setChainNum(len(validatorSet))
+ rb.setChainNum(gov.GetChainNumber())
for vID := range validatorSet {
rb.addValidator(vID)
}
@@ -101,7 +158,7 @@ func NewConsensus(
uint64(float32(len(validatorSet)-1)*gov.GetPhiRatio()+1),
validators)
- return &Consensus{
+ con := &Consensus{
ID: types.NewValidatorID(prv.PublicKey()),
rbModule: rb,
toModule: to,
@@ -116,11 +173,104 @@ func NewConsensus(
sigToPub: sigToPub,
stopChan: make(chan struct{}),
}
+
+ con.baModules = make([]*agreement, con.gov.GetChainNumber())
+ con.receivers = make([]*consensusReceiver, con.gov.GetChainNumber())
+ for i := uint32(0); i < con.gov.GetChainNumber(); i++ {
+ chainID := i
+ con.receivers[chainID] = &consensusReceiver{
+ consensus: con,
+ chainID: chainID,
+ restart: make(chan struct{}, 1),
+ }
+ blockProposer := func() *types.Block {
+ block := con.proposeBlock(chainID)
+ con.baModules[chainID].addCandidateBlock(block)
+ return block
+ }
+ con.baModules[chainID] = newAgreement(
+ con.ID,
+ con.receivers[chainID],
+ validators,
+ newGenesisLeaderSelector(con.gov.GetGenesisCRS(), con.sigToPub),
+ con.sigToPub,
+ blockProposer,
+ )
+ }
+
+ return con
}
-// Run starts running Consensus core.
+// Run starts running DEXON Consensus.
func (con *Consensus) Run() {
- go con.processMsg(con.network.ReceiveChan())
+ ctx, cancel := context.WithCancel(context.Background())
+ ticks := make([]chan struct{}, 0, con.gov.GetChainNumber())
+ for i := uint32(0); i < con.gov.GetChainNumber(); i++ {
+ tick := make(chan struct{})
+ ticks = append(ticks, tick)
+ go con.runBA(ctx, i, tick)
+ }
+ go func() {
+ <-con.stopChan
+ cancel()
+ }()
+ go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock)
+ // Reset ticker.
+ <-con.tick.C
+ <-con.tick.C
+ for {
+ <-con.tick.C
+ for _, tick := range ticks {
+ go func(tick chan struct{}) { tick <- struct{}{} }(tick)
+ }
+ }
+}
+
+func (con *Consensus) runBA(
+ ctx context.Context, chainID uint32, tick <-chan struct{}) {
+ // TODO(jimmy-dexon): move this function inside agreement.
+ validatorSet := con.gov.GetValidatorSet()
+ validators := make(types.ValidatorIDs, 0, len(validatorSet))
+ for vID := range validatorSet {
+ validators = append(validators, vID)
+ }
+ agreement := con.baModules[chainID]
+ recv := con.receivers[chainID]
+ recv.restart <- struct{}{}
+ // Reset ticker
+ <-tick
+BALoop:
+ for {
+ select {
+ case <-ctx.Done():
+ break BALoop
+ default:
+ }
+ for i := 0; i < agreement.clocks(); i++ {
+ <-tick
+ }
+ select {
+ case <-recv.restart:
+ // TODO(jimmy-dexon): handling change of validator set.
+ aID := types.Position{
+ ShardID: 0,
+ ChainID: chainID,
+ Height: con.rbModule.nextHeight(chainID),
+ }
+ agreement.restart(validators, aID)
+ default:
+ }
+ err := agreement.nextState()
+ if err != nil {
+ log.Printf("[%s] %s\n", con.ID.String(), err)
+ break BALoop
+ }
+ }
+}
+
+// RunLegacy starts running Legacy DEXON Consensus.
+func (con *Consensus) RunLegacy() {
+ go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock)
chainID := uint32(0)
hashes := make(common.Hashes, 0, len(con.gov.GetValidatorSet()))
@@ -134,6 +284,7 @@ func (con *Consensus) Run() {
break
}
}
+ con.rbModule.setChainNum(uint32(len(hashes)))
genesisBlock := &types.Block{
ProposerID: con.ID,
@@ -178,7 +329,9 @@ func (con *Consensus) Stop() {
con.stopChan <- struct{}{}
}
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) processMsg(
+ msgChan <-chan interface{},
+ blockProcesser func(*types.Block) error) {
for {
var msg interface{}
select {
@@ -189,10 +342,10 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
switch val := msg.(type) {
case *types.Block:
- if err := con.ProcessBlock(val); err != nil {
+ if err := blockProcesser(val); err != nil {
fmt.Println(err)
}
- types.RecycleBlock(val)
+ //types.RecycleBlock(val)
case *types.NotaryAck:
if err := con.ProcessNotaryAck(val); err != nil {
fmt.Println(err)
@@ -205,13 +358,43 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
}
}
+func (con *Consensus) proposeBlock(chainID uint32) *types.Block {
+ block := &types.Block{
+ ProposerID: con.ID,
+ Position: types.Position{
+ ChainID: chainID,
+ Height: con.rbModule.nextHeight(chainID),
+ },
+ }
+ if err := con.PrepareBlock(block, time.Now().UTC()); err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ if err := con.baModules[chainID].prepareBlock(block, con.prvKey); err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ return block
+}
+
// ProcessVote is the entry point to submit ont vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
- return
+ v := vote.Clone()
+ err = con.baModules[v.Position.ChainID].processVote(v)
+ return err
+}
+
+// prepareVote prepares a vote.
+func (con *Consensus) prepareVote(chainID uint32, vote *types.Vote) error {
+ return con.baModules[chainID].prepareVote(vote, con.prvKey)
}
// sanityCheck checks if the block is a valid block
func (con *Consensus) sanityCheck(b *types.Block) (err error) {
+ // Check block.Position.
+ if b.Position.ShardID != 0 || b.Position.ChainID >= con.rbModule.chainNum() {
+ return ErrIncorrectBlockPosition
+ }
// Check the hash of block.
hash, err := hashBlock(b)
if err != nil || hash != b.Hash {
@@ -229,19 +412,29 @@ func (con *Consensus) sanityCheck(b *types.Block) (err error) {
return nil
}
-// ProcessBlock is the entry point to submit one block to a Consensus instance.
-func (con *Consensus) ProcessBlock(b *types.Block) (err error) {
- // TODO(jimmy-dexon): BlockConverter.Block() is called twice in this method.
+// PreProcessBlock performs Byzantine Agreement on the block.
+func (con *Consensus) PreProcessBlock(b *types.Block) (err error) {
if err := con.sanityCheck(b); err != nil {
return err
}
+ if err := con.baModules[b.Position.ChainID].processBlock(b); err != nil {
+ return err
+ }
+ return
+}
+
+// ProcessBlock is the entry point to submit one block to a Consensus instance.
+func (con *Consensus) ProcessBlock(block *types.Block) (err error) {
+ if err := con.sanityCheck(block); err != nil {
+ return err
+ }
var (
deliveredBlocks []*types.Block
earlyDelivered bool
)
// To avoid application layer modify the content of block during
// processing, we should always operate based on the cloned one.
- b = b.Clone()
+ b := block.Clone()
con.lock.Lock()
defer con.lock.Unlock()
@@ -249,7 +442,7 @@ func (con *Consensus) ProcessBlock(b *types.Block) (err error) {
if err = con.rbModule.processBlock(b); err != nil {
return err
}
- con.app.BlockConfirmed(b.Clone())
+ con.app.BlockConfirmed(block)
for _, b := range con.rbModule.extractBlocks() {
// Notify application layer that some block is strongly acked.
con.app.StronglyAcked(b.Hash)