aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-06-11 03:10:18 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-06-11 03:10:18 +0800
commitacb59f3243bf1e12185912ba0684007247ade7ca (patch)
treec746236deea60c34d40c0238a4551a2eafd9b8ae
parent858a6f0be9da459a87004755dffae2c3fc5544d2 (diff)
parent4407524d13994759230ce6d31d828914597e8f6c (diff)
downloaddexon-acb59f3243bf1e12185912ba0684007247ade7ca.tar
dexon-acb59f3243bf1e12185912ba0684007247ade7ca.tar.gz
dexon-acb59f3243bf1e12185912ba0684007247ade7ca.tar.bz2
dexon-acb59f3243bf1e12185912ba0684007247ade7ca.tar.lz
dexon-acb59f3243bf1e12185912ba0684007247ade7ca.tar.xz
dexon-acb59f3243bf1e12185912ba0684007247ade7ca.tar.zst
dexon-acb59f3243bf1e12185912ba0684007247ade7ca.zip
Merge pull request #1223 from obscuren/tx-pool-vroooooom
core: fixed race condition in the transaction pool
-rw-r--r--core/helper_test.go5
-rw-r--r--core/transaction_pool.go63
-rw-r--r--eth/backend.go5
3 files changed, 32 insertions, 41 deletions
diff --git a/core/helper_test.go b/core/helper_test.go
index 1e0ed178b..a308153aa 100644
--- a/core/helper_test.go
+++ b/core/helper_test.go
@@ -6,8 +6,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
// "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
)
@@ -76,8 +76,5 @@ func NewTestManager() *TestManager {
// testManager.blockChain = NewChainManager(testManager)
// testManager.stateManager = NewStateManager(testManager)
- // Start the tx pool
- testManager.txPool.Start()
-
return testManager
}
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index a2f970195..4a0594228 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -50,7 +50,7 @@ type TxPool struct {
}
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
- return &TxPool{
+ pool := &TxPool{
pending: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
quit: make(chan bool),
@@ -58,14 +58,17 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
currentState: currentStateFn,
gasLimit: gasLimitFn,
pendingState: state.ManageState(currentStateFn()),
+ events: eventMux.Subscribe(ChainEvent{}),
}
+ go pool.eventLoop()
+
+ return pool
}
-func (pool *TxPool) Start() {
+func (pool *TxPool) eventLoop() {
// Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
- pool.events = pool.eventMux.Subscribe(ChainEvent{})
for _ = range pool.events.Chan() {
pool.mu.Lock()
@@ -100,7 +103,6 @@ func (pool *TxPool) resetState() {
}
func (pool *TxPool) Stop() {
- pool.pending = make(map[common.Hash]*types.Transaction)
close(pool.quit)
pool.events.Unsubscribe()
glog.V(logger.Info).Infoln("TX Pool stopped")
@@ -169,15 +171,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
return nil
}
+// validate and queue transactions.
func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash()
- /* XXX I'm unsure about this. This is extremely dangerous and may result
- in total black listing of certain transactions
- if self.invalidHashes.Has(hash) {
- return fmt.Errorf("Invalid transaction (%x)", hash[:4])
- }
- */
if self.pending[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4])
}
@@ -207,6 +204,30 @@ func (self *TxPool) add(tx *types.Transaction) error {
return nil
}
+// queueTx will queue an unknown transaction
+func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
+ from, _ := tx.From() // already validated
+ if self.queue[from] == nil {
+ self.queue[from] = make(map[common.Hash]*types.Transaction)
+ }
+ self.queue[from][hash] = tx
+}
+
+// addTx will add a transaction to the pending (processable queue) list of transactions
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
+ if _, ok := pool.pending[hash]; !ok {
+ pool.pending[hash] = tx
+
+ // Increment the nonce on the pending state. This can only happen if
+ // the nonce is +1 to the previous one.
+ pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
+ // Notify the subscribers. This event is posted in a goroutine
+ // because it's possible that somewhere during the post "Remove transaction"
+ // gets called which will then wait for the global tx pool lock and deadlock.
+ go pool.eventMux.Post(TxPreEvent{tx})
+ }
+}
+
// Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock()
@@ -290,28 +311,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
- from, _ := tx.From() // already validated
- if self.queue[from] == nil {
- self.queue[from] = make(map[common.Hash]*types.Transaction)
- }
- self.queue[from][hash] = tx
-}
-
-func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
- if _, ok := pool.pending[hash]; !ok {
- pool.pending[hash] = tx
-
- // Increment the nonce on the pending state. This can only happen if
- // the nonce is +1 to the previous one.
- pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
- // Notify the subscribers. This event is posted in a goroutine
- // because it's possible that somewhere during the post "Remove transaction"
- // gets called which will then wait for the global tx pool lock and deadlock.
- go pool.eventMux.Post(TxPreEvent{tx})
- }
-}
-
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
state := pool.pendingState
diff --git a/eth/backend.go b/eth/backend.go
index fcbea04a2..60e9359dc 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -466,8 +466,6 @@ func (s *Ethereum) Start() error {
s.StartAutoDAG()
}
- // Start services
- go s.txPool.Start()
s.protocolManager.Start()
if s.whisper != nil {
@@ -513,9 +511,6 @@ func (s *Ethereum) StartForTest() {
ClientString: s.net.Name,
ProtocolVersion: ProtocolVersion,
})
-
- // Start services
- s.txPool.Start()
}
// AddPeer connects to the given node and maintains the connection until the