aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--containers/docker/master-alpine/Dockerfile2
-rw-r--r--core/tx_pool.go53
-rw-r--r--core/tx_pool_test.go14
-rw-r--r--ethstats/ethstats.go73
4 files changed, 87 insertions, 55 deletions
diff --git a/containers/docker/master-alpine/Dockerfile b/containers/docker/master-alpine/Dockerfile
index 3c72bc8c8..b2467f1a7 100644
--- a/containers/docker/master-alpine/Dockerfile
+++ b/containers/docker/master-alpine/Dockerfile
@@ -2,7 +2,7 @@ FROM alpine:3.5
RUN \
apk add --update go git make gcc musl-dev linux-headers ca-certificates && \
- git clone --depth 1 --branch release/1.5 https://github.com/ethereum/go-ethereum && \
+ git clone --depth 1 --branch release/1.6 https://github.com/ethereum/go-ethereum && \
(cd go-ethereum && make geth) && \
cp go-ethereum/build/bin/geth /geth && \
apk del go git make gcc musl-dev linux-headers && \
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 1f5b46d4b..04ffa8a98 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -251,7 +251,7 @@ func (pool *TxPool) resetState() {
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
- pool.promoteExecutables(currentState)
+ pool.promoteExecutables(currentState, nil)
}
// Stop terminates the transaction pool.
@@ -339,17 +339,6 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
pool.mu.Lock()
defer pool.mu.Unlock()
- state, err := pool.currentState()
- if err != nil {
- return nil, err
- }
-
- // check queue first
- pool.promoteExecutables(state)
-
- // invalidate any txs
- pool.demoteUnexecutables(state)
-
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
@@ -551,13 +540,14 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
if err != nil {
return err
}
- state, err := pool.currentState()
- if err != nil {
- return err
- }
// If we added a new transaction, run promotion checks and return
if !replace {
- pool.promoteExecutables(state)
+ state, err := pool.currentState()
+ if err != nil {
+ return err
+ }
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ pool.promoteExecutables(state, []common.Address{from})
}
return nil
}
@@ -568,24 +558,26 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
defer pool.mu.Unlock()
// Add the batch of transaction, tracking the accepted ones
- replaced, added := true, 0
+ dirty := make(map[common.Address]struct{})
for _, tx := range txs {
if replace, err := pool.add(tx); err == nil {
- added++
if !replace {
- replaced = false
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ dirty[from] = struct{}{}
}
}
}
// Only reprocess the internal state if something was actually added
- if added > 0 {
+ if len(dirty) > 0 {
state, err := pool.currentState()
if err != nil {
return err
}
- if !replaced {
- pool.promoteExecutables(state)
+ addrs := make([]common.Address, 0, len(dirty))
+ for addr, _ := range dirty {
+ addrs = append(addrs, addr)
}
+ pool.promoteExecutables(state, addrs)
}
return nil
}
@@ -662,12 +654,23 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
-func (pool *TxPool) promoteExecutables(state *state.StateDB) {
+func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
gaslimit := pool.gasLimit()
+ // Gather all the accounts potentially needing updates
+ if accounts == nil {
+ accounts = make([]common.Address, 0, len(pool.queue))
+ for addr, _ := range pool.queue {
+ accounts = append(accounts, addr)
+ }
+ }
// Iterate over all accounts and promote any executable transactions
queued := uint64(0)
- for addr, list := range pool.queue {
+ for _, addr := range accounts {
+ list := pool.queue[addr]
+ if list == nil {
+ continue // Just in case someone calls with a non existing account
+ }
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
hash := tx.Hash()
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index c12bd20a1..94b07170d 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -175,7 +175,7 @@ func TestTransactionQueue(t *testing.T) {
pool.resetState()
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables(currentState)
+ pool.promoteExecutables(currentState, []common.Address{from})
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
}
@@ -184,7 +184,7 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx)
currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables(currentState)
+ pool.promoteExecutables(currentState, []common.Address{from})
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
@@ -206,7 +206,7 @@ func TestTransactionQueue(t *testing.T) {
pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3)
- pool.promoteExecutables(currentState)
+ pool.promoteExecutables(currentState, []common.Address{from})
if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending))
@@ -304,16 +304,16 @@ func TestTransactionDoubleNonce(t *testing.T) {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
}
state, _ := pool.currentState()
- pool.promoteExecutables(state)
+ pool.promoteExecutables(state, []common.Address{addr})
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
}
- // Add the thid transaction and ensure it's not saved (smaller price)
+ // Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3)
- pool.promoteExecutables(state)
+ pool.promoteExecutables(state, []common.Address{addr})
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@@ -1087,7 +1087,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
- pool.promoteExecutables(state)
+ pool.promoteExecutables(state, nil)
}
}
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index ad77cd1e8..333c975c9 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -31,6 +31,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -119,7 +120,7 @@ func (s *Service) Stop() error {
// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
func (s *Service) loop() {
- // Subscribe tso chain events to execute updates on
+ // Subscribe to chain events to execute updates on
var emux *event.TypeMux
if s.eth != nil {
emux = s.eth.EventMux()
@@ -132,6 +133,46 @@ func (s *Service) loop() {
txSub := emux.Subscribe(core.TxPreEvent{})
defer txSub.Unsubscribe()
+ // Start a goroutine that exhausts the subsciptions to avoid events piling up
+ var (
+ quitCh = make(chan struct{})
+ headCh = make(chan *types.Block, 1)
+ txCh = make(chan struct{}, 1)
+ )
+ go func() {
+ var lastTx mclock.AbsTime
+
+ for {
+ select {
+ // Notify of chain head events, but drop if too frequent
+ case head, ok := <-headSub.Chan():
+ if !ok { // node stopped
+ close(quitCh)
+ return
+ }
+ select {
+ case headCh <- head.Data.(core.ChainHeadEvent).Block:
+ default:
+ }
+
+ // Notify of new transaction events, but drop if too frequent
+ case _, ok := <-txSub.Chan():
+ if !ok { // node stopped
+ close(quitCh)
+ return
+ }
+ if time.Duration(mclock.Now()-lastTx) < time.Second {
+ continue
+ }
+ lastTx = mclock.Now()
+
+ select {
+ case txCh <- struct{}{}:
+ default:
+ }
+ }
+ }
+ }()
// Loop reporting until termination
for {
// Resolve the URL, defaulting to TLS, but falling back to none too
@@ -151,7 +192,7 @@ func (s *Service) loop() {
if conf, err = websocket.NewConfig(url, "http://localhost/"); err != nil {
continue
}
- conf.Dialer = &net.Dialer{Timeout: 3 * time.Second}
+ conf.Dialer = &net.Dialer{Timeout: 5 * time.Second}
if conn, err = websocket.DialConfig(conf); err == nil {
break
}
@@ -181,6 +222,10 @@ func (s *Service) loop() {
for err == nil {
select {
+ case <-quitCh:
+ conn.Close()
+ return
+
case <-fullReport.C:
if err = s.report(conn); err != nil {
log.Warn("Full stats report failed", "err", err)
@@ -189,30 +234,14 @@ func (s *Service) loop() {
if err = s.reportHistory(conn, list); err != nil {
log.Warn("Requested history report failed", "err", err)
}
- case head, ok := <-headSub.Chan():
- if !ok { // node stopped
- conn.Close()
- return
- }
- if err = s.reportBlock(conn, head.Data.(core.ChainHeadEvent).Block); err != nil {
+ case head := <-headCh:
+ if err = s.reportBlock(conn, head); err != nil {
log.Warn("Block stats report failed", "err", err)
}
if err = s.reportPending(conn); err != nil {
log.Warn("Post-block transaction stats report failed", "err", err)
}
- case _, ok := <-txSub.Chan():
- if !ok { // node stopped
- conn.Close()
- return
- }
- // Exhaust events to avoid reporting too frequently
- for exhausted := false; !exhausted; {
- select {
- case <-headSub.Chan():
- default:
- exhausted = true
- }
- }
+ case <-txCh:
if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err)
}
@@ -398,7 +427,7 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
select {
case <-s.pongCh:
// Pong delivered, report the latency
- case <-time.After(3 * time.Second):
+ case <-time.After(5 * time.Second):
// Ping timeout, abort
return errors.New("ping timed out")
}