diff options
author | Wei-Ning Huang <w@dexon.org> | 2018-09-20 16:08:08 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-20 16:08:08 +0800 |
commit | a4b6b9e6a28a4d8fc49ee76c191454a819265713 (patch) | |
tree | 716e5724b182b8dccb01a49faec4c163f1fafdb0 | |
parent | 2f1e71d9d298d1f6ade8d17a1db7a657b0223872 (diff) | |
download | dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.gz dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.bz2 dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.lz dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.xz dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.zst dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.zip |
core: refactor witness data processing flow (#124)
Since witness data need to include data from application after it
processed a block (e.g. stateRoot). We should make the process of
witness data asynchronous.
An interface `BlockProcessedChan()` is added to the application
interface to return a channel for notifying the consensus core when a
block is processed. The notification object includes a byte slice
(witenss data) which will be include in the final witness data object.
-rw-r--r-- | core/compaction-chain.go | 36 | ||||
-rw-r--r-- | core/compaction-chain_test.go | 28 | ||||
-rw-r--r-- | core/consensus.go | 52 | ||||
-rw-r--r-- | core/crypto.go | 3 | ||||
-rw-r--r-- | core/interfaces.go | 4 | ||||
-rw-r--r-- | core/nonblocking-application.go | 6 | ||||
-rw-r--r-- | core/nonblocking-application_test.go | 6 | ||||
-rw-r--r-- | core/test/app.go | 8 | ||||
-rw-r--r-- | core/types/witness.go | 8 | ||||
-rw-r--r-- | simulation/app.go | 15 |
10 files changed, 133 insertions, 33 deletions
diff --git a/core/compaction-chain.go b/core/compaction-chain.go index c72cd7b..f5c5548 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -107,14 +107,34 @@ func (cc *compactionChain) processBlock(block *types.Block) error { return nil } -func (cc *compactionChain) prepareWitnessAck(prvKey crypto.PrivateKey) ( - witnessAck *types.WitnessAck, err error) { - lastBlock := cc.lastBlock() - if lastBlock == nil { - err = ErrNoWitnessToAck - return +func (cc *compactionChain) processWitnessResult( + block *types.Block, result types.WitnessResult) error { + block.Witness.Data = result.Data + + // block is a genesis block, no need to update witness parent hash. + if block.IsGenesis() { + return nil } - hash, err := hashWitness(lastBlock) + + prevBlock, err := cc.db.Get(block.ParentHash) + if err != nil { + return err + } + + hash, err := hashWitness(&prevBlock) + if err != nil { + return err + } + + block.Witness.ParentHash = hash + return nil +} + +func (cc *compactionChain) prepareWitnessAck( + block *types.Block, prvKey crypto.PrivateKey) ( + witnessAck *types.WitnessAck, err error) { + + hash, err := hashWitness(block) if err != nil { return } @@ -124,7 +144,7 @@ func (cc *compactionChain) prepareWitnessAck(prvKey crypto.PrivateKey) ( } witnessAck = &types.WitnessAck{ ProposerID: types.NewNodeID(prvKey.PublicKey()), - WitnessBlockHash: lastBlock.Hash, + WitnessBlockHash: block.Hash, Signature: sig, Hash: hash, } diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go index 5c08798..3603fb6 100644 --- a/core/compaction-chain_test.go +++ b/core/compaction-chain_test.go @@ -93,20 +93,20 @@ func (s *CompactionChainTestSuite) TestProcessBlock() { func (s *CompactionChainTestSuite) TestPrepareWitnessAck() { cc := s.newCompactionChain() - blocks := s.generateBlocks(10, cc) + blocks := s.generateBlocks(2, cc) prv, err := eth.NewPrivateKey() s.Require().Nil(err) - for _, block := range blocks { - witnessAck, err := cc.prepareWitnessAck(prv) - s.Require().Nil(err) - if cc.prevBlock != nil { - s.True(verifyWitnessSignature( - prv.PublicKey(), - cc.prevBlock, - witnessAck.Signature)) - s.Equal(witnessAck.WitnessBlockHash, cc.prevBlock.Hash) - } - cc.prevBlock = block + + block := blocks[1] + witnessAck, err := cc.prepareWitnessAck(block, prv) + s.Require().Nil(err) + if cc.prevBlock != nil { + verified, _ := verifyWitnessSignature( + prv.PublicKey(), + cc.prevBlock, + witnessAck.Signature) + s.True(verified) + s.Equal(witnessAck.WitnessBlockHash, block.Hash) } } @@ -123,9 +123,9 @@ func (s *CompactionChainTestSuite) TestProcessWitnessAck() { witnessAcks2 := []*types.WitnessAck{} for _, block := range blocks { cc.prevBlock = block - witnessAck1, err := cc.prepareWitnessAck(prv1) + witnessAck1, err := cc.prepareWitnessAck(block, prv1) s.Require().Nil(err) - witnessAck2, err := cc.prepareWitnessAck(prv2) + witnessAck2, err := cc.prepareWitnessAck(block, prv2) s.Require().Nil(err) witnessAcks1 = append(witnessAcks1, witnessAck1) witnessAcks2 = append(witnessAcks2, witnessAck2) diff --git a/core/consensus.go b/core/consensus.go index 1af66b3..33a3d7f 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -217,6 +217,8 @@ func (con *Consensus) Run() { go con.runBA(i, tick) } go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock) + go con.processWitnessData() + // Reset ticker. <-con.tickerObj.Tick() <-con.tickerObj.Tick() @@ -272,6 +274,7 @@ BALoop: // RunLegacy starts running Legacy DEXON Consensus. func (con *Consensus) RunLegacy() { go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock) + go con.processWitnessData() chainID := uint32(0) hashes := make(common.Hashes, 0, len(con.gov.GetNotarySet())) @@ -383,6 +386,45 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { return err } +// processWitnessData process witness acks. +func (con *Consensus) processWitnessData() { + ch := con.app.BlockProcessedChan() + + for { + select { + case <-con.ctx.Done(): + return + case result := <-ch: + block, err := con.db.Get(result.BlockHash) + if err != nil { + panic(err) + } + + if err = con.ccModule.processWitnessResult(&block, result); err != nil { + panic(err) + } + if err := con.db.Update(block); err != nil { + panic(err) + } + + // TODO(w): move the acking interval into governance. + if block.Witness.Height%5 != 0 { + continue + } + + witnessAck, err := con.ccModule.prepareWitnessAck(&block, con.prvKey) + if err != nil { + panic(err) + } + err = con.ProcessWitnessAck(witnessAck) + if err != nil { + panic(err) + } + con.app.WitnessAckDeliver(witnessAck) + } + } +} + // prepareVote prepares a vote. func (con *Consensus) prepareVote(chainID uint32, vote *types.Vote) error { return con.baModules[chainID].prepareVote(vote, con.prvKey) @@ -482,16 +524,6 @@ func (con *Consensus) ProcessBlock(block *types.Block) (err error) { // nonBlockingApplication and let them recycle the // block. } - var witnessAck *types.WitnessAck - witnessAck, err = con.ccModule.prepareWitnessAck(con.prvKey) - if err != nil { - return - } - err = con.ProcessWitnessAck(witnessAck) - if err != nil { - return - } - con.app.WitnessAckDeliver(witnessAck) } return } diff --git a/core/crypto.go b/core/crypto.go index e68d7cc..111f709 100644 --- a/core/crypto.go +++ b/core/crypto.go @@ -35,7 +35,8 @@ func hashWitness(block *types.Block) (common.Hash, error) { hash := crypto.Keccak256Hash( block.Witness.ParentHash[:], binaryTime, - binaryHeight) + binaryHeight, + block.Witness.Data[:]) return hash, nil } diff --git a/core/interfaces.go b/core/interfaces.go index 4f67e1e..8ecfb3c 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -46,6 +46,10 @@ type Application interface { // DeliverBlock is called when a block is add to the compaction chain. DeliverBlock(blockHash common.Hash, timestamp time.Time) + // BlockProcessedChan returns a channel to receive the block hashes that have + // finished processing by the application. + BlockProcessedChan() <-chan types.WitnessResult + // WitnessAckDeliver is called when a witness ack is created. WitnessAckDeliver(witnessAck *types.WitnessAck) } diff --git a/core/nonblocking-application.go b/core/nonblocking-application.go index 2226eb0..98f92fc 100644 --- a/core/nonblocking-application.go +++ b/core/nonblocking-application.go @@ -154,6 +154,12 @@ func (app *nonBlockingApplication) DeliverBlock( app.addEvent(deliverBlockEvent{blockHash, timestamp}) } +// BlockProcessedChan returns a channel to receive the block hashes that have +// finished processing by the application. +func (app *nonBlockingApplication) BlockProcessedChan() <-chan types.WitnessResult { + return app.app.BlockProcessedChan() +} + // WitnessAckDeliver is called when a witness ack is created. func (app *nonBlockingApplication) WitnessAckDeliver(witnessAck *types.WitnessAck) { app.addEvent(witnessAckEvent{witnessAck}) diff --git a/core/nonblocking-application_test.go b/core/nonblocking-application_test.go index 82b3f2c..52757d3 100644 --- a/core/nonblocking-application_test.go +++ b/core/nonblocking-application_test.go @@ -34,6 +34,7 @@ type slowApp struct { totalOrderingDeliver map[common.Hash]struct{} deliverBlock map[common.Hash]struct{} witnessAck map[common.Hash]struct{} + witnessResultChan chan types.WitnessResult } func newSlowApp(sleep time.Duration) *slowApp { @@ -44,6 +45,7 @@ func newSlowApp(sleep time.Duration) *slowApp { totalOrderingDeliver: make(map[common.Hash]struct{}), deliverBlock: make(map[common.Hash]struct{}), witnessAck: make(map[common.Hash]struct{}), + witnessResultChan: make(chan types.WitnessResult), } } @@ -77,6 +79,10 @@ func (app *slowApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { app.deliverBlock[blockHash] = struct{}{} } +func (app *slowApp) BlockProcessedChan() <-chan types.WitnessResult { + return app.witnessResultChan +} + func (app *slowApp) WitnessAckDeliver(witnessAck *types.WitnessAck) { time.Sleep(app.sleep) app.witnessAck[witnessAck.Hash] = struct{}{} diff --git a/core/test/app.go b/core/test/app.go index e9ef871..617bc38 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -89,6 +89,7 @@ type App struct { deliveredLock sync.RWMutex WitnessAckSequence []*types.WitnessAck witnessAckLock sync.RWMutex + witnessResultChan chan types.WitnessResult } // NewApp constructs a TestApp instance. @@ -99,6 +100,7 @@ func NewApp() *App { TotalOrderedByHash: make(map[common.Hash]*AppTotalOrderRecord), Delivered: make(map[common.Hash]*AppDeliveredRecord), DeliverSequence: common.Hashes{}, + witnessResultChan: make(chan types.WitnessResult), } } @@ -155,6 +157,12 @@ func (app *App) DeliverBlock(blockHash common.Hash, timestamp time.Time) { app.DeliverSequence = append(app.DeliverSequence, blockHash) } +// BlockProcessedChan returns a channel to receive the block hashes that have +// finished processing by the application. +func (app *App) BlockProcessedChan() <-chan types.WitnessResult { + return app.witnessResultChan +} + // WitnessAckDeliver implements Application interface. func (app *App) WitnessAckDeliver(witnessAck *types.WitnessAck) { app.witnessAckLock.Lock() diff --git a/core/types/witness.go b/core/types/witness.go index 349c1ab..46aa1cc 100644 --- a/core/types/witness.go +++ b/core/types/witness.go @@ -50,4 +50,12 @@ type Witness struct { ParentHash common.Hash `json:"parent_hash"` Timestamp time.Time `json:"timestamp"` Height uint64 `json:"height"` + Data []byte `json:"data"` +} + +// WitnessResult is the result pass from application containing the witness +// data. +type WitnessResult struct { + BlockHash common.Hash + Data []byte } diff --git a/simulation/app.go b/simulation/app.go index 9c5619a..d00cf19 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -40,6 +40,7 @@ type simApp struct { unconfirmedBlocks map[types.NodeID]common.Hashes blockByHash map[common.Hash]*types.Block blockByHashMutex sync.RWMutex + witnessResultChan chan types.WitnessResult } // newSimApp returns point to a new instance of simApp. @@ -51,6 +52,7 @@ func newSimApp(id types.NodeID, netModule *network) *simApp { blockSeen: make(map[common.Hash]time.Time), unconfirmedBlocks: make(map[types.NodeID]common.Hashes), blockByHash: make(map[common.Hash]*types.Block), + witnessResultChan: make(chan types.WitnessResult), } } @@ -193,6 +195,19 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { Payload: jsonPayload, } a.netModule.report(msg) + + go func() { + a.witnessResultChan <- types.WitnessResult{ + BlockHash: blockHash, + Data: []byte(fmt.Sprintf("Block %s", blockHash)), + } + }() +} + +// BlockProcessedChan returns a channel to receive the block hashes that have +// finished processing by the application. +func (a *simApp) BlockProcessedChan() <-chan types.WitnessResult { + return a.witnessResultChan } // WitnessAckDeliver is called when a witness ack is created. |