aboutsummaryrefslogtreecommitdiffstats
path: root/ethstats/ethstats.go
diff options
context:
space:
mode:
Diffstat (limited to 'ethstats/ethstats.go')
-rw-r--r--ethstats/ethstats.go63
1 files changed, 44 insertions, 19 deletions
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index b75c5e6da..bb03dc72b 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -44,9 +44,27 @@ import (
"golang.org/x/net/websocket"
)
-// historyUpdateRange is the number of blocks a node should report upon login or
-// history request.
-const historyUpdateRange = 50
+const (
+ // historyUpdateRange is the number of blocks a node should report upon login or
+ // history request.
+ historyUpdateRange = 50
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+)
+
+type txPool interface {
+ // SubscribeTxPreEvent should return an event subscription of
+ // TxPreEvent and send events to the given channel.
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+}
+
+type blockChain interface {
+ SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
+}
// Service implements an Ethereum netstats reporting daemon that pushes local
// chain statistics up to a monitoring server.
@@ -118,16 +136,22 @@ func (s *Service) Stop() error {
// until termination.
func (s *Service) loop() {
// Subscribe to chain events to execute updates on
- var emux *event.TypeMux
+ var blockchain blockChain
+ var txpool txPool
if s.eth != nil {
- emux = s.eth.EventMux()
+ blockchain = s.eth.BlockChain()
+ txpool = s.eth.TxPool()
} else {
- emux = s.les.EventMux()
+ blockchain = s.les.BlockChain()
+ txpool = s.les.TxPool()
}
- headSub := emux.Subscribe(core.ChainHeadEvent{})
+
+ chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
+ headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh)
defer headSub.Unsubscribe()
- txSub := emux.Subscribe(core.TxPreEvent{})
+ txEventCh := make(chan core.TxPreEvent, txChanSize)
+ txSub := txpool.SubscribeTxPreEvent(txEventCh)
defer txSub.Unsubscribe()
// Start a goroutine that exhausts the subsciptions to avoid events piling up
@@ -139,25 +163,18 @@ func (s *Service) loop() {
go func() {
var lastTx mclock.AbsTime
+ HandleLoop:
for {
select {
// Notify of chain head events, but drop if too frequent
- case head, ok := <-headSub.Chan():
- if !ok { // node stopped
- close(quitCh)
- return
- }
+ case head := <-chainHeadCh:
select {
- case headCh <- head.Data.(core.ChainHeadEvent).Block:
+ case headCh <- head.Block:
default:
}
// Notify of new transaction events, but drop if too frequent
- case _, ok := <-txSub.Chan():
- if !ok { // node stopped
- close(quitCh)
- return
- }
+ case <-txEventCh:
if time.Duration(mclock.Now()-lastTx) < time.Second {
continue
}
@@ -167,8 +184,16 @@ func (s *Service) loop() {
case txCh <- struct{}{}:
default:
}
+
+ // node stopped
+ case <-txSub.Err():
+ break HandleLoop
+ case <-headSub.Err():
+ break HandleLoop
}
}
+ close(quitCh)
+ return
}()
// Loop reporting until termination
for {