aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWei-Ning Huang <w@dexon.org>2018-09-20 16:08:08 +0800
committerGitHub <noreply@github.com>2018-09-20 16:08:08 +0800
commita4b6b9e6a28a4d8fc49ee76c191454a819265713 (patch)
tree716e5724b182b8dccb01a49faec4c163f1fafdb0
parent2f1e71d9d298d1f6ade8d17a1db7a657b0223872 (diff)
downloaddexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar
dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.gz
dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.bz2
dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.lz
dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.xz
dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.tar.zst
dexon-consensus-a4b6b9e6a28a4d8fc49ee76c191454a819265713.zip
core: refactor witness data processing flow (#124)
Since witness data need to include data from application after it processed a block (e.g. stateRoot). We should make the process of witness data asynchronous. An interface `BlockProcessedChan()` is added to the application interface to return a channel for notifying the consensus core when a block is processed. The notification object includes a byte slice (witenss data) which will be include in the final witness data object.
-rw-r--r--core/compaction-chain.go36
-rw-r--r--core/compaction-chain_test.go28
-rw-r--r--core/consensus.go52
-rw-r--r--core/crypto.go3
-rw-r--r--core/interfaces.go4
-rw-r--r--core/nonblocking-application.go6
-rw-r--r--core/nonblocking-application_test.go6
-rw-r--r--core/test/app.go8
-rw-r--r--core/types/witness.go8
-rw-r--r--simulation/app.go15
10 files changed, 133 insertions, 33 deletions
diff --git a/core/compaction-chain.go b/core/compaction-chain.go
index c72cd7b..f5c5548 100644
--- a/core/compaction-chain.go
+++ b/core/compaction-chain.go
@@ -107,14 +107,34 @@ func (cc *compactionChain) processBlock(block *types.Block) error {
return nil
}
-func (cc *compactionChain) prepareWitnessAck(prvKey crypto.PrivateKey) (
- witnessAck *types.WitnessAck, err error) {
- lastBlock := cc.lastBlock()
- if lastBlock == nil {
- err = ErrNoWitnessToAck
- return
+func (cc *compactionChain) processWitnessResult(
+ block *types.Block, result types.WitnessResult) error {
+ block.Witness.Data = result.Data
+
+ // block is a genesis block, no need to update witness parent hash.
+ if block.IsGenesis() {
+ return nil
}
- hash, err := hashWitness(lastBlock)
+
+ prevBlock, err := cc.db.Get(block.ParentHash)
+ if err != nil {
+ return err
+ }
+
+ hash, err := hashWitness(&prevBlock)
+ if err != nil {
+ return err
+ }
+
+ block.Witness.ParentHash = hash
+ return nil
+}
+
+func (cc *compactionChain) prepareWitnessAck(
+ block *types.Block, prvKey crypto.PrivateKey) (
+ witnessAck *types.WitnessAck, err error) {
+
+ hash, err := hashWitness(block)
if err != nil {
return
}
@@ -124,7 +144,7 @@ func (cc *compactionChain) prepareWitnessAck(prvKey crypto.PrivateKey) (
}
witnessAck = &types.WitnessAck{
ProposerID: types.NewNodeID(prvKey.PublicKey()),
- WitnessBlockHash: lastBlock.Hash,
+ WitnessBlockHash: block.Hash,
Signature: sig,
Hash: hash,
}
diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go
index 5c08798..3603fb6 100644
--- a/core/compaction-chain_test.go
+++ b/core/compaction-chain_test.go
@@ -93,20 +93,20 @@ func (s *CompactionChainTestSuite) TestProcessBlock() {
func (s *CompactionChainTestSuite) TestPrepareWitnessAck() {
cc := s.newCompactionChain()
- blocks := s.generateBlocks(10, cc)
+ blocks := s.generateBlocks(2, cc)
prv, err := eth.NewPrivateKey()
s.Require().Nil(err)
- for _, block := range blocks {
- witnessAck, err := cc.prepareWitnessAck(prv)
- s.Require().Nil(err)
- if cc.prevBlock != nil {
- s.True(verifyWitnessSignature(
- prv.PublicKey(),
- cc.prevBlock,
- witnessAck.Signature))
- s.Equal(witnessAck.WitnessBlockHash, cc.prevBlock.Hash)
- }
- cc.prevBlock = block
+
+ block := blocks[1]
+ witnessAck, err := cc.prepareWitnessAck(block, prv)
+ s.Require().Nil(err)
+ if cc.prevBlock != nil {
+ verified, _ := verifyWitnessSignature(
+ prv.PublicKey(),
+ cc.prevBlock,
+ witnessAck.Signature)
+ s.True(verified)
+ s.Equal(witnessAck.WitnessBlockHash, block.Hash)
}
}
@@ -123,9 +123,9 @@ func (s *CompactionChainTestSuite) TestProcessWitnessAck() {
witnessAcks2 := []*types.WitnessAck{}
for _, block := range blocks {
cc.prevBlock = block
- witnessAck1, err := cc.prepareWitnessAck(prv1)
+ witnessAck1, err := cc.prepareWitnessAck(block, prv1)
s.Require().Nil(err)
- witnessAck2, err := cc.prepareWitnessAck(prv2)
+ witnessAck2, err := cc.prepareWitnessAck(block, prv2)
s.Require().Nil(err)
witnessAcks1 = append(witnessAcks1, witnessAck1)
witnessAcks2 = append(witnessAcks2, witnessAck2)
diff --git a/core/consensus.go b/core/consensus.go
index 1af66b3..33a3d7f 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -217,6 +217,8 @@ func (con *Consensus) Run() {
go con.runBA(i, tick)
}
go con.processMsg(con.network.ReceiveChan(), con.PreProcessBlock)
+ go con.processWitnessData()
+
// Reset ticker.
<-con.tickerObj.Tick()
<-con.tickerObj.Tick()
@@ -272,6 +274,7 @@ BALoop:
// RunLegacy starts running Legacy DEXON Consensus.
func (con *Consensus) RunLegacy() {
go con.processMsg(con.network.ReceiveChan(), con.ProcessBlock)
+ go con.processWitnessData()
chainID := uint32(0)
hashes := make(common.Hashes, 0, len(con.gov.GetNotarySet()))
@@ -383,6 +386,45 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
return err
}
+// processWitnessData process witness acks.
+func (con *Consensus) processWitnessData() {
+ ch := con.app.BlockProcessedChan()
+
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ case result := <-ch:
+ block, err := con.db.Get(result.BlockHash)
+ if err != nil {
+ panic(err)
+ }
+
+ if err = con.ccModule.processWitnessResult(&block, result); err != nil {
+ panic(err)
+ }
+ if err := con.db.Update(block); err != nil {
+ panic(err)
+ }
+
+ // TODO(w): move the acking interval into governance.
+ if block.Witness.Height%5 != 0 {
+ continue
+ }
+
+ witnessAck, err := con.ccModule.prepareWitnessAck(&block, con.prvKey)
+ if err != nil {
+ panic(err)
+ }
+ err = con.ProcessWitnessAck(witnessAck)
+ if err != nil {
+ panic(err)
+ }
+ con.app.WitnessAckDeliver(witnessAck)
+ }
+ }
+}
+
// prepareVote prepares a vote.
func (con *Consensus) prepareVote(chainID uint32, vote *types.Vote) error {
return con.baModules[chainID].prepareVote(vote, con.prvKey)
@@ -482,16 +524,6 @@ func (con *Consensus) ProcessBlock(block *types.Block) (err error) {
// nonBlockingApplication and let them recycle the
// block.
}
- var witnessAck *types.WitnessAck
- witnessAck, err = con.ccModule.prepareWitnessAck(con.prvKey)
- if err != nil {
- return
- }
- err = con.ProcessWitnessAck(witnessAck)
- if err != nil {
- return
- }
- con.app.WitnessAckDeliver(witnessAck)
}
return
}
diff --git a/core/crypto.go b/core/crypto.go
index e68d7cc..111f709 100644
--- a/core/crypto.go
+++ b/core/crypto.go
@@ -35,7 +35,8 @@ func hashWitness(block *types.Block) (common.Hash, error) {
hash := crypto.Keccak256Hash(
block.Witness.ParentHash[:],
binaryTime,
- binaryHeight)
+ binaryHeight,
+ block.Witness.Data[:])
return hash, nil
}
diff --git a/core/interfaces.go b/core/interfaces.go
index 4f67e1e..8ecfb3c 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -46,6 +46,10 @@ type Application interface {
// DeliverBlock is called when a block is add to the compaction chain.
DeliverBlock(blockHash common.Hash, timestamp time.Time)
+ // BlockProcessedChan returns a channel to receive the block hashes that have
+ // finished processing by the application.
+ BlockProcessedChan() <-chan types.WitnessResult
+
// WitnessAckDeliver is called when a witness ack is created.
WitnessAckDeliver(witnessAck *types.WitnessAck)
}
diff --git a/core/nonblocking-application.go b/core/nonblocking-application.go
index 2226eb0..98f92fc 100644
--- a/core/nonblocking-application.go
+++ b/core/nonblocking-application.go
@@ -154,6 +154,12 @@ func (app *nonBlockingApplication) DeliverBlock(
app.addEvent(deliverBlockEvent{blockHash, timestamp})
}
+// BlockProcessedChan returns a channel to receive the block hashes that have
+// finished processing by the application.
+func (app *nonBlockingApplication) BlockProcessedChan() <-chan types.WitnessResult {
+ return app.app.BlockProcessedChan()
+}
+
// WitnessAckDeliver is called when a witness ack is created.
func (app *nonBlockingApplication) WitnessAckDeliver(witnessAck *types.WitnessAck) {
app.addEvent(witnessAckEvent{witnessAck})
diff --git a/core/nonblocking-application_test.go b/core/nonblocking-application_test.go
index 82b3f2c..52757d3 100644
--- a/core/nonblocking-application_test.go
+++ b/core/nonblocking-application_test.go
@@ -34,6 +34,7 @@ type slowApp struct {
totalOrderingDeliver map[common.Hash]struct{}
deliverBlock map[common.Hash]struct{}
witnessAck map[common.Hash]struct{}
+ witnessResultChan chan types.WitnessResult
}
func newSlowApp(sleep time.Duration) *slowApp {
@@ -44,6 +45,7 @@ func newSlowApp(sleep time.Duration) *slowApp {
totalOrderingDeliver: make(map[common.Hash]struct{}),
deliverBlock: make(map[common.Hash]struct{}),
witnessAck: make(map[common.Hash]struct{}),
+ witnessResultChan: make(chan types.WitnessResult),
}
}
@@ -77,6 +79,10 @@ func (app *slowApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
app.deliverBlock[blockHash] = struct{}{}
}
+func (app *slowApp) BlockProcessedChan() <-chan types.WitnessResult {
+ return app.witnessResultChan
+}
+
func (app *slowApp) WitnessAckDeliver(witnessAck *types.WitnessAck) {
time.Sleep(app.sleep)
app.witnessAck[witnessAck.Hash] = struct{}{}
diff --git a/core/test/app.go b/core/test/app.go
index e9ef871..617bc38 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -89,6 +89,7 @@ type App struct {
deliveredLock sync.RWMutex
WitnessAckSequence []*types.WitnessAck
witnessAckLock sync.RWMutex
+ witnessResultChan chan types.WitnessResult
}
// NewApp constructs a TestApp instance.
@@ -99,6 +100,7 @@ func NewApp() *App {
TotalOrderedByHash: make(map[common.Hash]*AppTotalOrderRecord),
Delivered: make(map[common.Hash]*AppDeliveredRecord),
DeliverSequence: common.Hashes{},
+ witnessResultChan: make(chan types.WitnessResult),
}
}
@@ -155,6 +157,12 @@ func (app *App) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
app.DeliverSequence = append(app.DeliverSequence, blockHash)
}
+// BlockProcessedChan returns a channel to receive the block hashes that have
+// finished processing by the application.
+func (app *App) BlockProcessedChan() <-chan types.WitnessResult {
+ return app.witnessResultChan
+}
+
// WitnessAckDeliver implements Application interface.
func (app *App) WitnessAckDeliver(witnessAck *types.WitnessAck) {
app.witnessAckLock.Lock()
diff --git a/core/types/witness.go b/core/types/witness.go
index 349c1ab..46aa1cc 100644
--- a/core/types/witness.go
+++ b/core/types/witness.go
@@ -50,4 +50,12 @@ type Witness struct {
ParentHash common.Hash `json:"parent_hash"`
Timestamp time.Time `json:"timestamp"`
Height uint64 `json:"height"`
+ Data []byte `json:"data"`
+}
+
+// WitnessResult is the result pass from application containing the witness
+// data.
+type WitnessResult struct {
+ BlockHash common.Hash
+ Data []byte
}
diff --git a/simulation/app.go b/simulation/app.go
index 9c5619a..d00cf19 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -40,6 +40,7 @@ type simApp struct {
unconfirmedBlocks map[types.NodeID]common.Hashes
blockByHash map[common.Hash]*types.Block
blockByHashMutex sync.RWMutex
+ witnessResultChan chan types.WitnessResult
}
// newSimApp returns point to a new instance of simApp.
@@ -51,6 +52,7 @@ func newSimApp(id types.NodeID, netModule *network) *simApp {
blockSeen: make(map[common.Hash]time.Time),
unconfirmedBlocks: make(map[types.NodeID]common.Hashes),
blockByHash: make(map[common.Hash]*types.Block),
+ witnessResultChan: make(chan types.WitnessResult),
}
}
@@ -193,6 +195,19 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
Payload: jsonPayload,
}
a.netModule.report(msg)
+
+ go func() {
+ a.witnessResultChan <- types.WitnessResult{
+ BlockHash: blockHash,
+ Data: []byte(fmt.Sprintf("Block %s", blockHash)),
+ }
+ }()
+}
+
+// BlockProcessedChan returns a channel to receive the block hashes that have
+// finished processing by the application.
+func (a *simApp) BlockProcessedChan() <-chan types.WitnessResult {
+ return a.witnessResultChan
}
// WitnessAckDeliver is called when a witness ack is created.