diff options
Diffstat (limited to 'core/test/scheduler.go')
-rw-r--r-- | core/test/scheduler.go | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/core/test/scheduler.go b/core/test/scheduler.go new file mode 100644 index 0000000..023d09f --- /dev/null +++ b/core/test/scheduler.go @@ -0,0 +1,214 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package test + +import ( + "container/heap" + "context" + "fmt" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +var ( + // ErrSchedulerAlreadyStarted means callers attempt to insert some + // seed events after calling 'Run'. + ErrSchedulerAlreadyStarted = fmt.Errorf("scheduler already started") + // errNilEventWhenNotified is an internal error which means a worker routine + // can't get an event when notified. + errNilEventWhenNotified = fmt.Errorf("nil event when notified") +) + +type schedulerHandlerRecord struct { + handler EventHandler + lock sync.Mutex +} + +// Scheduler is an event scheduler. +type Scheduler struct { + events eventQueue + eventsLock sync.Mutex + history []*Event + historyLock sync.RWMutex + isStarted bool + handlers map[types.ValidatorID]*schedulerHandlerRecord + handlersLock sync.RWMutex + eventNotification chan struct{} + ctx context.Context + cancelFunc context.CancelFunc + stopper Stopper +} + +// NewScheduler constructs an Scheduler instance. +func NewScheduler(stopper Stopper) *Scheduler { + ctx, cancel := context.WithCancel(context.Background()) + return &Scheduler{ + events: eventQueue{}, + history: []*Event{}, + handlers: make(map[types.ValidatorID]*schedulerHandlerRecord), + eventNotification: make(chan struct{}, 100000), + ctx: ctx, + cancelFunc: cancel, + stopper: stopper, + } +} + +// Run would run the scheduler. If you need strict incrememtal execution order +// of events based on their 'Time' field, assign 'numWorkers' as 1. If you need +// faster execution, assign 'numWorkers' a larger number. +func (sch *Scheduler) Run(numWorkers int) { + var wg sync.WaitGroup + + sch.isStarted = true + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go sch.workerRoutine(&wg) + } + // Blocks until all routines are finished. + wg.Wait() +} + +// Seed is used to provide the scheduler some seed events. +func (sch *Scheduler) Seed(e *Event) error { + sch.eventsLock.Lock() + defer sch.eventsLock.Unlock() + + if sch.isStarted { + return ErrSchedulerAlreadyStarted + } + sch.addEvent(e) + return nil +} + +// RegisterEventHandler register an event handler by providing ID of +// corresponding validator. +func (sch *Scheduler) RegisterEventHandler( + vID types.ValidatorID, + handler EventHandler) { + + sch.handlersLock.Lock() + defer sch.handlersLock.Unlock() + + sch.handlers[vID] = &schedulerHandlerRecord{handler: handler} +} + +// nextTick would pick the oldest event from eventQueue. +func (sch *Scheduler) nextTick() (e *Event) { + sch.eventsLock.Lock() + defer sch.eventsLock.Unlock() + + if len(sch.events) == 0 { + return nil + } + return heap.Pop(&sch.events).(*Event) +} + +// addEvent is an helper function to add events into eventQueue sorted by +// their 'Time' field. +func (sch *Scheduler) addEvent(e *Event) { + // Perform sorted insertion. + heap.Push(&sch.events, e) + sch.eventNotification <- struct{}{} +} + +// CloneExecutionHistory returns a cloned event execution history. +func (sch *Scheduler) CloneExecutionHistory() (cloned []*Event) { + sch.historyLock.RLock() + defer sch.historyLock.RUnlock() + + cloned = make([]*Event, len(sch.history)) + copy(cloned, sch.history) + return +} + +// workerRoutine is the mainloop when handling events. +func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) { + defer wg.Done() + + handleEvent := func(e *Event) { + // Find correspond handler record. + hRec := func(vID types.ValidatorID) *schedulerHandlerRecord { + sch.handlersLock.RLock() + defer sch.handlersLock.RUnlock() + + return sch.handlers[vID] + }(e.ValidatorID) + + newEvents := func() []*Event { + // This lock makes sure there would be no concurrent access + // against each handler. + hRec.lock.Lock() + defer hRec.lock.Unlock() + + // Handle incoming event, and record its execution time. + beforeExecution := time.Now().UTC() + newEvents := hRec.handler.Handle(e) + e.ExecInterval = time.Now().UTC().Sub(beforeExecution) + // It's safe to check status of that validator under 'hRec.lock'. + if sch.stopper.ShouldStop(e.ValidatorID) { + sch.cancelFunc() + } + // Include the execution interval of parent event to the expected time + // to execute child events. + for _, newEvent := range newEvents { + newEvent.ParentTime = e.Time + newEvent.Time = newEvent.Time.Add(e.ExecInterval) + } + return newEvents + }() + // Record executed events as history. + func() { + sch.historyLock.Lock() + defer sch.historyLock.Unlock() + + sch.history = append(sch.history, e) + }() + // Add derivated events back to event queue. + func() { + sch.eventsLock.Lock() + defer sch.eventsLock.Unlock() + + for _, newEvent := range newEvents { + sch.addEvent(newEvent) + } + }() + } + +Done: + for { + // We favor scheduler-shutdown signal than other events. + select { + case <-sch.ctx.Done(): + break Done + default: + } + // Block until new event arrival or scheduler shutdown. + select { + case <-sch.eventNotification: + e := sch.nextTick() + if e == nil { + panic(errNilEventWhenNotified) + } + handleEvent(e) + case <-sch.ctx.Done(): + break Done + } + } +} |