aboutsummaryrefslogblamecommitdiffstats
path: root/core/test/scheduler.go
blob: 7c5bbdebf6c6a3b81acaba72cc5d5765acdbc43a (plain) (tree)

















































                                                                                    
                                                                  












                                                               
                                                                                  


































                                                                               
                      
                                           
                         




                                       
                                                                     




































                                                                         
                                                                        


                                                        

                                                










                                                                                   

                                                                                    

                                                






                                                      
                                                         

                                                            





                                                                                      































                                                                        
// Copyright 2018 The dexon-consensus-core Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus-core library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus-core library. If not, see
// <http://www.gnu.org/licenses/>.

package test

import (
    "container/heap"
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/dexon-foundation/dexon-consensus-core/core/types"
)

// ErrSchedulerAlreadyStarted is returned by 'Seed' when a caller tries to
// insert a seed event after 'Run' has been called.
var ErrSchedulerAlreadyStarted = fmt.Errorf("scheduler already started")

// errNilEventWhenNotified is an internal error: a worker routine received an
// event notification but found no event to pick from the queue.
var errNilEventWhenNotified = fmt.Errorf("nil event when notified")

// schedulerHandlerRecord pairs a registered event handler with the mutex
// used to serialize calls into it.
type schedulerHandlerRecord struct {
    // handler is the callback registered through 'RegisterEventHandler'.
    handler EventHandler
    // lock is held by a worker routine around each call to 'handler.Handle',
    // so one handler is never invoked by two workers at the same time.
    lock    sync.Mutex
}

// Scheduler is an event scheduler. Seed events are queued with 'Seed',
// handlers are attached per node with 'RegisterEventHandler', and 'Run'
// starts the worker routines that dispatch queued events to those handlers.
type Scheduler struct {
    // events is the heap of pending events; guarded by eventsLock.
    events            eventQueue
    eventsLock        sync.Mutex
    // history lists executed events in the order they were recorded;
    // guarded by historyLock.
    history           []*Event
    historyLock       sync.RWMutex
    // isStarted is set by 'Run' and checked by 'Seed' to reject late seeds.
    isStarted         bool
    // handlers maps a node ID to its handler record; guarded by handlersLock.
    handlers          map[types.NodeID]*schedulerHandlerRecord
    handlersLock      sync.RWMutex
    // eventNotification receives one token for every event pushed into
    // 'events'; worker routines consume a token before popping an event.
    eventNotification chan struct{}
    // ctx is cancelled (through cancelFunc) once 'stopper' reports that the
    // scheduler should stop; worker routines exit when ctx is done.
    ctx               context.Context
    cancelFunc        context.CancelFunc
    // stopper is consulted after each handled event to decide whether to
    // shut the scheduler down.
    stopper           Stopper
}

// NewScheduler constructs a Scheduler instance with an empty event queue, an
// empty execution history and no registered handlers. The given 'stopper' is
// the one consulted by worker routines to decide when to shut down.
func NewScheduler(stopper Stopper) *Scheduler {
    sch := &Scheduler{
        stopper:           stopper,
        events:            eventQueue{},
        history:           []*Event{},
        handlers:          make(map[types.NodeID]*schedulerHandlerRecord),
        eventNotification: make(chan struct{}, 100000),
    }
    // The cancellable context is what worker routines watch for shutdown.
    sch.ctx, sch.cancelFunc = context.WithCancel(context.Background())
    return sch
}

// Run starts 'numWorkers' worker routines and blocks until every one of them
// has returned. If you need strict incremental execution order of events
// based on their 'Time' field, assign 'numWorkers' as 1. If you need faster
// execution, assign 'numWorkers' a larger number.
func (sch *Scheduler) Run(numWorkers int) {
    // 'isStarted' is read by 'Seed' while holding 'eventsLock'; store it
    // under the same lock so a concurrent 'Seed' call does not race with
    // this write.
    func() {
        sch.eventsLock.Lock()
        defer sch.eventsLock.Unlock()

        sch.isStarted = true
    }()

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go sch.workerRoutine(&wg)
    }
    // Blocks until all routines are finished.
    wg.Wait()
}

// Seed queues a seed event for the scheduler. Seeding is only allowed before
// 'Run' is called; afterwards ErrSchedulerAlreadyStarted is returned and the
// event is not queued.
func (sch *Scheduler) Seed(e *Event) error {
    sch.eventsLock.Lock()
    defer sch.eventsLock.Unlock()

    if !sch.isStarted {
        sch.addEvent(e)
        return nil
    }
    return ErrSchedulerAlreadyStarted
}

// RegisterEventHandler registers 'handler' as the event handler of the node
// identified by 'nID'. Registering again for the same ID replaces the
// previous record.
func (sch *Scheduler) RegisterEventHandler(
    nID types.NodeID,
    handler EventHandler) {

    // Build the record up front so only the map write happens under the lock.
    record := &schedulerHandlerRecord{handler: handler}

    sch.handlersLock.Lock()
    defer sch.handlersLock.Unlock()

    sch.handlers[nID] = record
}

// nextTick pops the event at the top of the event heap (the oldest one,
// according to eventQueue's ordering) while holding 'eventsLock'. It returns
// nil when the queue is empty.
func (sch *Scheduler) nextTick() (e *Event) {
    sch.eventsLock.Lock()
    defer sch.eventsLock.Unlock()

    if len(sch.events) > 0 {
        e = heap.Pop(&sch.events).(*Event)
    }
    return e
}

// addEvent is a helper that pushes an event into the event heap and then
// posts one notification for it. It does not lock by itself: its callers in
// this file ('Seed' and the worker routine) hold 'eventsLock' around it.
func (sch *Scheduler) addEvent(e *Event) {
    queue := &sch.events
    // Heap insertion keeps the queue ordered.
    heap.Push(queue, e)
    // One token per queued event; a worker consumes it before popping.
    sch.eventNotification <- struct{}{}
}

// CloneExecutionHistory returns a copy of the execution history taken under
// the history read lock. Only the slice is copied: the returned slice holds
// the same *Event pointers as the scheduler's own history.
func (sch *Scheduler) CloneExecutionHistory() (cloned []*Event) {
    sch.historyLock.RLock()
    defer sch.historyLock.RUnlock()

    size := len(sch.history)
    cloned = make([]*Event, size)
    copy(cloned, sch.history)
    return cloned
}

// workerRoutine is the main loop of one worker started by 'Run'. It waits for
// event notifications, pops the next event, dispatches it to the handler
// registered for the event's node, records the event in the history, and
// queues the events the handler returned. It returns (and marks 'wg' done)
// once the scheduler context is cancelled.
func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) {
    defer wg.Done()

    handleEvent := func(e *Event) {
        // Find the corresponding handler record.
        // NOTE: when no handler is registered for e.NodeID the lookup yields
        // nil and the lock call below panics with a nil dereference.
        hRec := func(nID types.NodeID) *schedulerHandlerRecord {
            sch.handlersLock.RLock()
            defer sch.handlersLock.RUnlock()

            return sch.handlers[nID]
        }(e.NodeID)

        newEvents := func() []*Event {
            // This lock makes sure there would be no concurrent access
            // against each handler.
            hRec.lock.Lock()
            defer hRec.lock.Unlock()

            // Handle incoming event, and record its execution time. Keep the
            // value returned by time.Now() as is: converting it with UTC()
            // strips the monotonic clock reading, which would make the
            // measured interval sensitive to wall-clock adjustments.
            beforeExecution := time.Now()
            newEvents := hRec.handler.Handle(e)
            e.ExecInterval = time.Since(beforeExecution)
            // It's safe to check status of that node under 'hRec.lock'.
            if sch.stopper.ShouldStop(e.NodeID) {
                sch.cancelFunc()
            }
            return newEvents
        }()
        // Record executed events as history.
        func() {
            sch.historyLock.Lock()
            defer sch.historyLock.Unlock()

            e.HistoryIndex = len(sch.history)
            sch.history = append(sch.history, e)
        }()
        // Include the execution interval of parent event to the expected time
        // to execute child events.
        for _, newEvent := range newEvents {
            newEvent.ParentHistoryIndex = e.HistoryIndex
            newEvent.Time = newEvent.Time.Add(e.ExecInterval)
        }
        // Add derived events back to event queue.
        func() {
            sch.eventsLock.Lock()
            defer sch.eventsLock.Unlock()

            for _, newEvent := range newEvents {
                sch.addEvent(newEvent)
            }
        }()
    }

Done:
    for {
        // We favor the scheduler-shutdown signal over other events: when both
        // are ready, the second select below could pick either one.
        select {
        case <-sch.ctx.Done():
            break Done
        default:
        }
        // Block until new event arrival or scheduler shutdown.
        select {
        case <-sch.eventNotification:
            e := sch.nextTick()
            if e == nil {
                // A notification without a matching event breaks the
                // one-token-per-event invariant.
                panic(errNilEventWhenNotified)
            }
            handleEvent(e)
        case <-sch.ctx.Done():
            break Done
        }
    }
}