aboutsummaryrefslogtreecommitdiffstats
path: root/ethstats/ethstats.go
diff options
context:
space:
mode:
Diffstat (limited to 'ethstats/ethstats.go')
-rw-r--r--ethstats/ethstats.go73
1 files changed, 51 insertions, 22 deletions
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index ad77cd1e8..333c975c9 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -31,6 +31,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -119,7 +120,7 @@ func (s *Service) Stop() error {
// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
func (s *Service) loop() {
- // Subscribe tso chain events to execute updates on
+ // Subscribe to chain events to execute updates on
var emux *event.TypeMux
if s.eth != nil {
emux = s.eth.EventMux()
@@ -132,6 +133,46 @@ func (s *Service) loop() {
txSub := emux.Subscribe(core.TxPreEvent{})
defer txSub.Unsubscribe()
+ // Start a goroutine that exhausts the subsciptions to avoid events piling up
+ var (
+ quitCh = make(chan struct{})
+ headCh = make(chan *types.Block, 1)
+ txCh = make(chan struct{}, 1)
+ )
+ go func() {
+ var lastTx mclock.AbsTime
+
+ for {
+ select {
+ // Notify of chain head events, but drop if too frequent
+ case head, ok := <-headSub.Chan():
+ if !ok { // node stopped
+ close(quitCh)
+ return
+ }
+ select {
+ case headCh <- head.Data.(core.ChainHeadEvent).Block:
+ default:
+ }
+
+ // Notify of new transaction events, but drop if too frequent
+ case _, ok := <-txSub.Chan():
+ if !ok { // node stopped
+ close(quitCh)
+ return
+ }
+ if time.Duration(mclock.Now()-lastTx) < time.Second {
+ continue
+ }
+ lastTx = mclock.Now()
+
+ select {
+ case txCh <- struct{}{}:
+ default:
+ }
+ }
+ }
+ }()
// Loop reporting until termination
for {
// Resolve the URL, defaulting to TLS, but falling back to none too
@@ -151,7 +192,7 @@ func (s *Service) loop() {
if conf, err = websocket.NewConfig(url, "http://localhost/"); err != nil {
continue
}
- conf.Dialer = &net.Dialer{Timeout: 3 * time.Second}
+ conf.Dialer = &net.Dialer{Timeout: 5 * time.Second}
if conn, err = websocket.DialConfig(conf); err == nil {
break
}
@@ -181,6 +222,10 @@ func (s *Service) loop() {
for err == nil {
select {
+ case <-quitCh:
+ conn.Close()
+ return
+
case <-fullReport.C:
if err = s.report(conn); err != nil {
log.Warn("Full stats report failed", "err", err)
@@ -189,30 +234,14 @@ func (s *Service) loop() {
if err = s.reportHistory(conn, list); err != nil {
log.Warn("Requested history report failed", "err", err)
}
- case head, ok := <-headSub.Chan():
- if !ok { // node stopped
- conn.Close()
- return
- }
- if err = s.reportBlock(conn, head.Data.(core.ChainHeadEvent).Block); err != nil {
+ case head := <-headCh:
+ if err = s.reportBlock(conn, head); err != nil {
log.Warn("Block stats report failed", "err", err)
}
if err = s.reportPending(conn); err != nil {
log.Warn("Post-block transaction stats report failed", "err", err)
}
- case _, ok := <-txSub.Chan():
- if !ok { // node stopped
- conn.Close()
- return
- }
- // Exhaust events to avoid reporting too frequently
- for exhausted := false; !exhausted; {
- select {
- case <-headSub.Chan():
- default:
- exhausted = true
- }
- }
+ case <-txCh:
if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err)
}
@@ -398,7 +427,7 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
select {
case <-s.pongCh:
// Pong delivered, report the latency
- case <-time.After(3 * time.Second):
+ case <-time.After(5 * time.Second):
// Ping timeout, abort
return errors.New("ping timed out")
}