aboutsummaryrefslogtreecommitdiffstats
path: root/dashboard/dashboard.go
diff options
context:
space:
mode:
authorKurkó Mihály <kurkomisi@users.noreply.github.com>2019-03-13 20:53:52 +0800
committerPéter Szilágyi <peterke@gmail.com>2019-03-13 20:53:52 +0800
commit1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d (patch)
tree0fa275c57dbc7ddd5371d52e842fa91dd4fa5adc /dashboard/dashboard.go
parent1591b63306cababdb37d244432653bbd71c346df (diff)
downloadgo-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar
go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.gz
go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.bz2
go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.lz
go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.xz
go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.tar.zst
go-tangerine-1a29bf0ee2c5753a6a0f6f57a90d079b4d17702d.zip
dashboard, p2p, vendor: visualize peers (#19247)
* dashboard, p2p: visualize peers * dashboard: change scale to green to red
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r--dashboard/dashboard.go215
1 files changed, 46 insertions, 169 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go
index 3ba92ac73..d69a750f1 100644
--- a/dashboard/dashboard.go
+++ b/dashboard/dashboard.go
@@ -18,8 +18,10 @@ package dashboard
//go:generate yarn --cwd ./assets install
//go:generate yarn --cwd ./assets build
-//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js
+//go:generate yarn --cwd ./assets js-beautify -f bundle.js.map -r -w 1
+//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js assets/bundle.js.map
//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
+//go:generate sh -c "sed 's#var _bundleJsMap#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
//go:generate sh -c "sed 's#var _indexHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
//go:generate gofmt -w -s assets.go
@@ -27,16 +29,13 @@ import (
"fmt"
"net"
"net/http"
- "runtime"
"sync"
"sync/atomic"
"time"
"io"
- "github.com/elastic/gosigar"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
@@ -45,31 +44,29 @@ import (
)
const (
- activeMemorySampleLimit = 200 // Maximum number of active memory data samples
- virtualMemorySampleLimit = 200 // Maximum number of virtual memory data samples
- networkIngressSampleLimit = 200 // Maximum number of network ingress data samples
- networkEgressSampleLimit = 200 // Maximum number of network egress data samples
- processCPUSampleLimit = 200 // Maximum number of process cpu data samples
- systemCPUSampleLimit = 200 // Maximum number of system cpu data samples
- diskReadSampleLimit = 200 // Maximum number of disk read data samples
- diskWriteSampleLimit = 200 // Maximum number of disk write data samples
+ sampleLimit = 200 // Maximum number of data samples
)
-var nextID uint32 // Next connection id
-
// Dashboard contains the dashboard internals.
type Dashboard struct {
- config *Config
+ config *Config // Configuration values for the dashboard
+
+ listener net.Listener // Network listener listening for dashboard clients
+ conns map[uint32]*client // Currently live websocket connections
+ nextConnID uint32 // Next connection id
+
+ history *Message // Stored historical data
- listener net.Listener
- conns map[uint32]*client // Currently live websocket connections
- history *Message
- lock sync.RWMutex // Lock protecting the dashboard's internals
+ lock sync.Mutex // Lock protecting the dashboard's internals
+ sysLock sync.RWMutex // Lock protecting the stored system data
+ peerLock sync.RWMutex // Lock protecting the stored peer data
+ logLock sync.RWMutex // Lock protecting the stored log data
- logdir string
+ geodb *geoDB // geoip database instance for IP to geographical information conversions
+ logdir string // Directory containing the log files
quit chan chan error // Channel used for graceful exit
- wg sync.WaitGroup
+ wg sync.WaitGroup // Wait group used to close the data collector threads
}
// client represents active websocket connection with a remote browser.
@@ -96,14 +93,14 @@ func New(config *Config, commit string, logdir string) *Dashboard {
Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
},
System: &SystemMessage{
- ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
- VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
- NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
- NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
- ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
- SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
- DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
- DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
+ ActiveMemory: emptyChartEntries(now, sampleLimit),
+ VirtualMemory: emptyChartEntries(now, sampleLimit),
+ NetworkIngress: emptyChartEntries(now, sampleLimit),
+ NetworkEgress: emptyChartEntries(now, sampleLimit),
+ ProcessCPU: emptyChartEntries(now, sampleLimit),
+ SystemCPU: emptyChartEntries(now, sampleLimit),
+ DiskRead: emptyChartEntries(now, sampleLimit),
+ DiskWrite: emptyChartEntries(now, sampleLimit),
},
},
logdir: logdir,
@@ -111,12 +108,10 @@ func New(config *Config, commit string, logdir string) *Dashboard {
}
// emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
-func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries {
+func emptyChartEntries(t time.Time, limit int) ChartEntries {
ce := make(ChartEntries, limit)
for i := 0; i < limit; i++ {
- ce[i] = &ChartEntry{
- Time: t.Add(-time.Duration(i) * refresh),
- }
+ ce[i] = new(ChartEntry)
}
return ce
}
@@ -132,9 +127,10 @@ func (db *Dashboard) APIs() []rpc.API { return nil }
func (db *Dashboard) Start(server *p2p.Server) error {
log.Info("Starting dashboard")
- db.wg.Add(2)
- go db.collectData()
+ db.wg.Add(3)
+ go db.collectSystemData()
go db.streamLogs()
+ go db.collectPeerData()
http.HandleFunc("/", db.webHandler)
http.Handle("/api", websocket.Handler(db.apiHandler))
@@ -160,7 +156,7 @@ func (db *Dashboard) Stop() error {
}
// Close the collectors.
errc := make(chan error, 1)
- for i := 0; i < 2; i++ {
+ for i := 0; i < 3; i++ {
db.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
@@ -206,7 +202,7 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
// apiHandler handles requests for the dashboard.
func (db *Dashboard) apiHandler(conn *websocket.Conn) {
- id := atomic.AddUint32(&nextID, 1)
+ id := atomic.AddUint32(&db.nextConnID, 1)
client := &client{
conn: conn,
msg: make(chan *Message, 128),
@@ -233,10 +229,21 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
}
}()
- db.lock.Lock()
// Send the past data.
- client.msg <- deepcopy.Copy(db.history).(*Message)
+ db.sysLock.RLock()
+ db.peerLock.RLock()
+ db.logLock.RLock()
+
+ h := deepcopy.Copy(db.history).(*Message)
+
+ db.sysLock.RUnlock()
+ db.peerLock.RUnlock()
+ db.logLock.RUnlock()
+
+ client.msg <- h
+
// Start tracking the connection and drop at connection loss.
+ db.lock.Lock()
db.conns[id] = client
db.lock.Unlock()
defer func() {
@@ -259,136 +266,6 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
}
}
-// meterCollector returns a function, which retrieves a specific meter.
-func meterCollector(name string) func() int64 {
- if metric := metrics.DefaultRegistry.Get(name); metric != nil {
- m := metric.(metrics.Meter)
- return func() int64 {
- return m.Count()
- }
- }
- return func() int64 {
- return 0
- }
-}
-
-// collectData collects the required data to plot on the dashboard.
-func (db *Dashboard) collectData() {
- defer db.wg.Done()
-
- systemCPUUsage := gosigar.Cpu{}
- systemCPUUsage.Get()
- var (
- mem runtime.MemStats
-
- collectNetworkIngress = meterCollector("p2p/InboundTraffic")
- collectNetworkEgress = meterCollector("p2p/OutboundTraffic")
- collectDiskRead = meterCollector("eth/db/chaindata/disk/read")
- collectDiskWrite = meterCollector("eth/db/chaindata/disk/write")
-
- prevNetworkIngress = collectNetworkIngress()
- prevNetworkEgress = collectNetworkEgress()
- prevProcessCPUTime = getProcessCPUTime()
- prevSystemCPUUsage = systemCPUUsage
- prevDiskRead = collectDiskRead()
- prevDiskWrite = collectDiskWrite()
-
- frequency = float64(db.config.Refresh / time.Second)
- numCPU = float64(runtime.NumCPU())
- )
-
- for {
- select {
- case errc := <-db.quit:
- errc <- nil
- return
- case <-time.After(db.config.Refresh):
- systemCPUUsage.Get()
- var (
- curNetworkIngress = collectNetworkIngress()
- curNetworkEgress = collectNetworkEgress()
- curProcessCPUTime = getProcessCPUTime()
- curSystemCPUUsage = systemCPUUsage
- curDiskRead = collectDiskRead()
- curDiskWrite = collectDiskWrite()
-
- deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
- deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress)
- deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime
- deltaSystemCPUUsage = curSystemCPUUsage.Delta(prevSystemCPUUsage)
- deltaDiskRead = curDiskRead - prevDiskRead
- deltaDiskWrite = curDiskWrite - prevDiskWrite
- )
- prevNetworkIngress = curNetworkIngress
- prevNetworkEgress = curNetworkEgress
- prevProcessCPUTime = curProcessCPUTime
- prevSystemCPUUsage = curSystemCPUUsage
- prevDiskRead = curDiskRead
- prevDiskWrite = curDiskWrite
-
- now := time.Now()
-
- runtime.ReadMemStats(&mem)
- activeMemory := &ChartEntry{
- Time: now,
- Value: float64(mem.Alloc) / frequency,
- }
- virtualMemory := &ChartEntry{
- Time: now,
- Value: float64(mem.Sys) / frequency,
- }
- networkIngress := &ChartEntry{
- Time: now,
- Value: deltaNetworkIngress / frequency,
- }
- networkEgress := &ChartEntry{
- Time: now,
- Value: deltaNetworkEgress / frequency,
- }
- processCPU := &ChartEntry{
- Time: now,
- Value: deltaProcessCPUTime / frequency / numCPU * 100,
- }
- systemCPU := &ChartEntry{
- Time: now,
- Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU,
- }
- diskRead := &ChartEntry{
- Time: now,
- Value: float64(deltaDiskRead) / frequency,
- }
- diskWrite := &ChartEntry{
- Time: now,
- Value: float64(deltaDiskWrite) / frequency,
- }
- sys := db.history.System
- db.lock.Lock()
- sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory)
- sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory)
- sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress)
- sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress)
- sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU)
- sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU)
- sys.DiskRead = append(sys.DiskRead[1:], diskRead)
- sys.DiskWrite = append(sys.DiskWrite[1:], diskWrite)
- db.lock.Unlock()
-
- db.sendToAll(&Message{
- System: &SystemMessage{
- ActiveMemory: ChartEntries{activeMemory},
- VirtualMemory: ChartEntries{virtualMemory},
- NetworkIngress: ChartEntries{networkIngress},
- NetworkEgress: ChartEntries{networkEgress},
- ProcessCPU: ChartEntries{processCPU},
- SystemCPU: ChartEntries{systemCPU},
- DiskRead: ChartEntries{diskRead},
- DiskWrite: ChartEntries{diskWrite},
- },
- })
- }
- }
-}
-
// sendToAll sends the given message to the active dashboards.
func (db *Dashboard) sendToAll(msg *Message) {
db.lock.Lock()