aboutsummaryrefslogtreecommitdiffstats
path: root/ethreact/reactor.go
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2014-07-07 16:53:25 +0800
committerobscuren <geffobscura@gmail.com>2014-07-07 16:53:25 +0800
commit9dab7dcc3c99d387ad2c0a1623b574d3023bdfae (patch)
tree8f98cce69568d29a954410e18dce8c121d25592e /ethreact/reactor.go
parentd40cba3042564f3471aa20a5cf477cafcacc2189 (diff)
parentb958179263aa1fa5060dd5d5848b8bb092a4165c (diff)
downloadgo-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.tar
go-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.tar.gz
go-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.tar.bz2
go-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.tar.lz
go-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.tar.xz
go-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.tar.zst
go-tangerine-9dab7dcc3c99d387ad2c0a1623b574d3023bdfae.zip
Merge branch 'develop' of github.com-obscure:ethereum/eth-go into develop
Diffstat (limited to 'ethreact/reactor.go')
-rw-r--r--ethreact/reactor.go181
1 files changed, 181 insertions, 0 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
new file mode 100644
index 000000000..f42f71202
--- /dev/null
+++ b/ethreact/reactor.go
@@ -0,0 +1,181 @@
+package ethreact
+
+import (
+ "github.com/ethereum/eth-go/ethlog"
+ "sync"
+)
+
+var logger = ethlog.NewLogger("REACTOR")
+
+type EventHandler struct {
+ lock sync.RWMutex
+ name string
+ chans []chan Event
+}
+
+// Post the Event with the reactor resource on the channels
+// currently subscribed to the event
+func (e *EventHandler) Post(event Event) {
+ e.lock.RLock()
+ defer e.lock.RUnlock()
+
+ // if we want to preserve order pushing to subscibed channels
+ // dispatching should be syncrounous
+ // this means if subscribed event channel is blocked (closed or has fixed capacity)
+ // the reactor dispatch will be blocked, so we need to mitigate by skipping
+ // rogue blocking subscribers
+ for i, ch := range e.chans {
+ select {
+ case ch <- event:
+ default:
+ logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
+ }
+ }
+}
+
+// Add a subscriber to this event
+func (e *EventHandler) Add(ch chan Event) {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ e.chans = append(e.chans, ch)
+}
+
+// Remove a subscriber
+func (e *EventHandler) Remove(ch chan Event) int {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ for i, c := range e.chans {
+ if c == ch {
+ e.chans = append(e.chans[:i], e.chans[i+1:]...)
+ }
+ }
+ return len(e.chans)
+}
+
+// Basic reactor resource
+type Event struct {
+ Resource interface{}
+ Name string
+}
+
+// The reactor basic engine. Acts as bridge
+// between the events and the subscribers/posters
+type ReactorEngine struct {
+ lock sync.RWMutex
+ eventChannel chan Event
+ eventHandlers map[string]*EventHandler
+ quit chan bool
+ shutdownChannel chan bool
+ running bool
+ drained chan bool
+}
+
+func New() *ReactorEngine {
+ return &ReactorEngine{
+ eventHandlers: make(map[string]*EventHandler),
+ eventChannel: make(chan Event),
+ quit: make(chan bool, 1),
+ drained: make(chan bool, 1),
+ shutdownChannel: make(chan bool, 1),
+ }
+}
+
+func (reactor *ReactorEngine) Start() {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ if !reactor.running {
+ go func() {
+ out:
+ for {
+ select {
+ case <-reactor.quit:
+ break out
+ case event := <-reactor.eventChannel:
+ // needs to be called syncronously to keep order of events
+ reactor.dispatch(event)
+ case reactor.drained <- true:
+ default:
+ reactor.drained <- true // blocking till message is coming in
+ }
+ }
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ reactor.running = false
+ logger.Infoln("stopped")
+ close(reactor.shutdownChannel)
+ }()
+ reactor.running = true
+ logger.Infoln("started")
+ }
+}
+
+func (reactor *ReactorEngine) Stop() {
+ reactor.lock.RLock()
+ if reactor.running {
+ reactor.quit <- true
+ select {
+ case <-reactor.drained:
+ }
+ }
+ reactor.lock.RUnlock()
+ <-reactor.shutdownChannel
+}
+
+func (reactor *ReactorEngine) Flush() {
+ <-reactor.drained
+}
+
+// Subscribe a channel to the specified event
+func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ // Create a new event handler if one isn't available
+ if eventHandler == nil {
+ eventHandler = &EventHandler{name: event}
+ reactor.eventHandlers[event] = eventHandler
+ }
+ // Add the events channel to reactor event handler
+ eventHandler.Add(eventChannel)
+ logger.Debugf("added new subscription to %s", event)
+}
+
+func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ if eventHandler != nil {
+ len := eventHandler.Remove(eventChannel)
+ if len == 0 {
+ reactor.eventHandlers[event] = nil
+ }
+ logger.Debugf("removed subscription to %s", event)
+ }
+}
+
+func (reactor *ReactorEngine) Post(event string, resource interface{}) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ if reactor.running {
+ reactor.eventChannel <- Event{Resource: resource, Name: event}
+ select {
+ case <-reactor.drained:
+ }
+ }
+}
+
+func (reactor *ReactorEngine) dispatch(event Event) {
+ name := event.Name
+ eventHandler := reactor.eventHandlers[name]
+ // if no subscriptions to this event type - no event handler created
+ // then noone to notify
+ if eventHandler != nil {
+ // needs to be called syncronously
+ eventHandler.Post(event)
+ }
+}