aboutsummaryrefslogtreecommitdiffstats
path: root/ethreact
diff options
context:
space:
mode:
Diffstat (limited to 'ethreact')
-rw-r--r--ethreact/reactor.go182
-rw-r--r--ethreact/reactor_test.go63
2 files changed, 245 insertions, 0 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
new file mode 100644
index 000000000..a26f82a97
--- /dev/null
+++ b/ethreact/reactor.go
@@ -0,0 +1,182 @@
+package ethreact
+
+import (
+ "github.com/ethereum/eth-go/ethlog"
+ "sync"
+)
+
+var logger = ethlog.NewLogger("REACTOR")
+
+const (
+ eventBufferSize int = 10
+)
+
+type EventHandler struct {
+ lock sync.RWMutex
+ name string
+ chans []chan Event
+}
+
+// Post the Event with the reactor resource on the channels
+// currently subscribed to the event
+func (e *EventHandler) Post(event Event) {
+ e.lock.RLock()
+ defer e.lock.RUnlock()
+
+ // if we want to preserve order pushing to subscibed channels
+ // dispatching should be syncrounous
+ // this means if subscribed event channel is blocked
+ // the reactor dispatch will be blocked, so we need to mitigate by skipping
+ // rogue blocking subscribers
+ for i, ch := range e.chans {
+ select {
+ case ch <- event:
+ default:
+ logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
+ }
+ }
+}
+
+// Add a subscriber to this event
+func (e *EventHandler) Add(ch chan Event) {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ e.chans = append(e.chans, ch)
+}
+
+// Remove a subscriber
+func (e *EventHandler) Remove(ch chan Event) int {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ for i, c := range e.chans {
+ if c == ch {
+ e.chans = append(e.chans[:i], e.chans[i+1:]...)
+ }
+ }
+ return len(e.chans)
+}
+
+// Basic reactor resource
+type Event struct {
+ Resource interface{}
+ Name string
+}
+
+// The reactor basic engine. Acts as bridge
+// between the events and the subscribers/posters
+type ReactorEngine struct {
+ lock sync.RWMutex
+ eventChannel chan Event
+ eventHandlers map[string]*EventHandler
+ quit chan chan error
+ running bool
+ drained chan bool
+}
+
+func New() *ReactorEngine {
+ return &ReactorEngine{
+ eventHandlers: make(map[string]*EventHandler),
+ eventChannel: make(chan Event, eventBufferSize),
+ quit: make(chan chan error, 1),
+ drained: make(chan bool, 1),
+ }
+}
+
+func (reactor *ReactorEngine) Start() {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ if !reactor.running {
+ go func() {
+ for {
+ select {
+ case status := <-reactor.quit:
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ reactor.running = false
+ logger.Infoln("stopped")
+ status <- nil
+ return
+ case event := <-reactor.eventChannel:
+ // needs to be called syncronously to keep order of events
+ reactor.dispatch(event)
+ default:
+ reactor.drained <- true // blocking till message is coming in
+ }
+ }
+ }()
+ reactor.running = true
+ logger.Infoln("started")
+ }
+}
+
+func (reactor *ReactorEngine) Stop() {
+ if reactor.running {
+ status := make(chan error)
+ reactor.quit <- status
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ <-status
+ }
+}
+
+func (reactor *ReactorEngine) Flush() {
+ <-reactor.drained
+}
+
+// Subscribe a channel to the specified event
+func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ // Create a new event handler if one isn't available
+ if eventHandler == nil {
+ eventHandler = &EventHandler{name: event}
+ reactor.eventHandlers[event] = eventHandler
+ }
+ // Add the events channel to reactor event handler
+ eventHandler.Add(eventChannel)
+ logger.Debugf("added new subscription to %s", event)
+}
+
+func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ if eventHandler != nil {
+ len := eventHandler.Remove(eventChannel)
+ if len == 0 {
+ reactor.eventHandlers[event] = nil
+ }
+ logger.Debugf("removed subscription to %s", event)
+ }
+}
+
+func (reactor *ReactorEngine) Post(event string, resource interface{}) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ if reactor.running {
+ reactor.eventChannel <- Event{Resource: resource, Name: event}
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ }
+}
+
+func (reactor *ReactorEngine) dispatch(event Event) {
+ name := event.Name
+ eventHandler := reactor.eventHandlers[name]
+ // if no subscriptions to this event type - no event handler created
+ // then noone to notify
+ if eventHandler != nil {
+ // needs to be called syncronously
+ eventHandler.Post(event)
+ }
+}
diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go
new file mode 100644
index 000000000..801a8abd0
--- /dev/null
+++ b/ethreact/reactor_test.go
@@ -0,0 +1,63 @@
+package ethreact
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestReactorAdd(t *testing.T) {
+ reactor := New()
+ ch := make(chan Event)
+ reactor.Subscribe("test", ch)
+ if reactor.eventHandlers["test"] == nil {
+ t.Error("Expected new eventHandler to be created")
+ }
+ reactor.Unsubscribe("test", ch)
+ if reactor.eventHandlers["test"] != nil {
+ t.Error("Expected eventHandler to be removed")
+ }
+}
+
+func TestReactorEvent(t *testing.T) {
+ var name string
+ reactor := New()
+ // Buffer the channel, so it doesn't block for this test
+ cap := 20
+ ch := make(chan Event, cap)
+ reactor.Subscribe("even", ch)
+ reactor.Subscribe("odd", ch)
+ reactor.Post("even", "disappears") // should not broadcast if engine not started
+ reactor.Start()
+ for i := 0; i < cap; i++ {
+ if i%2 == 0 {
+ name = "even"
+ } else {
+ name = "odd"
+ }
+ reactor.Post(name, i)
+ }
+ reactor.Post("test", cap) // this should not block
+ i := 0
+ reactor.Flush()
+ close(ch)
+ for event := range ch {
+ fmt.Printf("%d: %v", i, event)
+ if i%2 == 0 {
+ name = "even"
+ } else {
+ name = "odd"
+ }
+ if val, ok := event.Resource.(int); ok {
+ if i != val || event.Name != name {
+ t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val)
+ }
+ } else {
+ t.Error("Unable to cast")
+ }
+ i++
+ }
+ if i != cap {
+ t.Error("excpected exactly %d events, got ", i)
+ }
+ reactor.Stop()
+}