aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ethlog/loggers.go60
-rw-r--r--ethlog/loggers_test.go30
-rw-r--r--ethreact/reactor.go182
3 files changed, 245 insertions, 27 deletions
diff --git a/ethlog/loggers.go b/ethlog/loggers.go
index ec481edd8..f13186102 100644
--- a/ethlog/loggers.go
+++ b/ethlog/loggers.go
@@ -39,7 +39,9 @@ func (msg *logMessage) send(logger LogSystem) {
var logMessages chan (*logMessage)
var logSystems []LogSystem
-var quit chan bool
+var quit chan chan error
+var drained chan bool
+var mutex = sync.Mutex{}
type LogLevel uint8
@@ -52,34 +54,55 @@ const (
DebugDetailLevel
)
+func dispatch(msg *logMessage) {
+ for _, logSystem := range logSystems {
+ if logSystem.GetLogLevel() >= msg.LogLevel {
+ msg.send(logSystem)
+ }
+ }
+}
+
// log messages are dispatched to log writers
func start() {
-out:
for {
select {
+ case status := <-quit:
+ status <- nil
+ return
case msg := <-logMessages:
- for _, logSystem := range logSystems {
- if logSystem.GetLogLevel() >= msg.LogLevel {
- msg.send(logSystem)
- }
- }
- case <-quit:
- break out
+ dispatch(msg)
+ default:
+ drained <- true // this blocks until a message is sent to the queue
}
}
}
-// waits until log messages are drained (dispatched to log writers)
-func Flush() {
- quit <- true
+func send(msg *logMessage) {
+ logMessages <- msg
+ select {
+ case <-drained:
+ default:
+ }
+}
-done:
- for {
+func Reset() {
+ mutex.Lock()
+ defer mutex.Unlock()
+ if logSystems != nil {
+ status := make(chan error)
+ quit <- status
select {
- case <-logMessages:
+ case <-drained:
default:
- break done
}
+ <-status
+ }
+}
+
+// waits until log messages are drained (dispatched to log writers)
+func Flush() {
+ if logSystems != nil {
+ <-drained
}
}
@@ -96,8 +119,9 @@ func AddLogSystem(logSystem LogSystem) {
mutex.Lock()
defer mutex.Unlock()
if logSystems == nil {
- logMessages = make(chan *logMessage)
- quit = make(chan bool, 1)
+ logMessages = make(chan *logMessage, 5)
+ quit = make(chan chan error, 1)
+ drained = make(chan bool, 1)
go start()
}
logSystems = append(logSystems, logSystem)
diff --git a/ethlog/loggers_test.go b/ethlog/loggers_test.go
index 89f416681..a9b1463e7 100644
--- a/ethlog/loggers_test.go
+++ b/ethlog/loggers_test.go
@@ -28,8 +28,19 @@ func (t *TestLogSystem) GetLogLevel() LogLevel {
return t.level
}
-func quote(s string) string {
- return fmt.Sprintf("'%s'", s)
+func TestLoggerFlush(t *testing.T) {
+ logger := NewLogger("TEST")
+ testLogSystem := &TestLogSystem{level: WarnLevel}
+ AddLogSystem(testLogSystem)
+ for i := 0; i < 5; i++ {
+ logger.Errorf(".")
+ }
+ Flush()
+ Reset()
+ output := testLogSystem.Output
+ if output != "[TEST] .[TEST] .[TEST] .[TEST] .[TEST] ." {
+ t.Error("Expected complete logger output '[TEST] .[TEST] .[TEST] .[TEST] .[TEST] .', got ", output)
+ }
}
func TestLoggerPrintln(t *testing.T) {
@@ -41,10 +52,11 @@ func TestLoggerPrintln(t *testing.T) {
logger.Infoln("info")
logger.Debugln("debug")
Flush()
+ Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error\n[TEST] warn\n" {
- t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output))
+ t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", output)
}
}
@@ -57,10 +69,10 @@ func TestLoggerPrintf(t *testing.T) {
logger.Infof("info")
logger.Debugf("debug")
Flush()
+ Reset()
output := testLogSystem.Output
- fmt.Println(quote(output))
if output != "[TEST] error to { 2}\n[TEST] warn" {
- t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output))
+ t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", output)
}
}
@@ -73,13 +85,14 @@ func TestMultipleLogSystems(t *testing.T) {
logger.Errorln("error")
logger.Warnln("warn")
Flush()
+ Reset()
output0 := testLogSystem0.Output
output1 := testLogSystem1.Output
if output0 != "[TEST] error\n" {
- t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output))
+ t.Error("Expected logger 0 output '[TEST] error\\n', got ", output0)
}
if output1 != "[TEST] error\n[TEST] warn\n" {
- t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output))
+ t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", output1)
}
}
@@ -94,9 +107,8 @@ func TestFileLogSystem(t *testing.T) {
Flush()
contents, _ := ioutil.ReadFile(filename)
output := string(contents)
- fmt.Println(quote(output))
if output != "[TEST] error to test.log\n[TEST] warn\n" {
- t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output))
+ t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output)
} else {
os.Remove(filename)
}
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
new file mode 100644
index 000000000..a26f82a97
--- /dev/null
+++ b/ethreact/reactor.go
@@ -0,0 +1,182 @@
+package ethreact
+
+import (
+ "github.com/ethereum/eth-go/ethlog"
+ "sync"
+)
+
+var logger = ethlog.NewLogger("REACTOR")
+
+const (
+ eventBufferSize int = 10
+)
+
+type EventHandler struct {
+ lock sync.RWMutex
+ name string
+ chans []chan Event
+}
+
+// Post the Event with the reactor resource on the channels
+// currently subscribed to the event
+func (e *EventHandler) Post(event Event) {
+ e.lock.RLock()
+ defer e.lock.RUnlock()
+
+ // if we want to preserve order pushing to subscibed channels
+ // dispatching should be syncrounous
+ // this means if subscribed event channel is blocked
+ // the reactor dispatch will be blocked, so we need to mitigate by skipping
+ // rogue blocking subscribers
+ for i, ch := range e.chans {
+ select {
+ case ch <- event:
+ default:
+ logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
+ }
+ }
+}
+
+// Add a subscriber to this event
+func (e *EventHandler) Add(ch chan Event) {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ e.chans = append(e.chans, ch)
+}
+
+// Remove a subscriber
+func (e *EventHandler) Remove(ch chan Event) int {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ for i, c := range e.chans {
+ if c == ch {
+ e.chans = append(e.chans[:i], e.chans[i+1:]...)
+ }
+ }
+ return len(e.chans)
+}
+
+// Basic reactor resource
+type Event struct {
+ Resource interface{}
+ Name string
+}
+
+// The reactor basic engine. Acts as bridge
+// between the events and the subscribers/posters
+type ReactorEngine struct {
+ lock sync.RWMutex
+ eventChannel chan Event
+ eventHandlers map[string]*EventHandler
+ quit chan chan error
+ running bool
+ drained chan bool
+}
+
+func New() *ReactorEngine {
+ return &ReactorEngine{
+ eventHandlers: make(map[string]*EventHandler),
+ eventChannel: make(chan Event, eventBufferSize),
+ quit: make(chan chan error, 1),
+ drained: make(chan bool, 1),
+ }
+}
+
+func (reactor *ReactorEngine) Start() {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ if !reactor.running {
+ go func() {
+ for {
+ select {
+ case status := <-reactor.quit:
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ reactor.running = false
+ logger.Infoln("stopped")
+ status <- nil
+ return
+ case event := <-reactor.eventChannel:
+ // needs to be called syncronously to keep order of events
+ reactor.dispatch(event)
+ default:
+ reactor.drained <- true // blocking till message is coming in
+ }
+ }
+ }()
+ reactor.running = true
+ logger.Infoln("started")
+ }
+}
+
+func (reactor *ReactorEngine) Stop() {
+ if reactor.running {
+ status := make(chan error)
+ reactor.quit <- status
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ <-status
+ }
+}
+
+func (reactor *ReactorEngine) Flush() {
+ <-reactor.drained
+}
+
+// Subscribe a channel to the specified event
+func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ // Create a new event handler if one isn't available
+ if eventHandler == nil {
+ eventHandler = &EventHandler{name: event}
+ reactor.eventHandlers[event] = eventHandler
+ }
+ // Add the events channel to reactor event handler
+ eventHandler.Add(eventChannel)
+ logger.Debugf("added new subscription to %s", event)
+}
+
+func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ if eventHandler != nil {
+ len := eventHandler.Remove(eventChannel)
+ if len == 0 {
+ reactor.eventHandlers[event] = nil
+ }
+ logger.Debugf("removed subscription to %s", event)
+ }
+}
+
+func (reactor *ReactorEngine) Post(event string, resource interface{}) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ if reactor.running {
+ reactor.eventChannel <- Event{Resource: resource, Name: event}
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ }
+}
+
+func (reactor *ReactorEngine) dispatch(event Event) {
+ name := event.Name
+ eventHandler := reactor.eventHandlers[name]
+ // if no subscriptions to this event type - no event handler created
+ // then noone to notify
+ if eventHandler != nil {
+ // needs to be called syncronously
+ eventHandler.Post(event)
+ }
+}