diff options
author | Kurkó Mihály <kurkomisi@users.noreply.github.com> | 2019-03-13 20:53:52 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2019-03-13 20:53:52 +0800 |
commit | 1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d (patch) | |
tree | 0fa275c57dbc7ddd5371d52e842fa91dd4fa5adc /dashboard/dashboard.go | |
parent | 1591b63306cababdb37d244432653bbd71c346df (diff) | |
download | go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.gz go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.bz2 go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.lz go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.xz go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.zst go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.zip |
dashboard, p2p, vendor: visualize peers (#19247)
* dashboard, p2p: visualize peers
* dashboard: change scale to green to red
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r-- | dashboard/dashboard.go | 215 |
1 files changed, 46 insertions, 169 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go index 3ba92ac73..d69a750f1 100644 --- a/dashboard/dashboard.go +++ b/dashboard/dashboard.go @@ -18,8 +18,10 @@ package dashboard //go:generate yarn --cwd ./assets install //go:generate yarn --cwd ./assets build -//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js +//go:generate yarn --cwd ./assets js-beautify -f bundle.js.map -r -w 1 +//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js assets/bundle.js.map //go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go" +//go:generate sh -c "sed 's#var _bundleJsMap#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go" //go:generate sh -c "sed 's#var _indexHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go" //go:generate gofmt -w -s assets.go @@ -27,16 +29,13 @@ import ( "fmt" "net" "net/http" - "runtime" "sync" "sync/atomic" "time" "io" - "github.com/elastic/gosigar" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" @@ -45,31 +44,29 @@ import ( ) const ( - activeMemorySampleLimit = 200 // Maximum number of active memory data samples - virtualMemorySampleLimit = 200 // Maximum number of virtual memory data samples - networkIngressSampleLimit = 200 // Maximum number of network ingress data samples - networkEgressSampleLimit = 200 // Maximum number of network egress data samples - processCPUSampleLimit = 200 // Maximum number of process cpu data samples - systemCPUSampleLimit = 200 // Maximum number of system cpu data samples - diskReadSampleLimit = 200 // Maximum number of disk read data samples - diskWriteSampleLimit = 200 // Maximum number of disk write data samples + sampleLimit = 200 // Maximum number of data samples ) -var nextID uint32 // Next connection id - // Dashboard contains the dashboard internals. type Dashboard struct { - config *Config + config *Config // Configuration values for the dashboard + + listener net.Listener // Network listener listening for dashboard clients + conns map[uint32]*client // Currently live websocket connections + nextConnID uint32 // Next connection id + + history *Message // Stored historical data - listener net.Listener - conns map[uint32]*client // Currently live websocket connections - history *Message - lock sync.RWMutex // Lock protecting the dashboard's internals + lock sync.Mutex // Lock protecting the dashboard's internals + sysLock sync.RWMutex // Lock protecting the stored system data + peerLock sync.RWMutex // Lock protecting the stored peer data + logLock sync.RWMutex // Lock protecting the stored log data - logdir string + geodb *geoDB // geoip database instance for IP to geographical information conversions + logdir string // Directory containing the log files quit chan chan error // Channel used for graceful exit - wg sync.WaitGroup + wg sync.WaitGroup // Wait group used to close the data collector threads } // client represents active websocket connection with a remote browser. @@ -96,14 +93,14 @@ func New(config *Config, commit string, logdir string) *Dashboard { Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta), }, System: &SystemMessage{ - ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh), - VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh), - NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh), - NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh), - ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh), - SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh), - DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh), - DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh), + ActiveMemory: emptyChartEntries(now, sampleLimit), + VirtualMemory: emptyChartEntries(now, sampleLimit), + NetworkIngress: emptyChartEntries(now, sampleLimit), + NetworkEgress: emptyChartEntries(now, sampleLimit), + ProcessCPU: emptyChartEntries(now, sampleLimit), + SystemCPU: emptyChartEntries(now, sampleLimit), + DiskRead: emptyChartEntries(now, sampleLimit), + DiskWrite: emptyChartEntries(now, sampleLimit), }, }, logdir: logdir, @@ -111,12 +108,10 @@ func New(config *Config, commit string, logdir string) *Dashboard { } // emptyChartEntries returns a ChartEntry array containing limit number of empty samples. -func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries { +func emptyChartEntries(t time.Time, limit int) ChartEntries { ce := make(ChartEntries, limit) for i := 0; i < limit; i++ { - ce[i] = &ChartEntry{ - Time: t.Add(-time.Duration(i) * refresh), - } + ce[i] = new(ChartEntry) } return ce } @@ -132,9 +127,10 @@ func (db *Dashboard) APIs() []rpc.API { return nil } func (db *Dashboard) Start(server *p2p.Server) error { log.Info("Starting dashboard") - db.wg.Add(2) - go db.collectData() + db.wg.Add(3) + go db.collectSystemData() go db.streamLogs() + go db.collectPeerData() http.HandleFunc("/", db.webHandler) http.Handle("/api", websocket.Handler(db.apiHandler)) @@ -160,7 +156,7 @@ func (db *Dashboard) Stop() error { } // Close the collectors. errc := make(chan error, 1) - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { db.quit <- errc if err := <-errc; err != nil { errs = append(errs, err) @@ -206,7 +202,7 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) { // apiHandler handles requests for the dashboard. func (db *Dashboard) apiHandler(conn *websocket.Conn) { - id := atomic.AddUint32(&nextID, 1) + id := atomic.AddUint32(&db.nextConnID, 1) client := &client{ conn: conn, msg: make(chan *Message, 128), @@ -233,10 +229,21 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { } }() - db.lock.Lock() // Send the past data. - client.msg <- deepcopy.Copy(db.history).(*Message) + db.sysLock.RLock() + db.peerLock.RLock() + db.logLock.RLock() + + h := deepcopy.Copy(db.history).(*Message) + + db.sysLock.RUnlock() + db.peerLock.RUnlock() + db.logLock.RUnlock() + + client.msg <- h + // Start tracking the connection and drop at connection loss. + db.lock.Lock() db.conns[id] = client db.lock.Unlock() defer func() { @@ -259,136 +266,6 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { } } -// meterCollector returns a function, which retrieves a specific meter. -func meterCollector(name string) func() int64 { - if metric := metrics.DefaultRegistry.Get(name); metric != nil { - m := metric.(metrics.Meter) - return func() int64 { - return m.Count() - } - } - return func() int64 { - return 0 - } -} - -// collectData collects the required data to plot on the dashboard. -func (db *Dashboard) collectData() { - defer db.wg.Done() - - systemCPUUsage := gosigar.Cpu{} - systemCPUUsage.Get() - var ( - mem runtime.MemStats - - collectNetworkIngress = meterCollector("p2p/InboundTraffic") - collectNetworkEgress = meterCollector("p2p/OutboundTraffic") - collectDiskRead = meterCollector("eth/db/chaindata/disk/read") - collectDiskWrite = meterCollector("eth/db/chaindata/disk/write") - - prevNetworkIngress = collectNetworkIngress() - prevNetworkEgress = collectNetworkEgress() - prevProcessCPUTime = getProcessCPUTime() - prevSystemCPUUsage = systemCPUUsage - prevDiskRead = collectDiskRead() - prevDiskWrite = collectDiskWrite() - - frequency = float64(db.config.Refresh / time.Second) - numCPU = float64(runtime.NumCPU()) - ) - - for { - select { - case errc := <-db.quit: - errc <- nil - return - case <-time.After(db.config.Refresh): - systemCPUUsage.Get() - var ( - curNetworkIngress = collectNetworkIngress() - curNetworkEgress = collectNetworkEgress() - curProcessCPUTime = getProcessCPUTime() - curSystemCPUUsage = systemCPUUsage - curDiskRead = collectDiskRead() - curDiskWrite = collectDiskWrite() - - deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress) - deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress) - deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime - deltaSystemCPUUsage = curSystemCPUUsage.Delta(prevSystemCPUUsage) - deltaDiskRead = curDiskRead - prevDiskRead - deltaDiskWrite = curDiskWrite - prevDiskWrite - ) - prevNetworkIngress = curNetworkIngress - prevNetworkEgress = curNetworkEgress - prevProcessCPUTime = curProcessCPUTime - prevSystemCPUUsage = curSystemCPUUsage - prevDiskRead = curDiskRead - prevDiskWrite = curDiskWrite - - now := time.Now() - - runtime.ReadMemStats(&mem) - activeMemory := &ChartEntry{ - Time: now, - Value: float64(mem.Alloc) / frequency, - } - virtualMemory := &ChartEntry{ - Time: now, - Value: float64(mem.Sys) / frequency, - } - networkIngress := &ChartEntry{ - Time: now, - Value: deltaNetworkIngress / frequency, - } - networkEgress := &ChartEntry{ - Time: now, - Value: deltaNetworkEgress / frequency, - } - processCPU := &ChartEntry{ - Time: now, - Value: deltaProcessCPUTime / frequency / numCPU * 100, - } - systemCPU := &ChartEntry{ - Time: now, - Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU, - } - diskRead := &ChartEntry{ - Time: now, - Value: float64(deltaDiskRead) / frequency, - } - diskWrite := &ChartEntry{ - Time: now, - Value: float64(deltaDiskWrite) / frequency, - } - sys := db.history.System - db.lock.Lock() - sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory) - sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory) - sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress) - sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress) - sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU) - sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU) - sys.DiskRead = append(sys.DiskRead[1:], diskRead) - sys.DiskWrite = append(sys.DiskWrite[1:], diskWrite) - db.lock.Unlock() - - db.sendToAll(&Message{ - System: &SystemMessage{ - ActiveMemory: ChartEntries{activeMemory}, - VirtualMemory: ChartEntries{virtualMemory}, - NetworkIngress: ChartEntries{networkIngress}, - NetworkEgress: ChartEntries{networkEgress}, - ProcessCPU: ChartEntries{processCPU}, - SystemCPU: ChartEntries{systemCPU}, - DiskRead: ChartEntries{diskRead}, - DiskWrite: ChartEntries{diskWrite}, - }, - }) - } - } -} - // sendToAll sends the given message to the active dashboards. func (db *Dashboard) sendToAll(msg *Message) { db.lock.Lock() |