diff options
-rw-r--r-- | core/consensus.go | 2 | ||||
-rw-r--r-- | core/consensus_test.go | 21 | ||||
-rw-r--r-- | core/nonblocking-application.go | 126 | ||||
-rw-r--r-- | core/nonblocking-application_test.go | 97 |
4 files changed, 238 insertions, 8 deletions
diff --git a/core/consensus.go b/core/consensus.go index 9b95e56..f591db5 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -87,7 +87,7 @@ func NewConsensus( rbModule: rb, toModule: to, ctModule: newConsensusTimestamp(), - app: app, + app: newNonBlockingApplication(app), gov: gov, db: db, prvKey: prv, diff --git a/core/consensus_test.go b/core/consensus_test.go index 522fcbb..2887d66 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -47,7 +47,7 @@ func (s *ConsensusTestSuite) prepareGenesisBlock( } func (s *ConsensusTestSuite) prepareConsensus( - gov *test.Governance, vID types.ValidatorID) (*test.App, *Consensus) { + gov *test.Governance, vID types.ValidatorID) (*Application, *Consensus) { app := test.NewApp() db, err := blockdb.NewMemBackedBlockDB() @@ -55,7 +55,7 @@ func (s *ConsensusTestSuite) prepareConsensus( prv, exist := gov.PrivateKeys[vID] s.Require().True(exist) con := NewConsensus(app, gov, db, prv, eth.SigToPub) - return app, con + return &con.app, con } func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { @@ -84,13 +84,13 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // Setup core.Consensus and test.App. objs := map[types.ValidatorID]*struct { - app *test.App + app *Application con *Consensus }{} for _, vID := range validators { app, con := s.prepareConsensus(gov, vID) objs[vID] = &struct { - app *test.App + app *Application con *Consensus }{app, con} } @@ -246,7 +246,14 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { req.Equal(t, app.Delivered[b11.Hash]) } for _, obj := range objs { - verify(obj.app) + app := *obj.app + if nbapp, ok := app.(*nonBlockingApplication); ok { + nbapp.wait() + app = nbapp.app + } + testApp, ok := app.(*test.App) + s.Require().True(ok) + verify(testApp) } } @@ -269,13 +276,13 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { } // Setup core.Consensus and test.App. objs := map[types.ValidatorID]*struct { - app *test.App + app *Application con *Consensus }{} for _, vID := range validators { app, con := s.prepareConsensus(gov, vID) objs[vID] = &struct { - app *test.App + app *Application con *Consensus }{app, con} } diff --git a/core/nonblocking-application.go b/core/nonblocking-application.go new file mode 100644 index 0000000..4c06ab9 --- /dev/null +++ b/core/nonblocking-application.go @@ -0,0 +1,126 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "fmt" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/common" +) + +type stronglyAckedEvent struct { + blockHash common.Hash +} + +type totalOrderingDeliverEvent struct { + blockHashes common.Hashes + early bool +} + +type deliverBlockEvent struct { + blockHash common.Hash + timestamp time.Time +} + +// nonBlockingApplication implements Application and is a decorator for +// Application that makes the methods to be non-blocking. +type nonBlockingApplication struct { + app Application + eventChan chan interface{} + events []interface{} + eventsChange *sync.Cond + running sync.WaitGroup +} + +func newNonBlockingApplication(app Application) *nonBlockingApplication { + nonBlockingApp := &nonBlockingApplication{ + app: app, + eventChan: make(chan interface{}, 6), + events: make([]interface{}, 0, 100), + eventsChange: sync.NewCond(&sync.Mutex{}), + } + go nonBlockingApp.run() + return nonBlockingApp +} + +func (app *nonBlockingApplication) addEvent(event interface{}) { + app.eventsChange.L.Lock() + defer app.eventsChange.L.Unlock() + app.events = append(app.events, event) + app.eventsChange.Broadcast() +} + +func (app *nonBlockingApplication) run() { + // This go routine consume the first event from events and call the + // corresponding method of app. + for { + var event interface{} + func() { + app.eventsChange.L.Lock() + defer app.eventsChange.L.Unlock() + for len(app.events) == 0 { + app.eventsChange.Wait() + } + event = app.events[0] + app.events = app.events[1:] + app.running.Add(1) + }() + switch e := event.(type) { + case stronglyAckedEvent: + app.app.StronglyAcked(e.blockHash) + case totalOrderingDeliverEvent: + app.app.TotalOrderingDeliver(e.blockHashes, e.early) + case deliverBlockEvent: + app.app.DeliverBlock(e.blockHash, e.timestamp) + default: + fmt.Printf("Unknown event %v.", e) + } + app.running.Done() + app.eventsChange.Broadcast() + } +} + +// wait will wait for all event in events finishes. +func (app *nonBlockingApplication) wait() { + app.eventsChange.L.Lock() + defer app.eventsChange.L.Unlock() + for len(app.events) > 0 { + app.eventsChange.Wait() + } + app.running.Wait() +} + +// StronglyAcked is called when a block is strongly acked. +func (app *nonBlockingApplication) StronglyAcked(blockHash common.Hash) { + app.addEvent(stronglyAckedEvent{blockHash}) +} + +// TotalOrderingDeliver is called when the total ordering algorithm deliver +// a set of block. +func (app *nonBlockingApplication) TotalOrderingDeliver( + blockHashes common.Hashes, early bool) { + app.addEvent(totalOrderingDeliverEvent{blockHashes, early}) +} + +// DeliverBlock is called when a block is add to the compaction chain. +func (app *nonBlockingApplication) DeliverBlock( + blockHash common.Hash, timestamp time.Time) { + app.addEvent(deliverBlockEvent{blockHash, timestamp}) +} diff --git a/core/nonblocking-application_test.go b/core/nonblocking-application_test.go new file mode 100644 index 0000000..070eae3 --- /dev/null +++ b/core/nonblocking-application_test.go @@ -0,0 +1,97 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "github.com/dexon-foundation/dexon-consensus-core/common" +) + +type slowApp struct { + sleep time.Duration + stronglyAcked map[common.Hash]struct{} + totalOrderingDeliver map[common.Hash]struct{} + deliverBlock map[common.Hash]struct{} +} + +func newSlowApp(sleep time.Duration) *slowApp { + return &slowApp{ + sleep: sleep, + stronglyAcked: make(map[common.Hash]struct{}), + totalOrderingDeliver: make(map[common.Hash]struct{}), + deliverBlock: make(map[common.Hash]struct{}), + } +} + +func (app *slowApp) StronglyAcked(blockHash common.Hash) { + time.Sleep(app.sleep) + app.stronglyAcked[blockHash] = struct{}{} +} + +func (app *slowApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { + time.Sleep(app.sleep) + for _, hash := range blockHashes { + app.totalOrderingDeliver[hash] = struct{}{} + } +} + +func (app *slowApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { + time.Sleep(app.sleep) + app.deliverBlock[blockHash] = struct{}{} +} + +type NonBlockingAppTestSuite struct { + suite.Suite +} + +func (s *NonBlockingAppTestSuite) TestNonBlockingApplication() { + sleep := 50 * time.Millisecond + app := newSlowApp(sleep) + nbapp := newNonBlockingApplication(app) + hashes := make(common.Hashes, 10) + for idx := range hashes { + hashes[idx] = common.NewRandomHash() + } + now := time.Now().UTC() + shouldFinish := now.Add(100 * time.Millisecond) + + // Start doing some 'heavy' job. + for _, hash := range hashes { + nbapp.StronglyAcked(hash) + nbapp.DeliverBlock(hash, time.Now().UTC()) + } + nbapp.TotalOrderingDeliver(hashes, true) + + // nonBlockingApplication should be non-blocking. + s.True(shouldFinish.After(time.Now().UTC())) + + nbapp.wait() + for _, hash := range hashes { + s.Contains(app.stronglyAcked, hash) + s.Contains(app.totalOrderingDeliver, hash) + s.Contains(app.deliverBlock, hash) + } +} + +func TestNonBlockingApplication(t *testing.T) { + suite.Run(t, new(NonBlockingAppTestSuite)) +} |