// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package test

import (
	"container/heap"
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/dexon-foundation/dexon-consensus/core/types"
)

var (
	// ErrSchedulerAlreadyStarted means callers attempted to insert some
	// seed events after calling 'Run'.
	ErrSchedulerAlreadyStarted = fmt.Errorf("scheduler already started")
	// errNilEventWhenNotified is an internal error which means a worker
	// routine can't get an event when notified.
	errNilEventWhenNotified = fmt.Errorf("nil event when notified")
)

type schedulerHandlerRecord struct {
	handler EventHandler
	lock    sync.Mutex
}

// Scheduler is an event scheduler.
type Scheduler struct {
	events            eventQueue
	eventsLock        sync.Mutex
	history           []*Event
	historyLock       sync.RWMutex
	isStarted         bool
	handlers          map[types.NodeID]*schedulerHandlerRecord
	handlersLock      sync.RWMutex
	eventNotification chan struct{}
	ctx               context.Context
	cancelFunc        context.CancelFunc
	stopper           Stopper
}

// NewScheduler constructs a Scheduler instance.
func NewScheduler(stopper Stopper) *Scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &Scheduler{
		events:            eventQueue{},
		history:           []*Event{},
		handlers:          make(map[types.NodeID]*schedulerHandlerRecord),
		eventNotification: make(chan struct{}, 100000),
		ctx:               ctx,
		cancelFunc:        cancel,
		stopper:           stopper,
	}
}

// Run runs the scheduler. If you need strict incremental execution order
// of events based on their 'Time' field, assign 'numWorkers' as 1. If you
// need faster execution, assign 'numWorkers' a larger number.
func (sch *Scheduler) Run(numWorkers int) {
	var wg sync.WaitGroup
	// Flip 'isStarted' under 'eventsLock' so concurrent callers of 'Seed'
	// observe it consistently.
	sch.eventsLock.Lock()
	sch.isStarted = true
	sch.eventsLock.Unlock()
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go sch.workerRoutine(&wg)
	}
	// Block until all routines are finished.
	wg.Wait()
}

// Seed provides the scheduler with seed events.
func (sch *Scheduler) Seed(e *Event) error {
	sch.eventsLock.Lock()
	defer sch.eventsLock.Unlock()
	if sch.isStarted {
		return ErrSchedulerAlreadyStarted
	}
	sch.addEvent(e)
	return nil
}

// RegisterEventHandler registers an event handler by providing the ID of
// the corresponding node.
func (sch *Scheduler) RegisterEventHandler(
	nID types.NodeID, handler EventHandler) {
	sch.handlersLock.Lock()
	defer sch.handlersLock.Unlock()
	sch.handlers[nID] = &schedulerHandlerRecord{handler: handler}
}

// nextTick picks the oldest event from the event queue.
func (sch *Scheduler) nextTick() (e *Event) {
	sch.eventsLock.Lock()
	defer sch.eventsLock.Unlock()
	if len(sch.events) == 0 {
		return nil
	}
	return heap.Pop(&sch.events).(*Event)
}
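// The sketch below (not part of the original file) illustrates the contract
// 'workerRoutine' relies on: 'Handle' receives the popped event and may
// return derived events, which the scheduler re-queues after shifting their
// 'Time' by the parent's 'ExecInterval'. The type 'echoHandler' is a
// hypothetical name used for illustration only.
//
//	type echoHandler struct{}
//
//	func (h *echoHandler) Handle(e *Event) []*Event {
//		// Returning nil (or an empty slice) derives no further events;
//		// any returned events are rescheduled relative to this event.
//		return nil
//	}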
// addEvent is a helper function to push an event into the event queue; the
// underlying heap keeps the event with the earliest 'Time' on top. The
// caller must hold 'eventsLock'.
func (sch *Scheduler) addEvent(e *Event) {
	heap.Push(&sch.events, e)
	sch.eventNotification <- struct{}{}
}

// CloneExecutionHistory returns a cloned event execution history.
func (sch *Scheduler) CloneExecutionHistory() (cloned []*Event) {
	sch.historyLock.RLock()
	defer sch.historyLock.RUnlock()
	cloned = make([]*Event, len(sch.history))
	copy(cloned, sch.history)
	return
}

// workerRoutine is the main loop for handling events.
func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) {
	defer wg.Done()
	handleEvent := func(e *Event) {
		// Find the corresponding handler record.
		hRec := func(nID types.NodeID) *schedulerHandlerRecord {
			sch.handlersLock.RLock()
			defer sch.handlersLock.RUnlock()
			return sch.handlers[nID]
		}(e.NodeID)
		newEvents := func() []*Event {
			// This lock makes sure there would be no concurrent access
			// against each handler.
			hRec.lock.Lock()
			defer hRec.lock.Unlock()
			// Handle the incoming event, and record its execution time.
			beforeExecution := time.Now().UTC()
			newEvents := hRec.handler.Handle(e)
			e.ExecInterval = time.Now().UTC().Sub(beforeExecution)
			// It's safe to check the status of that node under 'hRec.lock'.
			if sch.stopper.ShouldStop(e.NodeID) {
				sch.cancelFunc()
			}
			return newEvents
		}()
		// Record the executed event in the history.
		func() {
			sch.historyLock.Lock()
			defer sch.historyLock.Unlock()
			e.HistoryIndex = len(sch.history)
			sch.history = append(sch.history, e)
		}()
		// Include the execution interval of the parent event in the expected
		// execution time of its child events.
		for _, newEvent := range newEvents {
			newEvent.ParentHistoryIndex = e.HistoryIndex
			newEvent.Time = newEvent.Time.Add(e.ExecInterval)
		}
		// Add derived events back to the event queue.
		func() {
			sch.eventsLock.Lock()
			defer sch.eventsLock.Unlock()
			for _, newEvent := range newEvents {
				sch.addEvent(newEvent)
			}
		}()
	}
Done:
	for {
		// Favor the scheduler-shutdown signal over other events.
		select {
		case <-sch.ctx.Done():
			break Done
		default:
		}
		// Block until a new event arrives or the scheduler shuts down.
		select {
		case <-sch.eventNotification:
			e := sch.nextTick()
			if e == nil {
				panic(errNilEventWhenNotified)
			}
			handleEvent(e)
		case <-sch.ctx.Done():
			break Done
		}
	}
}
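// A minimal usage sketch, assuming hypothetical 'dummyStopper' and
// 'dummyHandler' types implementing Stopper and EventHandler, and 'nID' as
// some types.NodeID. It shows the intended call order: register handlers
// and seed events before 'Run', since 'Seed' rejects events once the
// scheduler has started.
//
//	sch := NewScheduler(&dummyStopper{})
//	sch.RegisterEventHandler(nID, &dummyHandler{})
//	if err := sch.Seed(&Event{NodeID: nID, Time: time.Now().UTC()}); err != nil {
//		panic(err)
//	}
//	// numWorkers == 1 gives strict execution order by 'Time'.
//	sch.Run(1)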