aboutsummaryrefslogtreecommitdiffstats
path: root/dashboard/dashboard.go
diff options
context:
space:
mode:
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r--dashboard/dashboard.go225
1 files changed, 167 insertions, 58 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go
index 10a363619..09038638e 100644
--- a/dashboard/dashboard.go
+++ b/dashboard/dashboard.go
@@ -16,7 +16,12 @@
package dashboard
-//go:generate go-bindata -nometadata -o assets.go -prefix assets -pkg dashboard assets/public/...
+//go:generate npm --prefix ./assets install
+//go:generate ./assets/node_modules/.bin/webpack --config ./assets/webpack.config.js --context ./assets
+//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/dashboard.html assets/bundle.js
+//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
+//go:generate sh -c "sed 's#var _dashboardHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
+//go:generate gofmt -w -s assets.go
import (
"fmt"
@@ -24,23 +29,32 @@ import (
"net"
"net/http"
"path/filepath"
+ "runtime"
"sync"
"sync/atomic"
"time"
+ "github.com/elastic/gosigar"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/rcrowley/go-metrics"
"golang.org/x/net/websocket"
)
const (
- memorySampleLimit = 200 // Maximum number of memory data samples
- trafficSampleLimit = 200 // Maximum number of traffic data samples
+ activeMemorySampleLimit = 200 // Maximum number of active memory data samples
+ virtualMemorySampleLimit = 200 // Maximum number of virtual memory data samples
+ networkIngressSampleLimit = 200 // Maximum number of network ingress data samples
+ networkEgressSampleLimit = 200 // Maximum number of network egress data samples
+ processCPUSampleLimit = 200 // Maximum number of process cpu data samples
+ systemCPUSampleLimit = 200 // Maximum number of system cpu data samples
+ diskReadSampleLimit = 200 // Maximum number of disk read data samples
+ diskWriteSampleLimit = 200 // Maximum number of disk write data samples
)
-var nextId uint32 // Next connection id
+var nextID uint32 // Next connection id
// Dashboard contains the dashboard internals.
type Dashboard struct {
@@ -48,47 +62,52 @@ type Dashboard struct {
listener net.Listener
conns map[uint32]*client // Currently live websocket connections
- charts charts // The collected data samples to plot
- lock sync.RWMutex // Lock protecting the dashboard's internals
+ charts *HomeMessage
+ commit string
+ lock sync.RWMutex // Lock protecting the dashboard's internals
quit chan chan error // Channel used for graceful exit
wg sync.WaitGroup
}
-// message embraces the data samples of a client message.
-type message struct {
- History *charts `json:"history,omitempty"` // Past data samples
- Memory *chartEntry `json:"memory,omitempty"` // One memory sample
- Traffic *chartEntry `json:"traffic,omitempty"` // One traffic sample
- Log string `json:"log,omitempty"` // One log
-}
-
// client represents active websocket connection with a remote browser.
type client struct {
conn *websocket.Conn // Particular live websocket connection
- msg chan message // Message queue for the update messages
+ msg chan Message // Message queue for the update messages
logger log.Logger // Logger for the particular live websocket connection
}
-// charts contains the collected data samples.
-type charts struct {
- Memory []*chartEntry `json:"memorySamples,omitempty"`
- Traffic []*chartEntry `json:"trafficSamples,omitempty"`
-}
-
-// chartEntry represents one data sample
-type chartEntry struct {
- Time time.Time `json:"time,omitempty"`
- Value float64 `json:"value,omitempty"`
-}
-
// New creates a new dashboard instance with the given configuration.
-func New(config *Config) (*Dashboard, error) {
- return &Dashboard{
+func New(config *Config, commit string) (*Dashboard, error) {
+ now := time.Now()
+ db := &Dashboard{
conns: make(map[uint32]*client),
config: config,
quit: make(chan chan error),
- }, nil
+ charts: &HomeMessage{
+ ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
+ VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
+ NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
+ NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
+ ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
+ SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
+ DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
+ DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
+ },
+ commit: commit,
+ }
+ return db, nil
+}
+
+// emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
+func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries {
+ ce := make(ChartEntries, limit)
+ for i := 0; i < limit; i++ {
+ ce[i] = &ChartEntry{
+ Time: t.Add(-time.Duration(i) * refresh),
+ }
+ }
+ return ce
}
// Protocols is a meaningless implementation of node.Service.
@@ -99,6 +118,8 @@ func (db *Dashboard) APIs() []rpc.API { return nil }
// Start implements node.Service, starting the data collection thread and the listening server of the dashboard.
func (db *Dashboard) Start(server *p2p.Server) error {
+ log.Info("Starting dashboard")
+
db.wg.Add(2)
go db.collectData()
go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add.
@@ -172,7 +193,7 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
w.Write(blob)
return
}
- blob, err := Asset(filepath.Join("public", path))
+ blob, err := Asset(path[1:])
if err != nil {
log.Warn("Failed to load the asset", "path", path, "err", err)
http.Error(w, "not found", http.StatusNotFound)
@@ -183,13 +204,13 @@ func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
// apiHandler handles requests for the dashboard.
func (db *Dashboard) apiHandler(conn *websocket.Conn) {
- id := atomic.AddUint32(&nextId, 1)
+ id := atomic.AddUint32(&nextID, 1)
client := &client{
conn: conn,
- msg: make(chan message, 128),
+ msg: make(chan Message, 128),
logger: log.New("id", id),
}
- done := make(chan struct{}) // Buffered channel as sender may exit early
+ done := make(chan struct{})
// Start listening for messages to send.
db.wg.Add(1)
@@ -209,9 +230,27 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
}
}
}()
+
+ versionMeta := ""
+ if len(params.VersionMeta) > 0 {
+ versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
+ }
// Send the past data.
- client.msg <- message{
- History: &db.charts,
+ client.msg <- Message{
+ General: &GeneralMessage{
+ Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
+ Commit: db.commit,
+ },
+ Home: &HomeMessage{
+ ActiveMemory: db.charts.ActiveMemory,
+ VirtualMemory: db.charts.VirtualMemory,
+ NetworkIngress: db.charts.NetworkIngress,
+ NetworkEgress: db.charts.NetworkEgress,
+ ProcessCPU: db.charts.ProcessCPU,
+ SystemCPU: db.charts.SystemCPU,
+ DiskRead: db.charts.DiskRead,
+ DiskWrite: db.charts.DiskWrite,
+ },
}
// Start tracking the connection and drop at connection loss.
db.lock.Lock()
@@ -235,6 +274,19 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
// collectData collects the required data to plot on the dashboard.
func (db *Dashboard) collectData() {
defer db.wg.Done()
+ systemCPUUsage := gosigar.Cpu{}
+ systemCPUUsage.Get()
+ var (
+ prevNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
+ prevNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
+ prevProcessCPUTime = getProcessCPUTime()
+ prevSystemCPUUsage = systemCPUUsage
+ prevDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/input").(metrics.Meter).Count()
+ prevDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/output").(metrics.Meter).Count()
+
+ frequency = float64(db.config.Refresh / time.Second)
+ numCPU = float64(runtime.NumCPU())
+ )
for {
select {
@@ -242,32 +294,85 @@ func (db *Dashboard) collectData() {
errc <- nil
return
case <-time.After(db.config.Refresh):
- inboundTraffic := metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Rate1()
- memoryInUse := metrics.DefaultRegistry.Get("system/memory/inuse").(metrics.Meter).Rate1()
+ systemCPUUsage.Get()
+ var (
+ curNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
+ curNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
+ curProcessCPUTime = getProcessCPUTime()
+ curSystemCPUUsage = systemCPUUsage
+ curDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/input").(metrics.Meter).Count()
+ curDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/compact/output").(metrics.Meter).Count()
+
+ deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
+ deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress)
+ deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime
+ deltaSystemCPUUsage = systemCPUUsage.Delta(prevSystemCPUUsage)
+ deltaDiskRead = curDiskRead - prevDiskRead
+ deltaDiskWrite = curDiskWrite - prevDiskWrite
+ )
+ prevNetworkIngress = curNetworkIngress
+ prevNetworkEgress = curNetworkEgress
+ prevProcessCPUTime = curProcessCPUTime
+ prevSystemCPUUsage = curSystemCPUUsage
+ prevDiskRead = curDiskRead
+ prevDiskWrite = curDiskWrite
+
now := time.Now()
- memory := &chartEntry{
+
+ var mem runtime.MemStats
+ runtime.ReadMemStats(&mem)
+ activeMemory := &ChartEntry{
Time: now,
- Value: memoryInUse,
+ Value: float64(mem.Alloc) / frequency,
}
- traffic := &chartEntry{
+ virtualMemory := &ChartEntry{
Time: now,
- Value: inboundTraffic,
+ Value: float64(mem.Sys) / frequency,
}
- // Remove the first elements in case the samples' amount exceeds the limit.
- first := 0
- if len(db.charts.Memory) == memorySampleLimit {
- first = 1
+ networkIngress := &ChartEntry{
+ Time: now,
+ Value: deltaNetworkIngress / frequency,
}
- db.charts.Memory = append(db.charts.Memory[first:], memory)
- first = 0
- if len(db.charts.Traffic) == trafficSampleLimit {
- first = 1
+ networkEgress := &ChartEntry{
+ Time: now,
+ Value: deltaNetworkEgress / frequency,
}
- db.charts.Traffic = append(db.charts.Traffic[first:], traffic)
-
- db.sendToAll(&message{
- Memory: memory,
- Traffic: traffic,
+ processCPU := &ChartEntry{
+ Time: now,
+ Value: deltaProcessCPUTime / frequency / numCPU * 100,
+ }
+ systemCPU := &ChartEntry{
+ Time: now,
+ Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU,
+ }
+ diskRead := &ChartEntry{
+ Time: now,
+ Value: float64(deltaDiskRead) / frequency,
+ }
+ diskWrite := &ChartEntry{
+ Time: now,
+ Value: float64(deltaDiskWrite) / frequency,
+ }
+ db.charts.ActiveMemory = append(db.charts.ActiveMemory[1:], activeMemory)
+ db.charts.VirtualMemory = append(db.charts.VirtualMemory[1:], virtualMemory)
+ db.charts.NetworkIngress = append(db.charts.NetworkIngress[1:], networkIngress)
+ db.charts.NetworkEgress = append(db.charts.NetworkEgress[1:], networkEgress)
+ db.charts.ProcessCPU = append(db.charts.ProcessCPU[1:], processCPU)
+ db.charts.SystemCPU = append(db.charts.SystemCPU[1:], systemCPU)
+ db.charts.DiskRead = append(db.charts.DiskRead[1:], diskRead)
+ db.charts.DiskWrite = append(db.charts.DiskRead[1:], diskWrite)
+
+ db.sendToAll(&Message{
+ Home: &HomeMessage{
+ ActiveMemory: ChartEntries{activeMemory},
+ VirtualMemory: ChartEntries{virtualMemory},
+ NetworkIngress: ChartEntries{networkIngress},
+ NetworkEgress: ChartEntries{networkEgress},
+ ProcessCPU: ChartEntries{processCPU},
+ SystemCPU: ChartEntries{systemCPU},
+ DiskRead: ChartEntries{diskRead},
+ DiskWrite: ChartEntries{diskWrite},
+ },
})
}
}
@@ -277,6 +382,7 @@ func (db *Dashboard) collectData() {
func (db *Dashboard) collectLogs() {
defer db.wg.Done()
+ id := 1
// TODO (kurkomisi): log collection comes here.
for {
select {
@@ -284,15 +390,18 @@ func (db *Dashboard) collectLogs() {
errc <- nil
return
case <-time.After(db.config.Refresh / 2):
- db.sendToAll(&message{
- Log: "This is a fake log.",
+ db.sendToAll(&Message{
+ Logs: &LogsMessage{
+ Log: []string{fmt.Sprintf("%-4d: This is a fake log.", id)},
+ },
})
+ id++
}
}
}
// sendToAll sends the given message to the active dashboards.
-func (db *Dashboard) sendToAll(msg *message) {
+func (db *Dashboard) sendToAll(msg *Message) {
db.lock.Lock()
for _, c := range db.conns {
select {