aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-08-15 17:30:46 +0800
committermissionliao <38416648+missionliao@users.noreply.github.com>2018-08-15 17:30:46 +0800
commit39f1d8ae529805fa410d3ed08358c568343705a5 (patch)
treee4f1e44495f5d48df38b7507a01a8f32372eafde
parent3a9b545b0f33435c277fcede2251e4b5ae800d40 (diff)
downloaddexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.tar
dexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.tar.gz
dexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.tar.bz2
dexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.tar.lz
dexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.tar.xz
dexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.tar.zst
dexon-consensus-39f1d8ae529805fa410d3ed08358c568343705a5.zip
core: Add a nonBlockingApplication. (#62)
-rw-r--r--core/consensus.go2
-rw-r--r--core/consensus_test.go21
-rw-r--r--core/nonblocking-application.go126
-rw-r--r--core/nonblocking-application_test.go97
4 files changed, 238 insertions, 8 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 9b95e56..f591db5 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -87,7 +87,7 @@ func NewConsensus(
rbModule: rb,
toModule: to,
ctModule: newConsensusTimestamp(),
- app: app,
+ app: newNonBlockingApplication(app),
gov: gov,
db: db,
prvKey: prv,
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 522fcbb..2887d66 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -47,7 +47,7 @@ func (s *ConsensusTestSuite) prepareGenesisBlock(
}
func (s *ConsensusTestSuite) prepareConsensus(
- gov *test.Governance, vID types.ValidatorID) (*test.App, *Consensus) {
+ gov *test.Governance, vID types.ValidatorID) (*Application, *Consensus) {
app := test.NewApp()
db, err := blockdb.NewMemBackedBlockDB()
@@ -55,7 +55,7 @@ func (s *ConsensusTestSuite) prepareConsensus(
prv, exist := gov.PrivateKeys[vID]
s.Require().True(exist)
con := NewConsensus(app, gov, db, prv, eth.SigToPub)
- return app, con
+ return &con.app, con
}
func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
@@ -84,13 +84,13 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
// Setup core.Consensus and test.App.
objs := map[types.ValidatorID]*struct {
- app *test.App
+ app *Application
con *Consensus
}{}
for _, vID := range validators {
app, con := s.prepareConsensus(gov, vID)
objs[vID] = &struct {
- app *test.App
+ app *Application
con *Consensus
}{app, con}
}
@@ -246,7 +246,14 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
req.Equal(t, app.Delivered[b11.Hash])
}
for _, obj := range objs {
- verify(obj.app)
+ app := *obj.app
+ if nbapp, ok := app.(*nonBlockingApplication); ok {
+ nbapp.wait()
+ app = nbapp.app
+ }
+ testApp, ok := app.(*test.App)
+ s.Require().True(ok)
+ verify(testApp)
}
}
@@ -269,13 +276,13 @@ func (s *ConsensusTestSuite) TestPrepareBlock() {
}
// Setup core.Consensus and test.App.
objs := map[types.ValidatorID]*struct {
- app *test.App
+ app *Application
con *Consensus
}{}
for _, vID := range validators {
app, con := s.prepareConsensus(gov, vID)
objs[vID] = &struct {
- app *test.App
+ app *Application
con *Consensus
}{app, con}
}
diff --git a/core/nonblocking-application.go b/core/nonblocking-application.go
new file mode 100644
index 0000000..4c06ab9
--- /dev/null
+++ b/core/nonblocking-application.go
@@ -0,0 +1,126 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/common"
+)
+
+type stronglyAckedEvent struct {
+ blockHash common.Hash
+}
+
+type totalOrderingDeliverEvent struct {
+ blockHashes common.Hashes
+ early bool
+}
+
+type deliverBlockEvent struct {
+ blockHash common.Hash
+ timestamp time.Time
+}
+
+// nonBlockingApplication implements Application and is a decorator for
+// Application that makes the methods to be non-blocking.
+type nonBlockingApplication struct {
+ app Application
+ eventChan chan interface{}
+ events []interface{}
+ eventsChange *sync.Cond
+ running sync.WaitGroup
+}
+
+func newNonBlockingApplication(app Application) *nonBlockingApplication {
+ nonBlockingApp := &nonBlockingApplication{
+ app: app,
+ eventChan: make(chan interface{}, 6),
+ events: make([]interface{}, 0, 100),
+ eventsChange: sync.NewCond(&sync.Mutex{}),
+ }
+ go nonBlockingApp.run()
+ return nonBlockingApp
+}
+
+func (app *nonBlockingApplication) addEvent(event interface{}) {
+ app.eventsChange.L.Lock()
+ defer app.eventsChange.L.Unlock()
+ app.events = append(app.events, event)
+ app.eventsChange.Broadcast()
+}
+
+func (app *nonBlockingApplication) run() {
+ // This go routine consume the first event from events and call the
+ // corresponding method of app.
+ for {
+ var event interface{}
+ func() {
+ app.eventsChange.L.Lock()
+ defer app.eventsChange.L.Unlock()
+ for len(app.events) == 0 {
+ app.eventsChange.Wait()
+ }
+ event = app.events[0]
+ app.events = app.events[1:]
+ app.running.Add(1)
+ }()
+ switch e := event.(type) {
+ case stronglyAckedEvent:
+ app.app.StronglyAcked(e.blockHash)
+ case totalOrderingDeliverEvent:
+ app.app.TotalOrderingDeliver(e.blockHashes, e.early)
+ case deliverBlockEvent:
+ app.app.DeliverBlock(e.blockHash, e.timestamp)
+ default:
+ fmt.Printf("Unknown event %v.", e)
+ }
+ app.running.Done()
+ app.eventsChange.Broadcast()
+ }
+}
+
+// wait will wait for all event in events finishes.
+func (app *nonBlockingApplication) wait() {
+ app.eventsChange.L.Lock()
+ defer app.eventsChange.L.Unlock()
+ for len(app.events) > 0 {
+ app.eventsChange.Wait()
+ }
+ app.running.Wait()
+}
+
+// StronglyAcked is called when a block is strongly acked.
+func (app *nonBlockingApplication) StronglyAcked(blockHash common.Hash) {
+ app.addEvent(stronglyAckedEvent{blockHash})
+}
+
+// TotalOrderingDeliver is called when the total ordering algorithm deliver
+// a set of block.
+func (app *nonBlockingApplication) TotalOrderingDeliver(
+ blockHashes common.Hashes, early bool) {
+ app.addEvent(totalOrderingDeliverEvent{blockHashes, early})
+}
+
+// DeliverBlock is called when a block is add to the compaction chain.
+func (app *nonBlockingApplication) DeliverBlock(
+ blockHash common.Hash, timestamp time.Time) {
+ app.addEvent(deliverBlockEvent{blockHash, timestamp})
+}
diff --git a/core/nonblocking-application_test.go b/core/nonblocking-application_test.go
new file mode 100644
index 0000000..070eae3
--- /dev/null
+++ b/core/nonblocking-application_test.go
@@ -0,0 +1,97 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+
+ "github.com/dexon-foundation/dexon-consensus-core/common"
+)
+
+type slowApp struct {
+ sleep time.Duration
+ stronglyAcked map[common.Hash]struct{}
+ totalOrderingDeliver map[common.Hash]struct{}
+ deliverBlock map[common.Hash]struct{}
+}
+
+func newSlowApp(sleep time.Duration) *slowApp {
+ return &slowApp{
+ sleep: sleep,
+ stronglyAcked: make(map[common.Hash]struct{}),
+ totalOrderingDeliver: make(map[common.Hash]struct{}),
+ deliverBlock: make(map[common.Hash]struct{}),
+ }
+}
+
+func (app *slowApp) StronglyAcked(blockHash common.Hash) {
+ time.Sleep(app.sleep)
+ app.stronglyAcked[blockHash] = struct{}{}
+}
+
+func (app *slowApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
+ time.Sleep(app.sleep)
+ for _, hash := range blockHashes {
+ app.totalOrderingDeliver[hash] = struct{}{}
+ }
+}
+
+func (app *slowApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
+ time.Sleep(app.sleep)
+ app.deliverBlock[blockHash] = struct{}{}
+}
+
+type NonBlockingAppTestSuite struct {
+ suite.Suite
+}
+
+func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() {
+ sleep := 50 * time.Millisecond
+ app := newSlowApp(sleep)
+ nbapp := newNonBlockingApplication(app)
+ hashes := make(common.Hashes, 10)
+ for idx := range hashes {
+ hashes[idx] = common.NewRandomHash()
+ }
+ now := time.Now().UTC()
+ shouldFinish := now.Add(100 * time.Millisecond)
+
+ // Start doing some 'heavy' job.
+ for _, hash := range hashes {
+ nbapp.StronglyAcked(hash)
+ nbapp.DeliverBlock(hash, time.Now().UTC())
+ }
+ nbapp.TotalOrderingDeliver(hashes, true)
+
+ // nonBlockingApplication should be non-blocking.
+ s.True(shouldFinish.After(time.Now().UTC()))
+
+ nbapp.wait()
+ for _, hash := range hashes {
+ s.Contains(app.stronglyAcked, hash)
+ s.Contains(app.totalOrderingDeliver, hash)
+ s.Contains(app.deliverBlock, hash)
+ }
+}
+
+func TestNonBlockingApplication(t *testing.T) {
+ suite.Run(t, new(NonBlockingAppTestSuite))
+}