aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-10-16 17:50:10 +0800
committerFelix Lange <fjl@twurst.com>2014-10-17 23:23:29 +0800
commitfd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e (patch)
tree566005d81fe6bcb21d148ad024c17955c47142a3
parentc090a77f1c9bc890e67a00fb47a1c23c8769799d (diff)
downloadgo-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.tar
go-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.tar.gz
go-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.tar.bz2
go-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.tar.lz
go-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.tar.xz
go-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.tar.zst
go-tangerine-fd9b03a431a9b8bb28a5b681f97e0b2d18ef7a3e.zip
ethlog: fix concurrency
Rather than spawning a new goroutine for each message, run each log system in a dedicated goroutine. Ensure that logging is still asynchronous by using a per-system buffer (currently 500 messages). If it overflows all logging will hang, but that's better than spawning indefinitely many goroutines.
-rw-r--r--ethlog/loggers.go98
1 files changed, 60 insertions, 38 deletions
diff --git a/ethlog/loggers.go b/ethlog/loggers.go
index f5ec4d402..732d6a970 100644
--- a/ethlog/loggers.go
+++ b/ethlog/loggers.go
@@ -47,75 +47,97 @@ const (
)
var (
- mutex sync.RWMutex // protects logSystems
- logSystems []LogSystem
-
- logMessages = make(chan message)
- drainWaitReq = make(chan chan struct{})
+ logMessageC = make(chan message)
+ addSystemC = make(chan LogSystem)
+ flushC = make(chan chan struct{})
+ resetC = make(chan chan struct{})
)
func init() {
go dispatchLoop()
}
+// each system can buffer this many messages before
+// blocking incoming log messages.
+const sysBufferSize = 500
+
func dispatchLoop() {
- var drainWait []chan struct{}
- dispatchDone := make(chan struct{})
- pending := 0
+ var (
+ systems []LogSystem
+ systemIn []chan message
+ systemWG sync.WaitGroup
+ )
+ bootSystem := func(sys LogSystem) {
+ in := make(chan message, sysBufferSize)
+ systemIn = append(systemIn, in)
+ systemWG.Add(1)
+ go sysLoop(sys, in, &systemWG)
+ }
+
for {
select {
- case msg := <-logMessages:
- go dispatch(msg, dispatchDone)
- pending++
- case waiter := <-drainWaitReq:
- if pending == 0 {
- close(waiter)
- } else {
- drainWait = append(drainWait, waiter)
+ case msg := <-logMessageC:
+ for _, c := range systemIn {
+ c <- msg
+ }
+
+ case sys := <-addSystemC:
+ systems = append(systems, sys)
+ bootSystem(sys)
+
+ case waiter := <-resetC:
+ // reset means terminate all systems
+ for _, c := range systemIn {
+ close(c)
+ }
+ systems = nil
+ systemIn = nil
+ systemWG.Wait()
+ close(waiter)
+
+ case waiter := <-flushC:
+ // flush means reboot all systems
+ for _, c := range systemIn {
+ close(c)
}
- case <-dispatchDone:
- pending--
- if pending == 0 {
- for _, c := range drainWait {
- close(c)
- }
- drainWait = nil
+ systemIn = nil
+ systemWG.Wait()
+ for _, sys := range systems {
+ bootSystem(sys)
}
+ close(waiter)
}
}
}
-func dispatch(msg message, done chan<- struct{}) {
- mutex.RLock()
- for _, sys := range logSystems {
+func sysLoop(sys LogSystem, in <-chan message, wg *sync.WaitGroup) {
+ for msg := range in {
if sys.GetLogLevel() >= msg.level {
sys.LogPrint(msg.level, msg.msg)
}
}
- mutex.RUnlock()
- done <- struct{}{}
+ wg.Done()
}
// Reset removes all active log systems.
+// It blocks until all current messages have been delivered.
func Reset() {
- mutex.Lock()
- logSystems = nil
- mutex.Unlock()
+ waiter := make(chan struct{})
+ resetC <- waiter
+ <-waiter
}
// Flush waits until all current log messages have been dispatched to
// the active log systems.
func Flush() {
waiter := make(chan struct{})
- drainWaitReq <- waiter
+ flushC <- waiter
<-waiter
}
// AddLogSystem starts printing messages to the given LogSystem.
-func AddLogSystem(logSystem LogSystem) {
- mutex.Lock()
- logSystems = append(logSystems, logSystem)
- mutex.Unlock()
+func AddLogSystem(sys LogSystem) {
+ addSystemC <- sys
}
// A Logger prints messages prefixed by a given tag. It provides named
@@ -130,11 +152,11 @@ func NewLogger(tag string) *Logger {
}
func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
- logMessages <- message{level, logger.tag + fmt.Sprintln(v...)}
+ logMessageC <- message{level, logger.tag + fmt.Sprintln(v...)}
}
func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
- logMessages <- message{level, logger.tag + fmt.Sprintf(format, v...)}
+ logMessageC <- message{level, logger.tag + fmt.Sprintf(format, v...)}
}
// Errorln writes a message with ErrorLevel.