From 498b24270a9c301a9251150afb7f3889c929765c Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Apr 2015 22:01:04 +0200 Subject: core: implemented a queued approach processing transactions Implemented a new transaction queue. Transactions with a holes in their nonce sequence are also not propagated over the network. N: 0,1,2,5,6,7 = propagate 0..2 -- 5..N is kept in the tx pool --- core/block_processor.go | 2 +- core/transaction_pool.go | 92 ++++++++++++++++++++++++++++++++----------- core/transaction_pool_test.go | 54 ++++++++++++++++++++++++- 3 files changed, 123 insertions(+), 25 deletions(-) (limited to 'core') diff --git a/core/block_processor.go b/core/block_processor.go index f33f0d433..af47069ad 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st state.Sync() // Remove transactions from the pool - sm.txpool.RemoveSet(block.Transactions()) + sm.txpool.RemoveTransactions(block.Transactions()) // This puts transactions in a extra db for rpc for i, tx := range block.Transactions() { diff --git a/core/transaction_pool.go b/core/transaction_pool.go index eaddcfa09..92a2462c6 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" "math/big" + "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -17,7 +19,7 @@ import ( var ( ErrInvalidSender = errors.New("Invalid sender") - ErrImpossibleNonce = errors.New("Impossible nonce") + ErrNonce = errors.New("Nonce too low") ErrNonExistentAccount = errors.New("Account does not exist") ErrInsufficientFunds = errors.New("Insufficient funds") ErrIntrinsicGas = errors.New("Intrinsic gas too low") @@ -54,20 +56,37 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set + queue map[common.Address]types.Transactions + subscribers []chan TxMsg eventMux *event.TypeMux } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { - return &TxPool{ + txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]types.Transactions), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, invalidHashes: set.New(), currentState: currentStateFn, } + return txPool +} + +func (pool *TxPool) Start() { + ticker := time.NewTicker(300 * time.Millisecond) +done: + for { + select { + case <-ticker.C: + pool.checkQueue() + case <-pool.quit: + break done + } + } } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { @@ -100,14 +119,15 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { } if pool.currentState().GetNonce(from) > tx.Nonce() { - return ErrImpossibleNonce + return ErrNonce } return nil } func (self *TxPool) addTx(tx *types.Transaction) { - self.txs[tx.Hash()] = tx + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) } func (self *TxPool) add(tx *types.Transaction) error { @@ -144,9 +164,6 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) } - // Notify the subscribers - go self.eventMux.Post(TxPreEvent{tx}) - return nil } @@ -189,34 +206,65 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } -func (self *TxPool) RemoveSet(txs types.Transactions) { +func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() + for _, tx := range txs { delete(self.txs, tx.Hash()) } } -func (self *TxPool) InvalidateSet(hashes *set.Set) { - self.mu.Lock() - defer self.mu.Unlock() - - hashes.Each(func(v interface{}) bool { - delete(self.txs, v.(common.Hash)) - return true - }) - self.invalidHashes.Merge(hashes) -} - func (pool *TxPool) Flush() { pool.txs = make(map[common.Hash]*types.Transaction) } -func (pool *TxPool) Start() { -} - func (pool *TxPool) Stop() { pool.Flush() + close(pool.quit) glog.V(logger.Info).Infoln("TX Pool stopped") } + +// check queue will attempt to insert +func (pool *TxPool) checkQueue() { + pool.mu.Lock() + defer pool.mu.Unlock() + + for address, txs := range pool.queue { + sort.Sort(types.TxByNonce{txs}) + + var ( + nonce = pool.currentState().GetNonce(address) + start int + ) + // Clean up the transactions first and determine the start of the nonces + for _, tx := range txs { + if tx.Nonce() >= nonce { + break + } + start++ + } + pool.queue[address] = txs[start:] + + // expected nonce + enonce := nonce + for _, tx := range pool.queue[address] { + // If the expected nonce does not match up with the next one + // (i.e. a nonce gap), we stop the loop + if enonce != tx.Nonce() { + break + } + enonce++ + + pool.txs[tx.Hash()] = tx + // Notify the subscribers + go pool.eventMux.Post(TxPreEvent{tx}) + } + //pool.queue[address] = txs[i:] + // delete the entire queue entry if it's empty. There's no need to keep it + if len(pool.queue[address]) == 0 { + delete(pool.queue, address) + } + } +} diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index b7486adb3..5a5cd866f 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) { tx.SignECDSA(key) err = pool.Add(tx) - if err != ErrImpossibleNonce { - t.Error("expected", ErrImpossibleNonce) + if err != ErrNonce { + t.Error("expected", ErrNonce) + } +} + +func TestTransactionQueue(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.addTx(tx) + + pool.checkQueue() + if len(pool.txs) != 1 { + t.Error("expected valid txs to be 1 is", len(pool.txs)) + } + + tx = transaction() + tx.SignECDSA(key) + from, _ = tx.From() + pool.currentState().SetNonce(from, 10) + tx.SetNonce(1) + pool.addTx(tx) + pool.checkQueue() + if _, ok := pool.txs[tx.Hash()]; ok { + t.Error("expected transaction to be in tx pool") + } + + if len(pool.queue[from]) != 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + } + + pool, key = setupTxPool() + tx1, tx2, tx3 := transaction(), transaction(), transaction() + tx2.SetNonce(10) + tx3.SetNonce(11) + tx1.SignECDSA(key) + tx2.SignECDSA(key) + tx3.SignECDSA(key) + pool.addTx(tx1) + pool.addTx(tx2) + pool.addTx(tx3) + from, _ = tx1.From() + pool.checkQueue() + + if len(pool.txs) != 1 { + t.Error("expected tx pool to be 1 =") + } + + if len(pool.queue[from]) != 2 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } -- cgit v1.2.3 From 7138404cb09aebb990fcce589b87173e66355987 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Apr 2015 23:20:27 +0200 Subject: core: only post event once per tx & fixed test --- core/transaction_pool.go | 24 +++++++++++++++--------- core/transaction_pool_test.go | 12 ++++++------ 2 files changed, 21 insertions(+), 15 deletions(-) (limited to 'core') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 92a2462c6..bc6e70c7a 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -125,11 +125,6 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { return nil } -func (self *TxPool) addTx(tx *types.Transaction) { - from, _ := tx.From() - self.queue[from] = append(self.queue[from], tx) -} - func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() @@ -147,7 +142,7 @@ func (self *TxPool) add(tx *types.Transaction) error { return err } - self.addTx(tx) + self.queueTx(tx) var toname string if to := tx.To(); to != nil { @@ -226,6 +221,19 @@ func (pool *TxPool) Stop() { glog.V(logger.Info).Infoln("TX Pool stopped") } +func (self *TxPool) queueTx(tx *types.Transaction) { + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) +} + +func (pool *TxPool) addTx(tx *types.Transaction) { + if _, ok := pool.txs[tx.Hash()]; !ok { + pool.txs[tx.Hash()] = tx + // Notify the subscribers + pool.eventMux.Post(TxPreEvent{tx}) + } +} + // check queue will attempt to insert func (pool *TxPool) checkQueue() { pool.mu.Lock() @@ -257,9 +265,7 @@ func (pool *TxPool) checkQueue() { } enonce++ - pool.txs[tx.Hash()] = tx - // Notify the subscribers - go pool.eventMux.Post(TxPreEvent{tx}) + pool.addTx(tx) } //pool.queue[address] = txs[i:] // delete the entire queue entry if it's empty. There's no need to keep it diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index 5a5cd866f..0e049139e 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) { tx.SignECDSA(key) from, _ := tx.From() pool.currentState().AddBalance(from, big.NewInt(1)) - pool.addTx(tx) + pool.queueTx(tx) pool.checkQueue() if len(pool.txs) != 1 { @@ -79,7 +79,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = tx.From() pool.currentState().SetNonce(from, 10) tx.SetNonce(1) - pool.addTx(tx) + pool.queueTx(tx) pool.checkQueue() if _, ok := pool.txs[tx.Hash()]; ok { t.Error("expected transaction to be in tx pool") @@ -96,9 +96,9 @@ func TestTransactionQueue(t *testing.T) { tx1.SignECDSA(key) tx2.SignECDSA(key) tx3.SignECDSA(key) - pool.addTx(tx1) - pool.addTx(tx2) - pool.addTx(tx3) + pool.queueTx(tx1) + pool.queueTx(tx2) + pool.queueTx(tx3) from, _ = tx1.From() pool.checkQueue() @@ -106,7 +106,7 @@ func TestTransactionQueue(t *testing.T) { t.Error("expected tx pool to be 1 =") } - if len(pool.queue[from]) != 2 { + if len(pool.queue[from]) != 3 { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } -- cgit v1.2.3 From 1506e00a2361ab641875d5f95966e8034b2902bc Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 12:24:46 +0200 Subject: core: improved error message for invalid nonce txs --- core/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/error.go b/core/error.go index 0642948cd..40db99ecd 100644 --- a/core/error.go +++ b/core/error.go @@ -81,7 +81,7 @@ func (err *NonceErr) Error() string { } func NonceError(is, exp uint64) *NonceErr { - return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp} + return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp} } func IsNonceErr(err error) bool { -- cgit v1.2.3 From 7edbb0110f6ab04541ed2fef1e373d61d0dc063d Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 12:25:24 +0200 Subject: core: set the state for the managed tx state Set the state for the managed tx state instead of creating a new managed state. --- core/chain_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/chain_manager.go b/core/chain_manager.go index 1df56b27f..47f84b80a 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { }) self.setTransState(state.New(block.Root(), self.stateDb)) - self.setTxState(state.New(block.Root(), self.stateDb)) + self.txState.SetState(state.New(block.Root(), self.stateDb)) queue[i] = ChainEvent{block, logs} queueEvent.canonicalCount++ -- cgit v1.2.3 From 888ece0cb2c9d07ae821398aeffb0000ef28fb23 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 13:09:59 +0200 Subject: core: fixed test --- core/chain_makers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/chain_makers.go b/core/chain_makers.go index 250671ef8..9b4911fba 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat // Create a new chain manager starting from given block // Effectively a fork factory func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager { - bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} + genesis := GenesisBlock(db) + bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux} + bc.txState = state.ManageState(state.New(genesis.Root(), db)) bc.futureBlocks = NewBlockCache(1000) if block == nil { bc.Reset() -- cgit v1.2.3 From d3be1a271961f13f5bd056d195b790c668552fe1 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 17:56:06 +0200 Subject: eth: moved mined, tx events to protocol-hnd and improved tx propagation Transactions are now propagated to peers from which we have not yet received the transaction. This will significantly reduce the chatter on the network. Moved new mined block handler to the protocol handler and moved transaction handling to protocol handler. --- core/transaction_pool.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index bc6e70c7a..9c175e568 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -229,8 +229,10 @@ func (self *TxPool) queueTx(tx *types.Transaction) { func (pool *TxPool) addTx(tx *types.Transaction) { if _, ok := pool.txs[tx.Hash()]; !ok { pool.txs[tx.Hash()] = tx - // Notify the subscribers - pool.eventMux.Post(TxPreEvent{tx}) + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) } } -- cgit v1.2.3 From fba40e18d9c231b3ab7ee7f6eba36ac859dffbb2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 23 Apr 2015 10:51:13 +0200 Subject: core: added accessor for queued transactions --- core/transaction_pool.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'core') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 9c175e568..7098dba23 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -201,6 +201,18 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } +func (self *TxPool) GetQueuedTransactions() types.Transactions { + self.mu.RLock() + defer self.mu.RUnlock() + + var txs types.Transactions + for _, ts := range self.queue { + txs = append(txs, ts...) + } + + return txs +} + func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() -- cgit v1.2.3 From 7f14fbd57936cf74627572da4a06585d35161ea9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 23 Apr 2015 11:09:58 +0200 Subject: core: pending txs now re-validated once every second --- core/transaction_pool.go | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7098dba23..392e17856 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -77,12 +77,18 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { } func (pool *TxPool) Start() { - ticker := time.NewTicker(300 * time.Millisecond) + // Queue timer will tick so we can attempt to move items from the queue to the + // main transaction pool. + queueTimer := time.NewTicker(300 * time.Millisecond) + // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) + removalTimer := time.NewTicker(1 * time.Second) done: for { select { - case <-ticker.C: + case <-queueTimer.C: pool.checkQueue() + case <-removalTimer.C: + pool.validatePool() case <-pool.quit: break done } @@ -253,11 +259,12 @@ func (pool *TxPool) checkQueue() { pool.mu.Lock() defer pool.mu.Unlock() + statedb := pool.currentState() for address, txs := range pool.queue { sort.Sort(types.TxByNonce{txs}) var ( - nonce = pool.currentState().GetNonce(address) + nonce = statedb.GetNonce(address) start int ) // Clean up the transactions first and determine the start of the nonces @@ -288,3 +295,20 @@ func (pool *TxPool) checkQueue() { } } } + +func (pool *TxPool) validatePool() { + pool.mu.Lock() + defer pool.mu.Unlock() + + statedb := pool.currentState() + for hash, tx := range pool.txs { + from, _ := tx.From() + if nonce := statedb.GetNonce(from); nonce > tx.Nonce() { + if glog.V(logger.Debug) { + glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce()) + } + + delete(pool.txs, hash) + } + } +} -- cgit v1.2.3