aboutsummaryrefslogtreecommitdiffstats
path: root/ethreact
diff options
context:
space:
mode:
authorzelig <viktor.tron@gmail.com>2014-07-15 01:40:18 +0800
committerzelig <viktor.tron@gmail.com>2014-07-15 01:40:18 +0800
commitdc11b5c55e2888a7a3dac51fedc3864d112136ce (patch)
treece84c9b43057f9a3e860081986902e915b85d12e /ethreact
parent5c03adbdededd31cb73f64ced01e33154347e193 (diff)
downloadgo-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar
go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.gz
go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.bz2
go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.lz
go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.xz
go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.tar.zst
go-tangerine-dc11b5c55e2888a7a3dac51fedc3864d112136ce.zip
fix reactor channel blocking
Diffstat (limited to 'ethreact')
-rw-r--r--ethreact/reactor.go53
1 files changed, 27 insertions, 26 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
index 8e72ca903..a26f82a97 100644
--- a/ethreact/reactor.go
+++ b/ethreact/reactor.go
@@ -7,6 +7,10 @@ import (
var logger = ethlog.NewLogger("REACTOR")
+const (
+ eventBufferSize int = 10
+)
+
type EventHandler struct {
lock sync.RWMutex
name string
@@ -21,7 +25,7 @@ func (e *EventHandler) Post(event Event) {
// if we want to preserve order pushing to subscibed channels
// dispatching should be syncrounous
- // this means if subscribed event channel is blocked (closed or has fixed capacity)
+ // this means if subscribed event channel is blocked
// the reactor dispatch will be blocked, so we need to mitigate by skipping
// rogue blocking subscribers
for i, ch := range e.chans {
@@ -63,22 +67,20 @@ type Event struct {
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
- lock sync.RWMutex
- eventChannel chan Event
- eventHandlers map[string]*EventHandler
- quit chan bool
- shutdownChannel chan bool
- running bool
- drained chan bool
+ lock sync.RWMutex
+ eventChannel chan Event
+ eventHandlers map[string]*EventHandler
+ quit chan chan error
+ running bool
+ drained chan bool
}
func New() *ReactorEngine {
return &ReactorEngine{
- eventHandlers: make(map[string]*EventHandler),
- eventChannel: make(chan Event),
- quit: make(chan bool, 1),
- drained: make(chan bool, 1),
- shutdownChannel: make(chan bool, 1),
+ eventHandlers: make(map[string]*EventHandler),
+ eventChannel: make(chan Event, eventBufferSize),
+ quit: make(chan chan error, 1),
+ drained: make(chan bool, 1),
}
}
@@ -87,24 +89,22 @@ func (reactor *ReactorEngine) Start() {
defer reactor.lock.Unlock()
if !reactor.running {
go func() {
- out:
for {
select {
- case <-reactor.quit:
- break out
+ case status := <-reactor.quit:
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ reactor.running = false
+ logger.Infoln("stopped")
+ status <- nil
+ return
case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events
reactor.dispatch(event)
- // case reactor.drained <- true:
default:
reactor.drained <- true // blocking till message is coming in
}
}
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
- reactor.running = false
- logger.Infoln("stopped")
- close(reactor.shutdownChannel)
}()
reactor.running = true
logger.Infoln("started")
@@ -112,15 +112,15 @@ func (reactor *ReactorEngine) Start() {
}
func (reactor *ReactorEngine) Stop() {
- reactor.lock.RLock()
if reactor.running {
- reactor.quit <- true
+ status := make(chan error)
+ reactor.quit <- status
select {
case <-reactor.drained:
+ default:
}
+ <-status
}
- reactor.lock.RUnlock()
- <-reactor.shutdownChannel
}
func (reactor *ReactorEngine) Flush() {
@@ -165,6 +165,7 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) {
reactor.eventChannel <- Event{Resource: resource, Name: event}
select {
case <-reactor.drained:
+ default:
}
}
}