From f1b354e6aa6c4b0330e35ae9011784ff1a0b01ab Mon Sep 17 00:00:00 2001 From: obscuren Date: Sun, 2 Mar 2014 02:22:20 +0100 Subject: Reactor implemented --- ethutil/reactor.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++++ ethutil/reactor_test.go | 30 +++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 ethutil/reactor.go create mode 100644 ethutil/reactor_test.go diff --git a/ethutil/reactor.go b/ethutil/reactor.go new file mode 100644 index 000000000..b3f8b9b5b --- /dev/null +++ b/ethutil/reactor.go @@ -0,0 +1,77 @@ +package ethutil + +import ( + "sync" +) + +type ReactorEvent struct { + mut sync.Mutex + event string + chans []chan React +} + +// Post the specified reactor resource on the channels +// currently subscribed +func (e *ReactorEvent) Post(react React) { + for _, ch := range e.chans { + go func(ch chan React) { + ch <- react + }(ch) + } +} + +// Add a subscriber to this event +func (e *ReactorEvent) Add(ch chan React) { + e.chans = append(e.chans, ch) +} + +// Remove a subscriber +func (e *ReactorEvent) Remove(ch chan React) { + for i, c := range e.chans { + if c == ch { + e.chans = append(e.chans[:i], e.chans[i+1:]...) + } + } +} + +// Basic reactor resource +type React struct { + Resource interface{} +} + +// The reactor basic engine. Acts as bridge +// between the events and the subscribers/posters +type ReactorEngine struct { + patterns map[string]*ReactorEvent +} + +func NewReactorEngine() *ReactorEngine { + return &ReactorEngine{patterns: make(map[string]*ReactorEvent)} +} + +// Subscribe a channel to the specified event +func (reactor *ReactorEngine) Subscribe(event string, ch chan React) { + ev := reactor.patterns[event] + // Create a new event if one isn't available + if ev == nil { + ev = &ReactorEvent{event: event} + reactor.patterns[event] = ev + } + + // Add the channel to reactor event handler + ev.Add(ch) +} + +func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) { + ev := reactor.patterns[event] + if ev != nil { + ev.Remove(ch) + } +} + +func (reactor *ReactorEngine) Post(event string, resource interface{}) { + ev := reactor.patterns[event] + if ev != nil { + ev.Post(React{Resource: resource}) + } +} diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go new file mode 100644 index 000000000..48c2f0df3 --- /dev/null +++ b/ethutil/reactor_test.go @@ -0,0 +1,30 @@ +package ethutil + +import "testing" + +func TestReactorAdd(t *testing.T) { + engine := NewReactorEngine() + ch := make(chan React) + engine.Subscribe("test", ch) + if len(engine.patterns) != 1 { + t.Error("Expected patterns to be 1, got", len(engine.patterns)) + } +} + +func TestReactorEvent(t *testing.T) { + engine := NewReactorEngine() + + // Buffer 1, so it doesn't block for this test + ch := make(chan React, 1) + engine.Subscribe("test", ch) + engine.Post("test", "hello") + + value := <-ch + if val, ok := value.Resource.(string); ok { + if val != "hello" { + t.Error("Expected Resource to be 'hello', got", val) + } + } else { + t.Error("Unable to cast") + } +} -- cgit v1.2.3