diff options
-rw-r--r-- | containers/docker/master-alpine/Dockerfile | 2 | ||||
-rw-r--r-- | core/tx_pool.go | 53 | ||||
-rw-r--r-- | core/tx_pool_test.go | 14 | ||||
-rw-r--r-- | ethstats/ethstats.go | 73 |
4 files changed, 87 insertions, 55 deletions
diff --git a/containers/docker/master-alpine/Dockerfile b/containers/docker/master-alpine/Dockerfile index 3c72bc8c8..b2467f1a7 100644 --- a/containers/docker/master-alpine/Dockerfile +++ b/containers/docker/master-alpine/Dockerfile @@ -2,7 +2,7 @@ FROM alpine:3.5 RUN \ apk add --update go git make gcc musl-dev linux-headers ca-certificates && \ - git clone --depth 1 --branch release/1.5 https://github.com/ethereum/go-ethereum && \ + git clone --depth 1 --branch release/1.6 https://github.com/ethereum/go-ethereum && \ (cd go-ethereum && make geth) && \ cp go-ethereum/build/bin/geth /geth && \ apk del go git make gcc musl-dev linux-headers && \ diff --git a/core/tx_pool.go b/core/tx_pool.go index 1f5b46d4b..04ffa8a98 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -251,7 +251,7 @@ func (pool *TxPool) resetState() { } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.promoteExecutables(currentState) + pool.promoteExecutables(currentState, nil) } // Stop terminates the transaction pool. @@ -339,17 +339,6 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { pool.mu.Lock() defer pool.mu.Unlock() - state, err := pool.currentState() - if err != nil { - return nil, err - } - - // check queue first - pool.promoteExecutables(state) - - // invalidate any txs - pool.demoteUnexecutables(state) - pending := make(map[common.Address]types.Transactions) for addr, list := range pool.pending { pending[addr] = list.Flatten() @@ -551,13 +540,14 @@ func (pool *TxPool) Add(tx *types.Transaction) error { if err != nil { return err } - state, err := pool.currentState() - if err != nil { - return err - } // If we added a new transaction, run promotion checks and return if !replace { - pool.promoteExecutables(state) + state, err := pool.currentState() + if err != nil { + return err + } + from, _ := types.Sender(pool.signer, tx) // already validated + pool.promoteExecutables(state, []common.Address{from}) } return nil } @@ -568,24 +558,26 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error { defer pool.mu.Unlock() // Add the batch of transaction, tracking the accepted ones - replaced, added := true, 0 + dirty := make(map[common.Address]struct{}) for _, tx := range txs { if replace, err := pool.add(tx); err == nil { - added++ if !replace { - replaced = false + from, _ := types.Sender(pool.signer, tx) // already validated + dirty[from] = struct{}{} } } } // Only reprocess the internal state if something was actually added - if added > 0 { + if len(dirty) > 0 { state, err := pool.currentState() if err != nil { return err } - if !replaced { - pool.promoteExecutables(state) + addrs := make([]common.Address, 0, len(dirty)) + for addr, _ := range dirty { + addrs = append(addrs, addr) } + pool.promoteExecutables(state, addrs) } return nil } @@ -662,12 +654,23 @@ func (pool *TxPool) removeTx(hash common.Hash) { // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (pool *TxPool) promoteExecutables(state *state.StateDB) { +func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { gaslimit := pool.gasLimit() + // Gather all the accounts potentially needing updates + if accounts == nil { + accounts = make([]common.Address, 0, len(pool.queue)) + for addr, _ := range pool.queue { + accounts = append(accounts, addr) + } + } // Iterate over all accounts and promote any executable transactions queued := uint64(0) - for addr, list := range pool.queue { + for _, addr := range accounts { + list := pool.queue[addr] + if list == nil { + continue // Just in case someone calls with a non existing account + } // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(state.GetNonce(addr)) { hash := tx.Hash() diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index c12bd20a1..94b07170d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -175,7 +175,7 @@ func TestTransactionQueue(t *testing.T) { pool.resetState() pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables(currentState) + pool.promoteExecutables(currentState, []common.Address{from}) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) } @@ -184,7 +184,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx) currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables(currentState) + pool.promoteExecutables(currentState, []common.Address{from}) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -206,7 +206,7 @@ func TestTransactionQueue(t *testing.T) { pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx3.Hash(), tx3) - pool.promoteExecutables(currentState) + pool.promoteExecutables(currentState, []common.Address{from}) if len(pool.pending) != 1 { t.Error("expected tx pool to be 1, got", len(pool.pending)) @@ -304,16 +304,16 @@ func TestTransactionDoubleNonce(t *testing.T) { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } state, _ := pool.currentState() - pool.promoteExecutables(state) + pool.promoteExecutables(state, []common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } - // Add the thid transaction and ensure it's not saved (smaller price) + // Add the third transaction and ensure it's not saved (smaller price) pool.add(tx3) - pool.promoteExecutables(state) + pool.promoteExecutables(state, []common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -1087,7 +1087,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.promoteExecutables(state) + pool.promoteExecutables(state, nil) } } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index ad77cd1e8..333c975c9 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -31,6 +31,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -119,7 +120,7 @@ func (s *Service) Stop() error { // loop keeps trying to connect to the netstats server, reporting chain events // until termination. func (s *Service) loop() { - // Subscribe tso chain events to execute updates on + // Subscribe to chain events to execute updates on var emux *event.TypeMux if s.eth != nil { emux = s.eth.EventMux() @@ -132,6 +133,46 @@ func (s *Service) loop() { txSub := emux.Subscribe(core.TxPreEvent{}) defer txSub.Unsubscribe() + // Start a goroutine that exhausts the subsciptions to avoid events piling up + var ( + quitCh = make(chan struct{}) + headCh = make(chan *types.Block, 1) + txCh = make(chan struct{}, 1) + ) + go func() { + var lastTx mclock.AbsTime + + for { + select { + // Notify of chain head events, but drop if too frequent + case head, ok := <-headSub.Chan(): + if !ok { // node stopped + close(quitCh) + return + } + select { + case headCh <- head.Data.(core.ChainHeadEvent).Block: + default: + } + + // Notify of new transaction events, but drop if too frequent + case _, ok := <-txSub.Chan(): + if !ok { // node stopped + close(quitCh) + return + } + if time.Duration(mclock.Now()-lastTx) < time.Second { + continue + } + lastTx = mclock.Now() + + select { + case txCh <- struct{}{}: + default: + } + } + } + }() // Loop reporting until termination for { // Resolve the URL, defaulting to TLS, but falling back to none too @@ -151,7 +192,7 @@ func (s *Service) loop() { if conf, err = websocket.NewConfig(url, "http://localhost/"); err != nil { continue } - conf.Dialer = &net.Dialer{Timeout: 3 * time.Second} + conf.Dialer = &net.Dialer{Timeout: 5 * time.Second} if conn, err = websocket.DialConfig(conf); err == nil { break } @@ -181,6 +222,10 @@ func (s *Service) loop() { for err == nil { select { + case <-quitCh: + conn.Close() + return + case <-fullReport.C: if err = s.report(conn); err != nil { log.Warn("Full stats report failed", "err", err) @@ -189,30 +234,14 @@ func (s *Service) loop() { if err = s.reportHistory(conn, list); err != nil { log.Warn("Requested history report failed", "err", err) } - case head, ok := <-headSub.Chan(): - if !ok { // node stopped - conn.Close() - return - } - if err = s.reportBlock(conn, head.Data.(core.ChainHeadEvent).Block); err != nil { + case head := <-headCh: + if err = s.reportBlock(conn, head); err != nil { log.Warn("Block stats report failed", "err", err) } if err = s.reportPending(conn); err != nil { log.Warn("Post-block transaction stats report failed", "err", err) } - case _, ok := <-txSub.Chan(): - if !ok { // node stopped - conn.Close() - return - } - // Exhaust events to avoid reporting too frequently - for exhausted := false; !exhausted; { - select { - case <-headSub.Chan(): - default: - exhausted = true - } - } + case <-txCh: if err = s.reportPending(conn); err != nil { log.Warn("Transaction stats report failed", "err", err) } @@ -398,7 +427,7 @@ func (s *Service) reportLatency(conn *websocket.Conn) error { select { case <-s.pongCh: // Pong delivered, report the latency - case <-time.After(3 * time.Second): + case <-time.After(5 * time.Second): // Ping timeout, abort return errors.New("ping timed out") } |