aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-10-20 20:21:54 +0800
committerGitHub <noreply@github.com>2017-10-20 20:21:54 +0800
commit479aa61f11724560c63a7b56084259552892819d (patch)
treef21465975f55507e85f8a3cebc7440371b266c30
parenteaa4f8a5f925592bd777d5b0daf635bec85f6242 (diff)
parent0af1ab0c86975201349d92e0943485c3534d8c8c (diff)
downloadgo-tangerine-479aa61f11724560c63a7b56084259552892819d.tar
go-tangerine-479aa61f11724560c63a7b56084259552892819d.tar.gz
go-tangerine-479aa61f11724560c63a7b56084259552892819d.tar.bz2
go-tangerine-479aa61f11724560c63a7b56084259552892819d.tar.lz
go-tangerine-479aa61f11724560c63a7b56084259552892819d.tar.xz
go-tangerine-479aa61f11724560c63a7b56084259552892819d.tar.zst
go-tangerine-479aa61f11724560c63a7b56084259552892819d.zip
Merge pull request #15343 from karalabe/txpool-replacement-propagation
core: fire tx event on replace, expand tests
-rw-r--r--core/bloombits/matcher_test.go2
-rw-r--r--core/tx_journal.go13
-rw-r--r--core/tx_pool.go5
-rw-r--r--core/tx_pool_test.go188
-rw-r--r--event/feed.go8
5 files changed, 209 insertions, 7 deletions
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
index f95d0ea9e..2e15e7aac 100644
--- a/core/bloombits/matcher_test.go
+++ b/core/bloombits/matcher_test.go
@@ -85,7 +85,7 @@ func TestWildcardMatcher(t *testing.T) {
}
// makeRandomIndexes generates a random filter system, composed on multiple filter
-// criteria, each having one bloom list component for the address and arbitrarilly
+// criteria, each having one bloom list component for the address and arbitrarily
// many topic bloom list components.
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
res := make([][]bloomIndexes, len(lengths))
diff --git a/core/tx_journal.go b/core/tx_journal.go
index 94a9ff9b8..3fd8ece49 100644
--- a/core/tx_journal.go
+++ b/core/tx_journal.go
@@ -31,6 +31,15 @@ import (
// into the journal, but no such file is currently open.
var errNoActiveJournal = errors.New("no active journal")
+// devNull is a WriteCloser that just discards anything written into it. Its
+// goal is to allow the transaction journal to write into a fake journal when
+// loading transactions on startup without printing warnings due to no file
+// being readt for write.
+type devNull struct{}
+
+func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
+func (*devNull) Close() error { return nil }
+
// txJournal is a rotating log of transactions with the aim of storing locally
// created transactions to allow non-executed ones to survive node restarts.
type txJournal struct {
@@ -59,6 +68,10 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
}
defer input.Close()
+ // Temporarilly discard any journal additions (don't double add on load)
+ journal.writer = new(devNull)
+ defer func() { journal.writer = nil }()
+
// Inject all transactions from the journal into the pool
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 0ad765179..a705e36d6 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -640,6 +640,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+
+ // We've directly injected a replacement transaction, notify subsystems
+ go pool.txFeed.Send(TxPreEvent{tx})
+
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
@@ -729,6 +733,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
+
go pool.txFeed.Send(TxPreEvent{tx})
}
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 17d736877..eec128cba 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -117,6 +117,28 @@ func validateTxPoolInternals(pool *TxPool) error {
return nil
}
+// validateEvents checks that the correct number of transaction addition events
+// were fired on the pool's event feed.
+func validateEvents(events chan TxPreEvent, count int) error {
+ for i := 0; i < count; i++ {
+ select {
+ case <-events:
+ case <-time.After(time.Second):
+ return fmt.Errorf("event #%d not fired", i)
+ }
+ }
+ select {
+ case tx := <-events:
+ return fmt.Errorf("more than %d events fired: %v", count, tx.Tx)
+
+ case <-time.After(50 * time.Millisecond):
+ // This branch should be "default", but it's a data race between goroutines,
+ // reading the event channel and pushng into it, so better wait a bit ensuring
+ // really nothing gets injected.
+ }
+ return nil
+}
+
func deriveSender(tx *types.Transaction) (common.Address, error) {
return types.Sender(types.HomesteadSigner{}, tx)
}
@@ -149,7 +171,9 @@ func (c *testChain) State() (*state.StateDB, error) {
// This test simulates a scenario where a new block is imported during a
// state reset and tests whether the pending state is in sync with the
// block head event that initiated the resetState().
-func TestStateChangeDuringPoolReset(t *testing.T) {
+func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
+ t.Parallel()
+
var (
db, _ = ethdb.NewMemDatabase()
key, _ = crypto.GenerateKey()
@@ -201,6 +225,8 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
}
func TestInvalidTransactions(t *testing.T) {
+ t.Parallel()
+
pool, key := setupTxPool()
defer pool.Stop()
@@ -236,6 +262,8 @@ func TestInvalidTransactions(t *testing.T) {
}
func TestTransactionQueue(t *testing.T) {
+ t.Parallel()
+
pool, key := setupTxPool()
defer pool.Stop()
@@ -287,7 +315,9 @@ func TestTransactionQueue(t *testing.T) {
}
}
-func TestNegativeValue(t *testing.T) {
+func TestTransactionNegativeValue(t *testing.T) {
+ t.Parallel()
+
pool, key := setupTxPool()
defer pool.Stop()
@@ -300,6 +330,8 @@ func TestNegativeValue(t *testing.T) {
}
func TestTransactionChainFork(t *testing.T) {
+ t.Parallel()
+
pool, key := setupTxPool()
defer pool.Stop()
@@ -328,6 +360,8 @@ func TestTransactionChainFork(t *testing.T) {
}
func TestTransactionDoubleNonce(t *testing.T) {
+ t.Parallel()
+
pool, key := setupTxPool()
defer pool.Stop()
@@ -376,7 +410,9 @@ func TestTransactionDoubleNonce(t *testing.T) {
}
}
-func TestMissingNonce(t *testing.T) {
+func TestTransactionMissingNonce(t *testing.T) {
+ t.Parallel()
+
pool, key := setupTxPool()
defer pool.Stop()
@@ -398,6 +434,8 @@ func TestMissingNonce(t *testing.T) {
}
func TestTransactionNonceRecovery(t *testing.T) {
+ t.Parallel()
+
const n = 10
pool, key := setupTxPool()
defer pool.Stop()
@@ -422,6 +460,8 @@ func TestTransactionNonceRecovery(t *testing.T) {
// Tests that if an account runs out of funds, any pending and queued transactions
// are dropped.
func TestTransactionDropping(t *testing.T) {
+ t.Parallel()
+
// Create a test account and fund it
pool, key := setupTxPool()
defer pool.Stop()
@@ -515,6 +555,8 @@ func TestTransactionDropping(t *testing.T) {
// of fund), all consecutive (still valid, but not executable) transactions are
// postponed back into the future queue to prevent broadcasting them.
func TestTransactionPostponing(t *testing.T) {
+ t.Parallel()
+
// Create a test account and fund it
pool, key := setupTxPool()
defer pool.Stop()
@@ -586,9 +628,68 @@ func TestTransactionPostponing(t *testing.T) {
}
}
+// Tests that if the transaction pool has both executable and non-executable
+// transactions from an origin account, filling the nonce gap moves all queued
+// ones into the pending pool.
+func TestTransactionGapFilling(t *testing.T) {
+ t.Parallel()
+
+ // Create a test account and fund it
+ pool, key := setupTxPool()
+ defer pool.Stop()
+
+ account, _ := deriveSender(transaction(0, big.NewInt(0), key))
+ pool.currentState.AddBalance(account, big.NewInt(1000000))
+
+ // Keep track of transaction events to ensure all executables get announced
+ events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
+ sub := pool.txFeed.Subscribe(events)
+ defer sub.Unsubscribe()
+
+ // Create a pending and a queued transaction with a nonce-gap in between
+ if err := pool.AddRemote(transaction(0, big.NewInt(100000), key)); err != nil {
+ t.Fatalf("failed to add pending transaction: %v", err)
+ }
+ if err := pool.AddRemote(transaction(2, big.NewInt(100000), key)); err != nil {
+ t.Fatalf("failed to add queued transaction: %v", err)
+ }
+ pending, queued := pool.Stats()
+ if pending != 1 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
+ }
+ if queued != 1 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
+ }
+ if err := validateEvents(events, 1); err != nil {
+ t.Fatalf("original event firing failed: %v", err)
+ }
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
+ // Fill the nonce gap and ensure all transactions become pending
+ if err := pool.AddRemote(transaction(1, big.NewInt(100000), key)); err != nil {
+ t.Fatalf("failed to add gapped transaction: %v", err)
+ }
+ pending, queued = pool.Stats()
+ if pending != 3 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
+ }
+ if queued != 0 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
+ }
+ if err := validateEvents(events, 2); err != nil {
+ t.Fatalf("gap-filling event firing failed: %v", err)
+ }
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
+}
+
// Tests that if the transaction count belonging to a single account goes above
// some threshold, the higher transactions are dropped to prevent DOS attacks.
func TestTransactionQueueAccountLimiting(t *testing.T) {
+ t.Parallel()
+
// Create a test account and fund it
pool, key := setupTxPool()
defer pool.Stop()
@@ -632,6 +733,8 @@ func TestTransactionQueueGlobalLimitingNoLocals(t *testing.T) {
}
func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
+ t.Parallel()
+
// Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -782,6 +885,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
// above some threshold, as long as the transactions are executable, they are
// accepted.
func TestTransactionPendingLimiting(t *testing.T) {
+ t.Parallel()
+
// Create a test account and fund it
pool, key := setupTxPool()
defer pool.Stop()
@@ -789,6 +894,11 @@ func TestTransactionPendingLimiting(t *testing.T) {
account, _ := deriveSender(transaction(0, big.NewInt(0), key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
+ // Keep track of transaction events to ensure all executables get announced
+ events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
+ sub := pool.txFeed.Subscribe(events)
+ defer sub.Unsubscribe()
+
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil {
@@ -804,6 +914,12 @@ func TestTransactionPendingLimiting(t *testing.T) {
if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5)
}
+ if err := validateEvents(events, int(testTxPoolConfig.AccountQueue+5)); err != nil {
+ t.Fatalf("event firing failed: %v", err)
+ }
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
}
// Tests that the transaction limits are enforced the same way irrelevant whether
@@ -812,6 +928,8 @@ func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLi
func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) }
func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
+ t.Parallel()
+
// Add a batch of transactions to a pool one by one
pool1, key1 := setupTxPool()
defer pool1.Stop()
@@ -859,6 +977,8 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
// some hard threshold, the higher transactions are dropped to prevent DOS
// attacks.
func TestTransactionPendingGlobalLimiting(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -904,6 +1024,8 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
// Tests that if transactions start being capped, transactions are also removed from 'all'
func TestTransactionCapClearsFromAll(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -937,6 +1059,8 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
// some hard threshold, if they are under the minimum guaranteed slot count then
// the transactions are still kept.
func TestTransactionPendingMinimumAllowance(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -984,6 +1108,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
//
// Note, local transactions are never allowed to be dropped.
func TestTransactionPoolRepricing(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -992,6 +1118,11 @@ func TestTransactionPoolRepricing(t *testing.T) {
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
+ // Keep track of transaction events to ensure all executables get announced
+ events := make(chan TxPreEvent, 32)
+ sub := pool.txFeed.Subscribe(events)
+ defer sub.Unsubscribe()
+
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ {
@@ -1022,6 +1153,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
+ if err := validateEvents(events, 4); err != nil {
+ t.Fatalf("original event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1035,6 +1169,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
+ if err := validateEvents(events, 0); err != nil {
+ t.Fatalf("reprice event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1045,6 +1182,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced {
t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
}
+ if err := validateEvents(events, 0); err != nil {
+ t.Fatalf("post-reprice event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1056,6 +1196,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if pending, _ = pool.Stats(); pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
+ if err := validateEvents(events, 1); err != nil {
+ t.Fatalf("post-reprice local event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1064,6 +1207,8 @@ func TestTransactionPoolRepricing(t *testing.T) {
// Tests that setting the transaction pool gas price to a higher value does not
// remove local transactions.
func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -1125,6 +1270,8 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
//
// Note, local transactions are never allowed to be dropped.
func TestTransactionPoolUnderpricing(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -1137,6 +1284,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
+ // Keep track of transaction events to ensure all executables get announced
+ events := make(chan TxPreEvent, 32)
+ sub := pool.txFeed.Subscribe(events)
+ defer sub.Unsubscribe()
+
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ {
@@ -1164,6 +1316,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
+ if err := validateEvents(events, 3); err != nil {
+ t.Fatalf("original event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1188,6 +1343,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
+ if err := validateEvents(events, 2); err != nil {
+ t.Fatalf("additional event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1203,6 +1361,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
+ if err := validateEvents(events, 1); err != nil {
+ t.Fatalf("local event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1211,6 +1372,8 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
// Tests that the pool rejects replacement transactions that don't meet the minimum
// price bump required.
func TestTransactionReplacement(t *testing.T) {
+ t.Parallel()
+
// Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -1219,6 +1382,11 @@ func TestTransactionReplacement(t *testing.T) {
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
+ // Keep track of transaction events to ensure all executables get announced
+ events := make(chan TxPreEvent, 32)
+ sub := pool.txFeed.Subscribe(events)
+ defer sub.Unsubscribe()
+
// Create a test account to add transactions with
key, _ := crypto.GenerateKey()
pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
@@ -1236,6 +1404,9 @@ func TestTransactionReplacement(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(2), key)); err != nil {
t.Fatalf("failed to replace original cheap pending transaction: %v", err)
}
+ if err := validateEvents(events, 2); err != nil {
+ t.Fatalf("cheap replacement event firing failed: %v", err)
+ }
if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(price), key)); err != nil {
t.Fatalf("failed to add original proper pending transaction: %v", err)
@@ -1246,6 +1417,9 @@ func TestTransactionReplacement(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
t.Fatalf("failed to replace original proper pending transaction: %v", err)
}
+ if err := validateEvents(events, 2); err != nil {
+ t.Fatalf("proper replacement event firing failed: %v", err)
+ }
// Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original queued transaction: %v", err)
@@ -1266,6 +1440,10 @@ func TestTransactionReplacement(t *testing.T) {
if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
t.Fatalf("failed to replace original queued transaction: %v", err)
}
+
+ if err := validateEvents(events, 0); err != nil {
+ t.Fatalf("queued replacement event firing failed: %v", err)
+ }
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
@@ -1277,6 +1455,8 @@ func TestTransactionJournaling(t *testing.T) { testTransactionJournaling
func TestTransactionJournalingNoLocals(t *testing.T) { testTransactionJournaling(t, true) }
func testTransactionJournaling(t *testing.T, nolocals bool) {
+ t.Parallel()
+
// Create a temporary file for the journal
file, err := ioutil.TempFile("", "")
if err != nil {
@@ -1335,6 +1515,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
+
pool = NewTxPool(config, params.TestChainConfig, blockchain)
pending, queued = pool.Stats()
@@ -1358,6 +1539,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
pool.lockedReset(nil, nil)
time.Sleep(2 * config.Rejournal)
pool.Stop()
+
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain)
diff --git a/event/feed.go b/event/feed.go
index b1b597f17..78fa3d98d 100644
--- a/event/feed.go
+++ b/event/feed.go
@@ -127,6 +127,8 @@ func (f *Feed) remove(sub *feedSub) {
// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
+ rvalue := reflect.ValueOf(value)
+
f.once.Do(f.init)
<-f.sendLock
@@ -134,14 +136,14 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
- f.mu.Unlock()
- // Set the sent value on all channels.
- rvalue := reflect.ValueOf(value)
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
+ f.mu.Unlock()
+
+ // Set the sent value on all channels.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}