aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/scheduler.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-08-15 23:10:21 +0800
committerGitHub <noreply@github.com>2018-08-15 23:10:21 +0800
commitd3107b56cbef1f05baddb64880c3e97d7eda87a4 (patch)
treeb94d05f101f61c9808ae1681a28dde4c37a5068f /core/test/scheduler.go
parent39f1d8ae529805fa410d3ed08358c568343705a5 (diff)
downloaddexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.tar
dexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.tar.gz
dexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.tar.bz2
dexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.tar.lz
dexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.tar.xz
dexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.tar.zst
dexon-consensus-d3107b56cbef1f05baddb64880c3e97d7eda87a4.zip
test: add test.Scheduler (#58)
When simulating execution of core.Consensus by passing packets through golang-channel or real-socket, we need to utilize time.Sleep and time.Now to simulate the required network/proposing latency. It's problematic when we try to test a simulation with long network latency. Instead, Scheduler would try to execute the event with minimum timestamp, thus time.Sleep is replaced with Scheduler.nextTick, and time.Now is replaced with Event.Time. Changes: - Add test.Scheduler. - Add test.Stopper interface to provide encapsulate different stop conditions for scheduler. - Add a reference implementation for test.Stopper, it will stop scheduler when all validators confirmed X blocks proposed from themselves. - Add a test scenario on core.Consensus that all validators are not byzantine.
Diffstat (limited to 'core/test/scheduler.go')
-rw-r--r--core/test/scheduler.go214
1 files changed, 214 insertions, 0 deletions
diff --git a/core/test/scheduler.go b/core/test/scheduler.go
new file mode 100644
index 0000000..023d09f
--- /dev/null
+++ b/core/test/scheduler.go
@@ -0,0 +1,214 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package test
+
+import (
+ "container/heap"
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+var (
+ // ErrSchedulerAlreadyStarted means callers attempt to insert some
+ // seed events after calling 'Run'.
+ ErrSchedulerAlreadyStarted = fmt.Errorf("scheduler already started")
+ // errNilEventWhenNotified is an internal error which means a worker routine
+ // can't get an event when notified.
+ errNilEventWhenNotified = fmt.Errorf("nil event when notified")
+)
+
+type schedulerHandlerRecord struct {
+ handler EventHandler
+ lock sync.Mutex
+}
+
+// Scheduler is an event scheduler.
+type Scheduler struct {
+ events eventQueue
+ eventsLock sync.Mutex
+ history []*Event
+ historyLock sync.RWMutex
+ isStarted bool
+ handlers map[types.ValidatorID]*schedulerHandlerRecord
+ handlersLock sync.RWMutex
+ eventNotification chan struct{}
+ ctx context.Context
+ cancelFunc context.CancelFunc
+ stopper Stopper
+}
+
+// NewScheduler constructs an Scheduler instance.
+func NewScheduler(stopper Stopper) *Scheduler {
+ ctx, cancel := context.WithCancel(context.Background())
+ return &Scheduler{
+ events: eventQueue{},
+ history: []*Event{},
+ handlers: make(map[types.ValidatorID]*schedulerHandlerRecord),
+ eventNotification: make(chan struct{}, 100000),
+ ctx: ctx,
+ cancelFunc: cancel,
+ stopper: stopper,
+ }
+}
+
+// Run would run the scheduler. If you need strict incrememtal execution order
+// of events based on their 'Time' field, assign 'numWorkers' as 1. If you need
+// faster execution, assign 'numWorkers' a larger number.
+func (sch *Scheduler) Run(numWorkers int) {
+ var wg sync.WaitGroup
+
+ sch.isStarted = true
+ for i := 0; i < numWorkers; i++ {
+ wg.Add(1)
+ go sch.workerRoutine(&wg)
+ }
+ // Blocks until all routines are finished.
+ wg.Wait()
+}
+
+// Seed is used to provide the scheduler some seed events.
+func (sch *Scheduler) Seed(e *Event) error {
+ sch.eventsLock.Lock()
+ defer sch.eventsLock.Unlock()
+
+ if sch.isStarted {
+ return ErrSchedulerAlreadyStarted
+ }
+ sch.addEvent(e)
+ return nil
+}
+
+// RegisterEventHandler register an event handler by providing ID of
+// corresponding validator.
+func (sch *Scheduler) RegisterEventHandler(
+ vID types.ValidatorID,
+ handler EventHandler) {
+
+ sch.handlersLock.Lock()
+ defer sch.handlersLock.Unlock()
+
+ sch.handlers[vID] = &schedulerHandlerRecord{handler: handler}
+}
+
+// nextTick would pick the oldest event from eventQueue.
+func (sch *Scheduler) nextTick() (e *Event) {
+ sch.eventsLock.Lock()
+ defer sch.eventsLock.Unlock()
+
+ if len(sch.events) == 0 {
+ return nil
+ }
+ return heap.Pop(&sch.events).(*Event)
+}
+
+// addEvent is an helper function to add events into eventQueue sorted by
+// their 'Time' field.
+func (sch *Scheduler) addEvent(e *Event) {
+ // Perform sorted insertion.
+ heap.Push(&sch.events, e)
+ sch.eventNotification <- struct{}{}
+}
+
+// CloneExecutionHistory returns a cloned event execution history.
+func (sch *Scheduler) CloneExecutionHistory() (cloned []*Event) {
+ sch.historyLock.RLock()
+ defer sch.historyLock.RUnlock()
+
+ cloned = make([]*Event, len(sch.history))
+ copy(cloned, sch.history)
+ return
+}
+
+// workerRoutine is the mainloop when handling events.
+func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) {
+ defer wg.Done()
+
+ handleEvent := func(e *Event) {
+ // Find correspond handler record.
+ hRec := func(vID types.ValidatorID) *schedulerHandlerRecord {
+ sch.handlersLock.RLock()
+ defer sch.handlersLock.RUnlock()
+
+ return sch.handlers[vID]
+ }(e.ValidatorID)
+
+ newEvents := func() []*Event {
+ // This lock makes sure there would be no concurrent access
+ // against each handler.
+ hRec.lock.Lock()
+ defer hRec.lock.Unlock()
+
+ // Handle incoming event, and record its execution time.
+ beforeExecution := time.Now().UTC()
+ newEvents := hRec.handler.Handle(e)
+ e.ExecInterval = time.Now().UTC().Sub(beforeExecution)
+ // It's safe to check status of that validator under 'hRec.lock'.
+ if sch.stopper.ShouldStop(e.ValidatorID) {
+ sch.cancelFunc()
+ }
+ // Include the execution interval of parent event to the expected time
+ // to execute child events.
+ for _, newEvent := range newEvents {
+ newEvent.ParentTime = e.Time
+ newEvent.Time = newEvent.Time.Add(e.ExecInterval)
+ }
+ return newEvents
+ }()
+ // Record executed events as history.
+ func() {
+ sch.historyLock.Lock()
+ defer sch.historyLock.Unlock()
+
+ sch.history = append(sch.history, e)
+ }()
+ // Add derivated events back to event queue.
+ func() {
+ sch.eventsLock.Lock()
+ defer sch.eventsLock.Unlock()
+
+ for _, newEvent := range newEvents {
+ sch.addEvent(newEvent)
+ }
+ }()
+ }
+
+Done:
+ for {
+ // We favor scheduler-shutdown signal than other events.
+ select {
+ case <-sch.ctx.Done():
+ break Done
+ default:
+ }
+ // Block until new event arrival or scheduler shutdown.
+ select {
+ case <-sch.eventNotification:
+ e := sch.nextTick()
+ if e == nil {
+ panic(errNilEventWhenNotified)
+ }
+ handleEvent(e)
+ case <-sch.ctx.Done():
+ break Done
+ }
+ }
+}