diff options
author | zelig <viktor.tron@gmail.com> | 2014-07-15 01:40:18 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-07-15 01:40:18 +0800 |
commit | dc11b5c55e2888a7a3dac51fedc3864d112136ce (patch) | |
tree | ce84c9b43057f9a3e860081986902e915b85d12e /ethreact | |
parent | 5c03adbdededd31cb73f64ced01e33154347e193 (diff) | |
download | go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.gz go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.bz2 go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.lz go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.xz go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.zst go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.zip |
fix reactor channel blocking
Diffstat (limited to 'ethreact')
-rw-r--r-- | ethreact/reactor.go | 53 |
1 files changed, 27 insertions, 26 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go index 8e72ca903..a26f82a97 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -7,6 +7,10 @@ import ( var logger = ethlog.NewLogger("REACTOR") +const ( + eventBufferSize int = 10 +) + type EventHandler struct { lock sync.RWMutex name string @@ -21,7 +25,7 @@ func (e *EventHandler) Post(event Event) { // if we want to preserve order pushing to subscibed channels // dispatching should be syncrounous - // this means if subscribed event channel is blocked (closed or has fixed capacity) + // this means if subscribed event channel is blocked // the reactor dispatch will be blocked, so we need to mitigate by skipping // rogue blocking subscribers for i, ch := range e.chans { @@ -63,22 +67,20 @@ type Event struct { // The reactor basic engine. Acts as bridge // between the events and the subscribers/posters type ReactorEngine struct { - lock sync.RWMutex - eventChannel chan Event - eventHandlers map[string]*EventHandler - quit chan bool - shutdownChannel chan bool - running bool - drained chan bool + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan chan error + running bool + drained chan bool } func New() *ReactorEngine { return &ReactorEngine{ - eventHandlers: make(map[string]*EventHandler), - eventChannel: make(chan Event), - quit: make(chan bool, 1), - drained: make(chan bool, 1), - shutdownChannel: make(chan bool, 1), + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event, eventBufferSize), + quit: make(chan chan error, 1), + drained: make(chan bool, 1), } } @@ -87,24 +89,22 @@ func (reactor *ReactorEngine) Start() { defer reactor.lock.Unlock() if !reactor.running { go func() { - out: for { select { - case <-reactor.quit: - break out + case status := <-reactor.quit: + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + status <- nil + return case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) - // case reactor.drained <- true: default: reactor.drained <- true // blocking till message is coming in } } - reactor.lock.Lock() - defer reactor.lock.Unlock() - reactor.running = false - logger.Infoln("stopped") - close(reactor.shutdownChannel) }() reactor.running = true logger.Infoln("started") @@ -112,15 +112,15 @@ func (reactor *ReactorEngine) Start() { } func (reactor *ReactorEngine) Stop() { - reactor.lock.RLock() if reactor.running { - reactor.quit <- true + status := make(chan error) + reactor.quit <- status select { case <-reactor.drained: + default: } + <-status } - reactor.lock.RUnlock() - <-reactor.shutdownChannel } func (reactor *ReactorEngine) Flush() { @@ -165,6 +165,7 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) { reactor.eventChannel <- Event{Resource: resource, Name: event} select { case <-reactor.drained: + default: } } } |