aboutsummaryrefslogtreecommitdiffstats
path: root/les/csvlogger/csvlogger.go
blob: 9a4093cb9abe2e58a319e4767b51a0b23324e38f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package csvlogger

import (
    "fmt"
    "os"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common/mclock"
    "github.com/ethereum/go-ethereum/log"
)

// Logger is a metrics/events logger that writes logged values and events into a comma separated file
type Logger struct {
    file            *os.File
    started         mclock.AbsTime
    channels        []*Channel
    period          time.Duration
    stopCh, stopped chan struct{}
    storeCh         chan string
    eventHeader     string
}

// NewLogger creates a new Logger
func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger {
    if fileName == "" {
        return nil
    }
    f, err := os.Create(fileName)
    if err != nil {
        log.Error("Error creating log file", "name", fileName, "error", err)
        return nil
    }
    return &Logger{
        file:        f,
        period:      updatePeriod,
        stopCh:      make(chan struct{}),
        storeCh:     make(chan string, 1),
        eventHeader: eventHeader,
    }
}

// NewChannel creates a new value logger channel that writes values in a single
// column. If the relative change of the value is bigger than the given threshold
// then a new line is added immediately (threshold can also be 0).
func (l *Logger) NewChannel(name string, threshold float64) *Channel {
    if l == nil {
        return nil
    }
    c := &Channel{
        logger:    l,
        name:      name,
        threshold: threshold,
    }
    l.channels = append(l.channels, c)
    return c
}

// NewMinMaxChannel creates a new value logger channel that writes the minimum and
// maximum of the tracked value in two columns. It never triggers adding a new line.
// If zeroDefault is true then 0 is written to both min and max columns if no update
// was given during the last period. If it is false then the last update will appear
// in both columns.
func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel {
    if l == nil {
        return nil
    }
    c := &Channel{
        logger:        l,
        name:          name,
        minmax:        true,
        mmZeroDefault: zeroDefault,
    }
    l.channels = append(l.channels, c)
    return c
}

func (l *Logger) store(event string) {
    s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000)
    for _, ch := range l.channels {
        s += ", " + ch.store()
    }
    if event != "" {
        s += ", " + event
    }
    l.file.WriteString(s + "\n")
}

// Start writes the header line and starts the logger
func (l *Logger) Start() {
    if l == nil {
        return
    }
    l.started = mclock.Now()
    s := "Time"
    for _, ch := range l.channels {
        s += ", " + ch.header()
    }
    if l.eventHeader != "" {
        s += ", " + l.eventHeader
    }
    l.file.WriteString(s + "\n")
    go func() {
        timer := time.NewTimer(l.period)
        for {
            select {
            case <-timer.C:
                l.store("")
                timer.Reset(l.period)
            case event := <-l.storeCh:
                l.store(event)
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(l.period)
            case <-l.stopCh:
                close(l.stopped)
                return
            }
        }
    }()
}

// Stop stops the logger and closes the file
func (l *Logger) Stop() {
    if l == nil {
        return
    }
    l.stopped = make(chan struct{})
    close(l.stopCh)
    <-l.stopped
    l.file.Close()
}

// Event immediately adds a new line and adds the given event string in the last column
func (l *Logger) Event(event string) {
    if l == nil {
        return
    }
    select {
    case l.storeCh <- event:
    case <-l.stopCh:
    }
}

// Channel represents a logger channel tracking a single value
type Channel struct {
    logger                                             *Logger
    lock                                               sync.Mutex
    name                                               string
    threshold, storeMin, storeMax, lastValue, min, max float64
    minmax, mmSet, mmZeroDefault                       bool
}

// Update updates the tracked value
func (lc *Channel) Update(value float64) {
    if lc == nil {
        return
    }
    lc.lock.Lock()
    defer lc.lock.Unlock()

    lc.lastValue = value
    if lc.minmax {
        if value > lc.max || !lc.mmSet {
            lc.max = value
        }
        if value < lc.min || !lc.mmSet {
            lc.min = value
        }
        lc.mmSet = true
    } else {
        if value < lc.storeMin || value > lc.storeMax {
            select {
            case lc.logger.storeCh <- "":
            default:
            }
        }
    }
}

func (lc *Channel) store() (s string) {
    lc.lock.Lock()
    defer lc.lock.Unlock()

    if lc.minmax {
        s = fmt.Sprintf("%g, %g", lc.min, lc.max)
        lc.mmSet = false
        if lc.mmZeroDefault {
            lc.min = 0
        } else {
            lc.min = lc.lastValue
        }
        lc.max = lc.min
    } else {
        s = fmt.Sprintf("%g", lc.lastValue)
        lc.storeMin = lc.lastValue * (1 - lc.threshold)
        lc.storeMax = lc.lastValue * (1 + lc.threshold)
        if lc.lastValue < 0 {
            lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin
        }
    }
    return
}

func (lc *Channel) header() string {
    if lc.minmax {
        return lc.name + " (min), " + lc.name + " (max)"
    }
    return lc.name
}