diff options
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r-- | dashboard/dashboard.go | 225 |
1 files changed, 167 insertions, 58 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go index 10a363619..09038638e 100644 --- a/dashboard/dashboard.go +++ b/dashboard/dashboard.go @@ -16,7 +16,12 @@ package dashboard -//go:generate go-bindata -nometadata -o assets.go -prefix assets -pkg dashboard assets/public/... +//go:generate npm --prefix ./assets install +//go:generate ./assets/node_modules/.bin/webpack --config ./assets/webpack.config.js --context ./assets +//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/dashboard.html assets/bundle.js +//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go" +//go:generate sh -c "sed 's#var _dashboardHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go" +//go:generate gofmt -w -s assets.go import ( "fmt" @@ -24,23 +29,32 @@ import ( "net" "net/http" "path/filepath" + "runtime" "sync" "sync/atomic" "time" + "github.com/elastic/gosigar" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/rcrowley/go-metrics" "golang.org/x/net/websocket" ) const ( - memorySampleLimit = 200 // Maximum number of memory data samples - trafficSampleLimit = 200 // Maximum number of traffic data samples + activeMemorySampleLimit = 200 // Maximum number of active memory data samples + virtualMemorySampleLimit = 200 // Maximum number of virtual memory data samples + networkIngressSampleLimit = 200 // Maximum number of network ingress data samples + networkEgressSampleLimit = 200 // Maximum number of network egress data samples + processCPUSampleLimit = 200 // Maximum number of process cpu data samples + systemCPUSampleLimit = 200 // Maximum number of system cpu data samples + diskReadSampleLimit = 200 // Maximum number of disk read data samples + diskWriteSampleLimit = 200 // Maximum number of disk write data samples 
) -var nextId uint32 // Next connection id +var nextID uint32 // Next connection id // Dashboard contains the dashboard internals. type Dashboard struct { @@ -48,47 +62,52 @@ type Dashboard struct { listener net.Listener conns map[uint32]*client // Currently live websocket connections - charts charts // The collected data samples to plot - lock sync.RWMutex // Lock protecting the dashboard's internals + charts *HomeMessage + commit string + lock sync.RWMutex // Lock protecting the dashboard's internals quit chan chan error // Channel used for graceful exit wg sync.WaitGroup } -// message embraces the data samples of a client message. -type message struct { - History *charts `json:"history,omitempty"` // Past data samples - Memory *chartEntry `json:"memory,omitempty"` // One memory sample - Traffic *chartEntry `json:"traffic,omitempty"` // One traffic sample - Log string `json:"log,omitempty"` // One log -} - // client represents active websocket connection with a remote browser. type client struct { conn *websocket.Conn // Particular live websocket connection - msg chan message // Message queue for the update messages + msg chan Message // Message queue for the update messages logger log.Logger // Logger for the particular live websocket connection } -// charts contains the collected data samples. -type charts struct { - Memory []*chartEntry `json:"memorySamples,omitempty"` - Traffic []*chartEntry `json:"trafficSamples,omitempty"` -} - -// chartEntry represents one data sample -type chartEntry struct { - Time time.Time `json:"time,omitempty"` - Value float64 `json:"value,omitempty"` -} - // New creates a new dashboard instance with the given configuration. 
-func New(config *Config) (*Dashboard, error) { - return &Dashboard{ +func New(config *Config, commit string) (*Dashboard, error) { + now := time.Now() + db := &Dashboard{ conns: make(map[uint32]*client), config: config, quit: make(chan chan error), - }, nil + charts: &HomeMessage{ + ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh), + VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh), + NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh), + NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh), + ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh), + SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh), + DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh), + DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh), + }, + commit: commit, + } + return db, nil +} + +// emptyChartEntries returns a ChartEntry array containing limit number of empty samples. +func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries { + ce := make(ChartEntries, limit) + for i := 0; i < limit; i++ { + ce[i] = &ChartEntry{ + Time: t.Add(-time.Duration(i) * refresh), + } + } + return ce } // Protocols is a meaningless implementation of node.Service. @@ -99,6 +118,8 @@ func (db *Dashboard) APIs() []rpc.API { return nil } // Start implements node.Service, starting the data collection thread and the listening server of the dashboard. func (db *Dashboard) Start(server *p2p.Server) error { + log.Info("Starting dashboard") + db.wg.Add(2) go db.collectData() go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add. 
@@ -172,7 +193,7 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) { w.Write(blob) return } - blob, err := Asset(filepath.Join("public", path)) + blob, err := Asset(path[1:]) if err != nil { log.Warn("Failed to load the asset", "path", path, "err", err) http.Error(w, "not found", http.StatusNotFound) @@ -183,13 +204,13 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) { // apiHandler handles requests for the dashboard. func (db *Dashboard) apiHandler(conn *websocket.Conn) { - id := atomic.AddUint32(&nextId, 1) + id := atomic.AddUint32(&nextID, 1) client := &client{ conn: conn, - msg: make(chan message, 128), + msg: make(chan Message, 128), logger: log.New("id", id), } - done := make(chan struct{}) // Buffered channel as sender may exit early + done := make(chan struct{}) // Start listening for messages to send. db.wg.Add(1) @@ -209,9 +230,27 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { } } }() + + versionMeta := "" + if len(params.VersionMeta) > 0 { + versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta) + } // Send the past data. - client.msg <- message{ - History: &db.charts, + client.msg <- Message{ + General: &GeneralMessage{ + Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta), + Commit: db.commit, + }, + Home: &HomeMessage{ + ActiveMemory: db.charts.ActiveMemory, + VirtualMemory: db.charts.VirtualMemory, + NetworkIngress: db.charts.NetworkIngress, + NetworkEgress: db.charts.NetworkEgress, + ProcessCPU: db.charts.ProcessCPU, + SystemCPU: db.charts.SystemCPU, + DiskRead: db.charts.DiskRead, + DiskWrite: db.charts.DiskWrite, + }, } // Start tracking the connection and drop at connection loss. db.lock.Lock() @@ -235,6 +274,19 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { // collectData collects the required data to plot on the dashboard. 
func (db *Dashboard) collectData() { defer db.wg.Done() + systemCPUUsage := gosigar.Cpu{} + systemCPUUsage.Get() + var ( + prevNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count() + prevNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count() + prevProcessCPUTime = getProcessCPUTime() + prevSystemCPUUsage = systemCPUUsage + prevDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/input").(metrics.Meter).Count() + prevDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/output").(metrics.Meter).Count() + + frequency = float64(db.config.Refresh / time.Second) + numCPU = float64(runtime.NumCPU()) + ) for { select { @@ -242,32 +294,85 @@ func (db *Dashboard) collectData() { errc <- nil return case <-time.After(db.config.Refresh): - inboundTraffic := metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Rate1() - memoryInUse := metrics.DefaultRegistry.Get("system/memory/inuse").(metrics.Meter).Rate1() + systemCPUUsage.Get() + var ( + curNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count() + curNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count() + curProcessCPUTime = getProcessCPUTime() + curSystemCPUUsage = systemCPUUsage + curDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/input").(metrics.Meter).Count() + curDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/output").(metrics.Meter).Count() + + deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress) + deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress) + deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime + deltaSystemCPUUsage = systemCPUUsage.Delta(prevSystemCPUUsage) + deltaDiskRead = curDiskRead - prevDiskRead + deltaDiskWrite = curDiskWrite - prevDiskWrite + ) + prevNetworkIngress = curNetworkIngress + prevNetworkEgress = curNetworkEgress + prevProcessCPUTime = 
curProcessCPUTime + prevSystemCPUUsage = curSystemCPUUsage + prevDiskRead = curDiskRead + prevDiskWrite = curDiskWrite + now := time.Now() - memory := &chartEntry{ + + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + activeMemory := &ChartEntry{ Time: now, - Value: memoryInUse, + Value: float64(mem.Alloc) / frequency, } - traffic := &chartEntry{ + virtualMemory := &ChartEntry{ Time: now, - Value: inboundTraffic, + Value: float64(mem.Sys) / frequency, } - // Remove the first elements in case the samples' amount exceeds the limit. - first := 0 - if len(db.charts.Memory) == memorySampleLimit { - first = 1 + networkIngress := &ChartEntry{ + Time: now, + Value: deltaNetworkIngress / frequency, } - db.charts.Memory = append(db.charts.Memory[first:], memory) - first = 0 - if len(db.charts.Traffic) == trafficSampleLimit { - first = 1 + networkEgress := &ChartEntry{ + Time: now, + Value: deltaNetworkEgress / frequency, } - db.charts.Traffic = append(db.charts.Traffic[first:], traffic) - - db.sendToAll(&message{ - Memory: memory, - Traffic: traffic, + processCPU := &ChartEntry{ + Time: now, + Value: deltaProcessCPUTime / frequency / numCPU * 100, + } + systemCPU := &ChartEntry{ + Time: now, + Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU, + } + diskRead := &ChartEntry{ + Time: now, + Value: float64(deltaDiskRead) / frequency, + } + diskWrite := &ChartEntry{ + Time: now, + Value: float64(deltaDiskWrite) / frequency, + } + db.charts.ActiveMemory = append(db.charts.ActiveMemory[1:], activeMemory) + db.charts.VirtualMemory = append(db.charts.VirtualMemory[1:], virtualMemory) + db.charts.NetworkIngress = append(db.charts.NetworkIngress[1:], networkIngress) + db.charts.NetworkEgress = append(db.charts.NetworkEgress[1:], networkEgress) + db.charts.ProcessCPU = append(db.charts.ProcessCPU[1:], processCPU) + db.charts.SystemCPU = append(db.charts.SystemCPU[1:], systemCPU) + db.charts.DiskRead = append(db.charts.DiskRead[1:], diskRead) + 
db.charts.DiskWrite = append(db.charts.DiskWrite[1:], diskWrite) + + db.sendToAll(&Message{ + Home: &HomeMessage{ + ActiveMemory: ChartEntries{activeMemory}, + VirtualMemory: ChartEntries{virtualMemory}, + NetworkIngress: ChartEntries{networkIngress}, + NetworkEgress: ChartEntries{networkEgress}, + ProcessCPU: ChartEntries{processCPU}, + SystemCPU: ChartEntries{systemCPU}, + DiskRead: ChartEntries{diskRead}, + DiskWrite: ChartEntries{diskWrite}, + }, }) } } @@ -277,6 +382,7 @@ func (db *Dashboard) collectData() { func (db *Dashboard) collectLogs() { defer db.wg.Done() + id := 1 // TODO (kurkomisi): log collection comes here. for { select { @@ -284,15 +390,18 @@ func (db *Dashboard) collectLogs() { errc <- nil return case <-time.After(db.config.Refresh / 2): - db.sendToAll(&message{ - Log: "This is a fake log.", + db.sendToAll(&Message{ + Logs: &LogsMessage{ + Log: []string{fmt.Sprintf("%-4d: This is a fake log.", id)}, + }, }) + id++ } } } // sendToAll sends the given message to the active dashboards. -func (db *Dashboard) sendToAll(msg *message) { +func (db *Dashboard) sendToAll(msg *Message) { db.lock.Lock() for _, c := range db.conns { select { |