diff options
author | zelig <viktor.tron@gmail.com> | 2014-07-06 02:56:01 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-07-06 02:56:01 +0800 |
commit | 5a2afc575485e2d651b9840f5d1ea080cdc72fa7 (patch) | |
tree | 9ea81946396fc584c8c8a0b64d9e9f9fff43b17a /ethreact | |
parent | d4300c406c5f98d35857b6e53b0427be5f45e3b2 (diff) | |
download | go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.gz go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.bz2 go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.lz go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.xz go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.zst go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.zip |
fix reactor engine main loop blocked to wait if drained
Diffstat (limited to 'ethreact')
-rw-r--r-- | ethreact/reactor.go | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go index 3802d95b3..f42f71202 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -28,7 +28,7 @@ func (e *EventHandler) Post(event Event) { select { case ch <- event: default: - logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name) + logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) } } } @@ -69,7 +69,7 @@ type ReactorEngine struct { quit chan bool shutdownChannel chan bool running bool - drained bool + drained chan bool } func New() *ReactorEngine { @@ -77,6 +77,7 @@ func New() *ReactorEngine { eventHandlers: make(map[string]*EventHandler), eventChannel: make(chan Event), quit: make(chan bool, 1), + drained: make(chan bool, 1), shutdownChannel: make(chan bool, 1), } } @@ -94,8 +95,9 @@ func (reactor *ReactorEngine) Start() { case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) + case reactor.drained <- true: default: - reactor.drained = true + reactor.drained <- true // blocking till message is coming in } } reactor.lock.Lock() @@ -113,14 +115,16 @@ func (reactor *ReactorEngine) Stop() { reactor.lock.RLock() if reactor.running { reactor.quit <- true + select { + case <-reactor.drained: + } } reactor.lock.RUnlock() <-reactor.shutdownChannel } func (reactor *ReactorEngine) Flush() { - for !reactor.drained { - } + <-reactor.drained } // Subscribe a channel to the specified event @@ -136,7 +140,7 @@ func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { } // Add the events channel to reactor event handler eventHandler.Add(eventChannel) - logger.Debugln("added new subscription to %s", event) + logger.Debugf("added new subscription to %s", event) } func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { @@ -149,7 +153,7 @@ func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) if len == 0 { reactor.eventHandlers[event] = nil } - logger.Debugln("removed subscription to %s", event) + logger.Debugf("removed subscription to %s", event) } } @@ -158,8 +162,10 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) { defer reactor.lock.Unlock() if reactor.running { - reactor.drained = false reactor.eventChannel <- Event{Resource: resource, Name: event} + select { + case <-reactor.drained: + } } } |