From 2c816b5d636b8f7decd234582470a3d4c6b4a93a Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Tue, 21 Aug 2018 16:43:37 +0800 Subject: simulation: add simulation with scheduler (#71) - Add new field in test.Event: HistoryIndex HistoryIndex allow us to access them by their position in event history. - Record local time in test.App when receiving events. - Add statisitics module for slices of test.Event. - add new command line utility *dexcon-simulation-with-scheduler to verify the execution time of core.Consensus. --- core/consensus_test.go | 2 +- core/test/app.go | 74 +++++++++++++++++++++++++++++++------------- core/test/app_test.go | 10 +++--- core/test/scheduler-event.go | 15 +++++---- core/test/scheduler.go | 13 ++++---- core/test/scheduler_test.go | 3 +- 6 files changed, 76 insertions(+), 41 deletions(-) (limited to 'core') diff --git a/core/consensus_test.go b/core/consensus_test.go index 2887d66..cd7ff02 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -243,7 +243,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // its ConsensusTimestamp is not interpolated. t, err := getMedianTime(b11) req.Nil(err) - req.Equal(t, app.Delivered[b11.Hash]) + req.Equal(t, app.Delivered[b11.Hash].ConsensusTime) } for _, obj := range objs { app := *obj.app diff --git a/core/test/app.go b/core/test/app.go index 0242ca5..ddce31a 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -52,29 +52,47 @@ var ( "mismatch total ordering and delivered sequence") ) -type totalOrderDeliver struct { +// AppAckedRecord caches information when this application received +// a strongly-acked notification. +type AppAckedRecord struct { + When time.Time +} + +// AppTotalOrderRecord caches information when this application received +// a total-ordering deliver notification. +type AppTotalOrderRecord struct { BlockHashes common.Hashes Early bool + When time.Time +} + +// AppDeliveredRecord caches information when this application received +// a block delivered notification. +type AppDeliveredRecord struct { + ConsensusTime time.Time + When time.Time } // App implements Application interface for testing purpose. type App struct { - Acked map[common.Hash]struct{} - ackedLock sync.RWMutex - TotalOrdered []*totalOrderDeliver - totalOrderedLock sync.RWMutex - Delivered map[common.Hash]time.Time - DeliverSequence common.Hashes - deliveredLock sync.RWMutex + Acked map[common.Hash]*AppAckedRecord + ackedLock sync.RWMutex + TotalOrdered []*AppTotalOrderRecord + TotalOrderedByHash map[common.Hash]*AppTotalOrderRecord + totalOrderedLock sync.RWMutex + Delivered map[common.Hash]*AppDeliveredRecord + DeliverSequence common.Hashes + deliveredLock sync.RWMutex } // NewApp constructs a TestApp instance. func NewApp() *App { return &App{ - Acked: make(map[common.Hash]struct{}), - TotalOrdered: []*totalOrderDeliver{}, - Delivered: make(map[common.Hash]time.Time), - DeliverSequence: common.Hashes{}, + Acked: make(map[common.Hash]*AppAckedRecord), + TotalOrdered: []*AppTotalOrderRecord{}, + TotalOrderedByHash: make(map[common.Hash]*AppTotalOrderRecord), + Delivered: make(map[common.Hash]*AppDeliveredRecord), + DeliverSequence: common.Hashes{}, } } @@ -83,7 +101,7 @@ func (app *App) StronglyAcked(blockHash common.Hash) { app.ackedLock.Lock() defer app.ackedLock.Unlock() - app.Acked[blockHash] = struct{}{} + app.Acked[blockHash] = &AppAckedRecord{When: time.Now().UTC()} } // TotalOrderingDeliver implements Application interface. @@ -91,10 +109,18 @@ func (app *App) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { app.totalOrderedLock.Lock() defer app.totalOrderedLock.Unlock() - app.TotalOrdered = append(app.TotalOrdered, &totalOrderDeliver{ + rec := &AppTotalOrderRecord{ BlockHashes: blockHashes, Early: early, - }) + When: time.Now().UTC(), + } + app.TotalOrdered = append(app.TotalOrdered, rec) + for _, h := range blockHashes { + if _, exists := app.TotalOrderedByHash[h]; exists { + panic(fmt.Errorf("deliver duplicated blocks from total ordering")) + } + app.TotalOrderedByHash[h] = rec + } } // DeliverBlock implements Application interface. @@ -102,7 +128,10 @@ func (app *App) DeliverBlock(blockHash common.Hash, timestamp time.Time) { app.deliveredLock.Lock() defer app.deliveredLock.Unlock() - app.Delivered[blockHash] = timestamp + app.Delivered[blockHash] = &AppDeliveredRecord{ + ConsensusTime: timestamp, + When: time.Now().UTC(), + } app.DeliverSequence = append(app.DeliverSequence, blockHash) } @@ -132,7 +161,7 @@ func (app *App) Compare(other *App) error { if hOther != h { return ErrMismatchBlockHashSequence } - if app.Delivered[h] != other.Delivered[h] { + if app.Delivered[h].ConsensusTime != other.Delivered[h].ConsensusTime { return ErrMismatchConsensusTime } } @@ -160,16 +189,17 @@ func (app *App) Verify() error { if _, acked := app.Acked[h]; !acked { return ErrDeliveredBlockNotAcked } - t, exists := app.Delivered[h] + rec, exists := app.Delivered[h] if !exists { return ErrApplicationIntegrityFailed } // Make sure the consensus time is incremental. - ok := prevTime.Before(t) || prevTime.Equal(t) + ok := prevTime.Before(rec.ConsensusTime) || + prevTime.Equal(rec.ConsensusTime) if !ok { return ErrConsensusTimestampOutOfOrder } - prevTime = t + prevTime = rec.ConsensusTime } // Make sure the order of delivered and total ordering are the same by // comparing the concated string. @@ -178,8 +208,8 @@ func (app *App) Verify() error { hashSequenceIdx := 0 Loop: - for _, totalOrderDeliver := range app.TotalOrdered { - for _, h := range totalOrderDeliver.BlockHashes { + for _, rec := range app.TotalOrdered { + for _, h := range rec.BlockHashes { if hashSequenceIdx >= len(app.DeliverSequence) { break Loop } diff --git a/core/test/app_test.go b/core/test/app_test.go index 285773e..f4c4a74 100644 --- a/core/test/app_test.go +++ b/core/test/app_test.go @@ -11,18 +11,18 @@ import ( type AppTestSuite struct { suite.Suite - to1, to2, to3 *totalOrderDeliver + to1, to2, to3 *AppTotalOrderRecord } func (s *AppTestSuite) SetupSuite() { - s.to1 = &totalOrderDeliver{ + s.to1 = &AppTotalOrderRecord{ BlockHashes: common.Hashes{ common.NewRandomHash(), common.NewRandomHash(), }, Early: false, } - s.to2 = &totalOrderDeliver{ + s.to2 = &AppTotalOrderRecord{ BlockHashes: common.Hashes{ common.NewRandomHash(), common.NewRandomHash(), @@ -30,7 +30,7 @@ func (s *AppTestSuite) SetupSuite() { }, Early: false, } - s.to3 = &totalOrderDeliver{ + s.to3 = &AppTotalOrderRecord{ BlockHashes: common.Hashes{ common.NewRandomHash(), }, @@ -39,7 +39,7 @@ func (s *AppTestSuite) SetupSuite() { } func (s *AppTestSuite) setupAppByTotalOrderDeliver( - app *App, to *totalOrderDeliver) { + app *App, to *AppTotalOrderRecord) { for _, h := range to.BlockHashes { app.StronglyAcked(h) diff --git a/core/test/scheduler-event.go b/core/test/scheduler-event.go index 60411b4..85968c5 100644 --- a/core/test/scheduler-event.go +++ b/core/test/scheduler-event.go @@ -25,6 +25,8 @@ import ( // Event defines a scheduler event. type Event struct { + // HistoryIndex is the index of this event in history. + HistoryIndex int // ValidatorID is the ID of handler that this event deginated to. ValidatorID types.ValidatorID // Time is the expected execution time of this event. @@ -33,9 +35,8 @@ type Event struct { ExecError error // Payload is application specific data carried by this event. Payload interface{} - // ParentTime is the time of parent event, this field is essential when - // we need to calculate the latency the handler assigned. - ParentTime time.Time + // ParentHistoryIndex is the index of parent event in history. + ParentHistoryIndex int // ExecInterval is the latency to execute this event ExecInterval time.Duration } @@ -69,8 +70,10 @@ func NewEvent( vID types.ValidatorID, when time.Time, payload interface{}) *Event { return &Event{ - ValidatorID: vID, - Time: when, - Payload: payload, + HistoryIndex: -1, + ParentHistoryIndex: -1, + ValidatorID: vID, + Time: when, + Payload: payload, } } diff --git a/core/test/scheduler.go b/core/test/scheduler.go index 023d09f..6a3a40a 100644 --- a/core/test/scheduler.go +++ b/core/test/scheduler.go @@ -165,12 +165,6 @@ func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) { if sch.stopper.ShouldStop(e.ValidatorID) { sch.cancelFunc() } - // Include the execution interval of parent event to the expected time - // to execute child events. - for _, newEvent := range newEvents { - newEvent.ParentTime = e.Time - newEvent.Time = newEvent.Time.Add(e.ExecInterval) - } return newEvents }() // Record executed events as history. @@ -178,8 +172,15 @@ func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) { sch.historyLock.Lock() defer sch.historyLock.Unlock() + e.HistoryIndex = len(sch.history) sch.history = append(sch.history, e) }() + // Include the execution interval of parent event to the expected time + // to execute child events. + for _, newEvent := range newEvents { + newEvent.ParentHistoryIndex = e.HistoryIndex + newEvent.Time = newEvent.Time.Add(e.ExecInterval) + } // Add derivated events back to event queue. func() { sch.eventsLock.Lock() diff --git a/core/test/scheduler_test.go b/core/test/scheduler_test.go index c67240f..5aef36e 100644 --- a/core/test/scheduler_test.go +++ b/core/test/scheduler_test.go @@ -165,7 +165,8 @@ func (s *SchedulerTestSuite) TestChildEvent() { req.True(e.Time.Sub(curEvent.Time) >= 1300*time.Millisecond) // Make sure ParentTime field is set and is equal to parent event's // time. - req.Equal(e.ParentTime, curEvent.Time) + req.NotEqual(-1, e.ParentHistoryIndex) + req.Equal(e.ParentHistoryIndex, curEvent.HistoryIndex) curEvent = e } } -- cgit v1.2.3