aboutsummaryrefslogtreecommitdiffstats
path: root/logger/sys.go
blob: c4d5c382a87f5e14f6e86f1aa888c4a134e32df4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package logger

import (
    "fmt"
    "sync"
)

// stdMsg is a plain-text log message together with the level it was
// logged at. It implements LogMsg via the Level and String methods.
type stdMsg struct {
    // level is the value reported by Level.
    level LogLevel
    // msg is the text returned by String.
    msg   string
}

// jsonMsg is a log message stored as raw bytes.
// NOTE(review): presumably the bytes are already-encoded JSON, going by
// the type name — confirm against the code that constructs these values.
type jsonMsg []byte

// Level always reports level 0 for a jsonMsg, regardless of its content.
// NOTE(review): what level 0 means depends on the LogLevel constants,
// which are declared outside this file — confirm.
func (m jsonMsg) Level() LogLevel {
    return 0
}

// String returns the message bytes converted to a string, unmodified.
func (m jsonMsg) String() string {
    return string(m)
}

// LogMsg is the message type handed to log systems. A LogMsg reports
// the level it was logged at and renders itself as text through
// fmt.Stringer.
type LogMsg interface {
    Level() LogLevel
    fmt.Stringer
}

// Level returns the level stored in the message.
func (m stdMsg) Level() LogLevel {
    return m.level
}

// String returns the message text.
func (m stdMsg) String() string {
    return m.msg
}

// Request channels serviced by dispatchLoop. All of them are unbuffered,
// so a sender blocks until the dispatch goroutine has received the value.
var (
    // logMessageC carries messages to be fanned out to every running
    // log system.
    logMessageC = make(chan LogMsg)
    // addSystemC carries log systems to be registered and started.
    addSystemC  = make(chan LogSystem)
    // flushC carries flush requests; the dispatcher closes the received
    // channel once the flush has completed.
    flushC      = make(chan chan struct{})
    // resetC carries reset requests; the dispatcher closes the received
    // channel once all systems have been stopped and removed.
    resetC      = make(chan chan struct{})
)

// init starts the dispatch goroutine when the package is initialized, so
// that the package-level request channels are serviced. dispatchLoop has
// no exit path, so the goroutine runs for the lifetime of the process.
func init() {
    go dispatchLoop()
}

// sysBufferSize is the capacity of each log system's input channel.
// Once a system has this many undelivered messages queued, the
// dispatcher blocks on it, which in turn blocks incoming log messages.
const sysBufferSize = 500

// dispatchLoop is the single goroutine that owns the set of log systems.
// It serves the package-level request channels forever:
//
//   - logMessageC: forward the message to every running system, in
//     registration order (dropped if no system is running);
//   - addSystemC: register a system and start delivering to it;
//   - resetC: stop all systems, forget them, then acknowledge;
//   - flushC: stop all systems, restart the registered ones, then
//     acknowledge.
//
// Requests are acknowledged by closing the channel that was received.
func dispatchLoop() {
    var (
        registered []LogSystem   // systems to (re)start after a flush
        inputs     []chan LogMsg // one buffered queue per running system
        running    sync.WaitGroup
    )

    // start gives sys its own buffered queue and delivery goroutine.
    start := func(sys LogSystem) {
        ch := make(chan LogMsg, sysBufferSize)
        inputs = append(inputs, ch)
        running.Add(1)
        go sysLoop(sys, ch, &running)
    }

    // stopAll closes every queue and waits for the delivery goroutines
    // to drain them, so all queued messages are delivered on return.
    stopAll := func() {
        for i := range inputs {
            close(inputs[i])
        }
        inputs = nil
        running.Wait()
    }

    for {
        select {
        case msg := <-logMessageC:
            // Blocks if any system's queue is full.
            for i := range inputs {
                inputs[i] <- msg
            }

        case sys := <-addSystemC:
            registered = append(registered, sys)
            start(sys)

        case done := <-resetC:
            // reset means terminate all systems
            stopAll()
            registered = nil
            close(done)

        case done := <-flushC:
            // flush means reboot all systems
            stopAll()
            for i := range registered {
                start(registered[i])
            }
            close(done)
        }
    }
}

// sysLoop delivers every message received on in to sys, in order, and
// returns once in has been closed and drained. wg is released when the
// goroutine exits.
func sysLoop(sys LogSystem, in <-chan LogMsg, wg *sync.WaitGroup) {
    // Deferred so the WaitGroup is released on every exit path, including
    // LogPrint ending the goroutine via runtime.Goexit (e.g. t.FailNow
    // called from a test log system). Without this, the dispatcher's
    // Wait during Flush or Reset would block forever.
    defer wg.Done()
    for msg := range in {
        sys.LogPrint(msg)
    }
}

// Reset stops and removes every registered log system. It returns only
// after the dispatcher has acknowledged the request, by which point all
// messages already queued for those systems have been delivered.
func Reset() {
    done := make(chan struct{})
    resetC <- done // the dispatcher closes done when the reset is complete
    <-done
}

// Flush blocks until every log message accepted so far has been handed
// to the active log systems. The dispatcher does this by draining and
// restarting each system, so the same systems remain registered
// afterwards.
func Flush() {
    done := make(chan struct{})
    flushC <- done // the dispatcher closes done when the flush is complete
    <-done
}

// AddLogSystem starts printing messages to the given LogSystem.
// The system is handed to the dispatch goroutine, which gives it its own
// buffered queue and delivery goroutine; the call blocks only until the
// dispatcher has received it. The system stays registered across Flush
// and is removed by Reset.
func AddLogSystem(sys LogSystem) {
    addSystemC <- sys
}