diff options
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r-- | dashboard/dashboard.go | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go new file mode 100644 index 000000000..10a363619 --- /dev/null +++ b/dashboard/dashboard.go @@ -0,0 +1,305 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package dashboard + +//go:generate go-bindata -nometadata -o assets.go -prefix assets -pkg dashboard assets/public/... + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + "github.com/rcrowley/go-metrics" + "golang.org/x/net/websocket" +) + +const ( + memorySampleLimit = 200 // Maximum number of memory data samples + trafficSampleLimit = 200 // Maximum number of traffic data samples +) + +var nextId uint32 // Next connection id + +// Dashboard contains the dashboard internals. +type Dashboard struct { + config *Config + + listener net.Listener + conns map[uint32]*client // Currently live websocket connections + charts charts // The collected data samples to plot + lock sync.RWMutex // Lock protecting the dashboard's internals + + quit chan chan error // Channel used for graceful exit + wg sync.WaitGroup +} + +// message embraces the data samples of a client message. +type message struct { + History *charts `json:"history,omitempty"` // Past data samples + Memory *chartEntry `json:"memory,omitempty"` // One memory sample + Traffic *chartEntry `json:"traffic,omitempty"` // One traffic sample + Log string `json:"log,omitempty"` // One log +} + +// client represents active websocket connection with a remote browser. +type client struct { + conn *websocket.Conn // Particular live websocket connection + msg chan message // Message queue for the update messages + logger log.Logger // Logger for the particular live websocket connection +} + +// charts contains the collected data samples. +type charts struct { + Memory []*chartEntry `json:"memorySamples,omitempty"` + Traffic []*chartEntry `json:"trafficSamples,omitempty"` +} + +// chartEntry represents one data sample +type chartEntry struct { + Time time.Time `json:"time,omitempty"` + Value float64 `json:"value,omitempty"` +} + +// New creates a new dashboard instance with the given configuration. +func New(config *Config) (*Dashboard, error) { + return &Dashboard{ + conns: make(map[uint32]*client), + config: config, + quit: make(chan chan error), + }, nil +} + +// Protocols is a meaningless implementation of node.Service. +func (db *Dashboard) Protocols() []p2p.Protocol { return nil } + +// APIs is a meaningless implementation of node.Service. +func (db *Dashboard) APIs() []rpc.API { return nil } + +// Start implements node.Service, starting the data collection thread and the listening server of the dashboard. +func (db *Dashboard) Start(server *p2p.Server) error { + db.wg.Add(2) + go db.collectData() + go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add. + + http.HandleFunc("/", db.webHandler) + http.Handle("/api", websocket.Handler(db.apiHandler)) + + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", db.config.Host, db.config.Port)) + if err != nil { + return err + } + db.listener = listener + + go http.Serve(listener, nil) + + return nil +} + +// Stop implements node.Service, stopping the data collection thread and the connection listener of the dashboard. +func (db *Dashboard) Stop() error { + // Close the connection listener. + var errs []error + if err := db.listener.Close(); err != nil { + errs = append(errs, err) + } + // Close the collectors. + errc := make(chan error, 1) + for i := 0; i < 2; i++ { + db.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + // Close the connections. + db.lock.Lock() + for _, c := range db.conns { + if err := c.conn.Close(); err != nil { + c.logger.Warn("Failed to close connection", "err", err) + } + } + db.lock.Unlock() + + // Wait until every goroutine terminates. + db.wg.Wait() + log.Info("Dashboard stopped") + + var err error + if len(errs) > 0 { + err = fmt.Errorf("%v", errs) + } + + return err +} + +// webHandler handles all non-api requests, simply flattening and returning the dashboard website. +func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) { + log.Debug("Request", "URL", r.URL) + + path := r.URL.String() + if path == "/" { + path = "/dashboard.html" + } + // If the path of the assets is manually set + if db.config.Assets != "" { + blob, err := ioutil.ReadFile(filepath.Join(db.config.Assets, path)) + if err != nil { + log.Warn("Failed to read file", "path", path, "err", err) + http.Error(w, "not found", http.StatusNotFound) + return + } + w.Write(blob) + return + } + blob, err := Asset(filepath.Join("public", path)) + if err != nil { + log.Warn("Failed to load the asset", "path", path, "err", err) + http.Error(w, "not found", http.StatusNotFound) + return + } + w.Write(blob) +} + +// apiHandler handles requests for the dashboard. +func (db *Dashboard) apiHandler(conn *websocket.Conn) { + id := atomic.AddUint32(&nextId, 1) + client := &client{ + conn: conn, + msg: make(chan message, 128), + logger: log.New("id", id), + } + done := make(chan struct{}) // Buffered channel as sender may exit early + + // Start listening for messages to send. + db.wg.Add(1) + go func() { + defer db.wg.Done() + + for { + select { + case <-done: + return + case msg := <-client.msg: + if err := websocket.JSON.Send(client.conn, msg); err != nil { + client.logger.Warn("Failed to send the message", "msg", msg, "err", err) + client.conn.Close() + return + } + } + } + }() + // Send the past data. + client.msg <- message{ + History: &db.charts, + } + // Start tracking the connection and drop at connection loss. + db.lock.Lock() + db.conns[id] = client + db.lock.Unlock() + defer func() { + db.lock.Lock() + delete(db.conns, id) + db.lock.Unlock() + }() + for { + fail := []byte{} + if _, err := conn.Read(fail); err != nil { + close(done) + return + } + // Ignore all messages + } +} + +// collectData collects the required data to plot on the dashboard. +func (db *Dashboard) collectData() { + defer db.wg.Done() + + for { + select { + case errc := <-db.quit: + errc <- nil + return + case <-time.After(db.config.Refresh): + inboundTraffic := metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Rate1() + memoryInUse := metrics.DefaultRegistry.Get("system/memory/inuse").(metrics.Meter).Rate1() + now := time.Now() + memory := &chartEntry{ + Time: now, + Value: memoryInUse, + } + traffic := &chartEntry{ + Time: now, + Value: inboundTraffic, + } + // Remove the first elements in case the samples' amount exceeds the limit. + first := 0 + if len(db.charts.Memory) == memorySampleLimit { + first = 1 + } + db.charts.Memory = append(db.charts.Memory[first:], memory) + first = 0 + if len(db.charts.Traffic) == trafficSampleLimit { + first = 1 + } + db.charts.Traffic = append(db.charts.Traffic[first:], traffic) + + db.sendToAll(&message{ + Memory: memory, + Traffic: traffic, + }) + } + } +} + +// collectLogs collects and sends the logs to the active dashboards. +func (db *Dashboard) collectLogs() { + defer db.wg.Done() + + // TODO (kurkomisi): log collection comes here. + for { + select { + case errc := <-db.quit: + errc <- nil + return + case <-time.After(db.config.Refresh / 2): + db.sendToAll(&message{ + Log: "This is a fake log.", + }) + } + } +} + +// sendToAll sends the given message to the active dashboards. +func (db *Dashboard) sendToAll(msg *message) { + db.lock.Lock() + for _, c := range db.conns { + select { + case c.msg <- *msg: + default: + c.conn.Close() + } + } + db.lock.Unlock() +} |