aboutsummaryrefslogtreecommitdiffstats
path: root/ethreact/reactor.go
diff options
context:
space:
mode:
Diffstat (limited to 'ethreact/reactor.go')
-rw-r--r--ethreact/reactor.go183
1 files changed, 0 insertions, 183 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
deleted file mode 100644
index 2edcbbbd9..000000000
--- a/ethreact/reactor.go
+++ /dev/null
@@ -1,183 +0,0 @@
-package ethreact
-
-import (
- "sync"
-
- "github.com/ethereum/eth-go/ethlog"
-)
-
-var logger = ethlog.NewLogger("REACTOR")
-
-const (
- eventBufferSize int = 10
-)
-
-type EventHandler struct {
- lock sync.RWMutex
- name string
- chans []chan Event
-}
-
-// Post the Event with the reactor resource on the channels
-// currently subscribed to the event
-func (e *EventHandler) Post(event Event) {
- e.lock.RLock()
- defer e.lock.RUnlock()
-
- // if we want to preserve order pushing to subscibed channels
- // dispatching should be syncrounous
- // this means if subscribed event channel is blocked
- // the reactor dispatch will be blocked, so we need to mitigate by skipping
- // rogue blocking subscribers
- for i, ch := range e.chans {
- select {
- case ch <- event:
- default:
- logger.Debugf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
- }
- }
-}
-
-// Add a subscriber to this event
-func (e *EventHandler) Add(ch chan Event) {
- e.lock.Lock()
- defer e.lock.Unlock()
-
- e.chans = append(e.chans, ch)
-}
-
-// Remove a subscriber
-func (e *EventHandler) Remove(ch chan Event) int {
- e.lock.Lock()
- defer e.lock.Unlock()
-
- for i, c := range e.chans {
- if c == ch {
- e.chans = append(e.chans[:i], e.chans[i+1:]...)
- }
- }
- return len(e.chans)
-}
-
-// Basic reactor event
-type Event struct {
- Resource interface{}
- Name string
-}
-
-// The reactor basic engine. Acts as bridge
-// between the events and the subscribers/posters
-type ReactorEngine struct {
- lock sync.RWMutex
- eventChannel chan Event
- eventHandlers map[string]*EventHandler
- quit chan chan error
- running bool
- drained chan bool
-}
-
-func New() *ReactorEngine {
- return &ReactorEngine{
- eventHandlers: make(map[string]*EventHandler),
- eventChannel: make(chan Event, eventBufferSize),
- quit: make(chan chan error, 1),
- drained: make(chan bool, 1),
- }
-}
-
-func (reactor *ReactorEngine) Start() {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
- if !reactor.running {
- go func() {
- for {
- select {
- case status := <-reactor.quit:
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
- reactor.running = false
- logger.Infoln("stopped")
- status <- nil
- return
- case event := <-reactor.eventChannel:
- // needs to be called syncronously to keep order of events
- reactor.dispatch(event)
- default:
- reactor.drained <- true // blocking till message is coming in
- }
- }
- }()
- reactor.running = true
- logger.Infoln("started")
- }
-}
-
-func (reactor *ReactorEngine) Stop() {
- if reactor.running {
- status := make(chan error)
- reactor.quit <- status
- select {
- case <-reactor.drained:
- default:
- }
- <-status
- }
-}
-
-func (reactor *ReactorEngine) Flush() {
- <-reactor.drained
-}
-
-// Subscribe a channel to the specified event
-func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
-
- eventHandler := reactor.eventHandlers[event]
- // Create a new event handler if one isn't available
- if eventHandler == nil {
- eventHandler = &EventHandler{name: event}
- reactor.eventHandlers[event] = eventHandler
- }
- // Add the events channel to reactor event handler
- eventHandler.Add(eventChannel)
- logger.Debugf("added new subscription to %s", event)
-}
-
-func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
-
- eventHandler := reactor.eventHandlers[event]
- if eventHandler != nil {
- len := eventHandler.Remove(eventChannel)
- if len == 0 {
- reactor.eventHandlers[event] = nil
- }
- logger.Debugf("removed subscription to %s", event)
- }
-}
-
-func (reactor *ReactorEngine) Post(event string, resource interface{}) {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
-
- if reactor.running {
- reactor.eventChannel <- Event{Resource: resource, Name: event}
- select {
- case <-reactor.drained:
- default:
- }
- }
-}
-
-func (reactor *ReactorEngine) dispatch(event Event) {
- name := event.Name
- eventHandler := reactor.eventHandlers[name]
- // if no subscriptions to this event type - no event handler created
- // then noone to notify
- if eventHandler != nil {
- // needs to be called syncronously
- eventHandler.Post(event)
- }
-}