From 4d77b7facecfea7069af15f19429585687c47fbb Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 7 Jul 2014 12:30:25 +0100 Subject: remove extra case in main loop --- ethreact/reactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'ethreact') diff --git a/ethreact/reactor.go b/ethreact/reactor.go index f42f71202..8e72ca903 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -95,7 +95,7 @@ func (reactor *ReactorEngine) Start() { case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) - case reactor.drained <- true: + // case reactor.drained <- true: default: reactor.drained <- true // blocking till message is coming in } -- cgit v1.2.3 From dc11b5c55e2888a7a3dac51fedc3864d112136ce Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 14 Jul 2014 18:40:18 +0100 Subject: fix reactor channel blocking --- ethreact/reactor.go | 53 +++++++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 26 deletions(-) (limited to 'ethreact') diff --git a/ethreact/reactor.go b/ethreact/reactor.go index 8e72ca903..a26f82a97 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -7,6 +7,10 @@ import ( var logger = ethlog.NewLogger("REACTOR") +const ( + eventBufferSize int = 10 +) + type EventHandler struct { lock sync.RWMutex name string @@ -21,7 +25,7 @@ func (e *EventHandler) Post(event Event) { // if we want to preserve order pushing to subscibed channels // dispatching should be syncrounous - // this means if subscribed event channel is blocked (closed or has fixed capacity) + // this means if subscribed event channel is blocked // the reactor dispatch will be blocked, so we need to mitigate by skipping // rogue blocking subscribers for i, ch := range e.chans { @@ -63,22 +67,20 @@ type Event struct { // The reactor basic engine. Acts as bridge // between the events and the subscribers/posters type ReactorEngine struct { - lock sync.RWMutex - eventChannel chan Event - eventHandlers map[string]*EventHandler - quit chan bool - shutdownChannel chan bool - running bool - drained chan bool + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan chan error + running bool + drained chan bool } func New() *ReactorEngine { return &ReactorEngine{ - eventHandlers: make(map[string]*EventHandler), - eventChannel: make(chan Event), - quit: make(chan bool, 1), - drained: make(chan bool, 1), - shutdownChannel: make(chan bool, 1), + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event, eventBufferSize), + quit: make(chan chan error, 1), + drained: make(chan bool, 1), } } @@ -87,24 +89,22 @@ func (reactor *ReactorEngine) Start() { defer reactor.lock.Unlock() if !reactor.running { go func() { - out: for { select { - case <-reactor.quit: - break out + case status := <-reactor.quit: + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + status <- nil + return case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) - // case reactor.drained <- true: default: reactor.drained <- true // blocking till message is coming in } } - reactor.lock.Lock() - defer reactor.lock.Unlock() - reactor.running = false - logger.Infoln("stopped") - close(reactor.shutdownChannel) }() reactor.running = true logger.Infoln("started") @@ -112,15 +112,15 @@ func (reactor *ReactorEngine) Start() { } func (reactor *ReactorEngine) Stop() { - reactor.lock.RLock() if reactor.running { - reactor.quit <- true + status := make(chan error) + reactor.quit <- status select { case <-reactor.drained: + default: } + <-status } - reactor.lock.RUnlock() - <-reactor.shutdownChannel } func (reactor *ReactorEngine) Flush() { @@ -165,6 +165,7 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) { reactor.eventChannel <- Event{Resource: resource, Name: event} select { case <-reactor.drained: + default: } } } -- cgit v1.2.3 From 0ecc5c815e4550d7e8dca7b4ab4c549b10bf0b19 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 15 Jul 2014 00:15:37 +0100 Subject: reactor test --- ethreact/reactor_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 ethreact/reactor_test.go (limited to 'ethreact') diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go new file mode 100644 index 000000000..801a8abd0 --- /dev/null +++ b/ethreact/reactor_test.go @@ -0,0 +1,63 @@ +package ethreact + +import ( + "fmt" + "testing" +) + +func TestReactorAdd(t *testing.T) { + reactor := New() + ch := make(chan Event) + reactor.Subscribe("test", ch) + if reactor.eventHandlers["test"] == nil { + t.Error("Expected new eventHandler to be created") + } + reactor.Unsubscribe("test", ch) + if reactor.eventHandlers["test"] != nil { + t.Error("Expected eventHandler to be removed") + } +} + +func TestReactorEvent(t *testing.T) { + var name string + reactor := New() + // Buffer the channel, so it doesn't block for this test + cap := 20 + ch := make(chan Event, cap) + reactor.Subscribe("even", ch) + reactor.Subscribe("odd", ch) + reactor.Post("even", "disappears") // should not broadcast if engine not started + reactor.Start() + for i := 0; i < cap; i++ { + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + reactor.Post(name, i) + } + reactor.Post("test", cap) // this should not block + i := 0 + reactor.Flush() + close(ch) + for event := range ch { + fmt.Printf("%d: %v", i, event) + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + if val, ok := event.Resource.(int); ok { + if i != val || event.Name != name { + t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) + } + } else { + t.Error("Unable to cast") + } + i++ + } + if i != cap { + t.Error("excpected exactly %d events, got ", i) + } + reactor.Stop() +} -- cgit v1.2.3 From 67528cf9709519458ab5500cb7cf54664dd20167 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 21 Jul 2014 15:10:56 +0100 Subject: ethreact/README.md --- ethreact/README.md | 28 ++++++++++++++++++++++++++++ ethreact/reactor.go | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 ethreact/README.md (limited to 'ethreact') diff --git a/ethreact/README.md b/ethreact/README.md new file mode 100644 index 000000000..61af8a572 --- /dev/null +++ b/ethreact/README.md @@ -0,0 +1,28 @@ +## Reactor + +Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state. +Event notification is handled via subscription: + + var blockChan = make(chan ethreact.Event, 10) + reactor.Subscribe("newBlock", blockChan) + +ethreact.Event broadcast on the channel are + + type Event struct { + Resource interface{} + Name string + } + +Resource is polimorphic depending on the event type and should be typecast before use, e.g: + + b := <-blockChan: + block := b.Resource.(*ethchain.Block) + +Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped. + +The engine allows arbitrary events to be posted and subscribed to. + + ethereum.Reactor().Post("newBlock", newBlock) + + + \ No newline at end of file diff --git a/ethreact/reactor.go b/ethreact/reactor.go index a26f82a97..7fe2356db 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -58,7 +58,7 @@ func (e *EventHandler) Remove(ch chan Event) int { return len(e.chans) } -// Basic reactor resource +// Basic reactor event type Event struct { Resource interface{} Name string -- cgit v1.2.3