From ff697e82dcfc94cc666f863d5093cfc23a11d98a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 1 Jul 2016 18:59:55 +0300 Subject: [release/1.4.17] core, eth, internal, miner: optimize txpool for quick ops (cherry picked from commit 0ef327bbee79c01a69ba59258acc6ce3a48bc288) --- eth/api.go | 62 +++++++++++++++++++++++++++------------------------- eth/handler.go | 2 +- eth/helper_test.go | 22 ++++++++++++------- eth/protocol.go | 8 +++---- eth/protocol_test.go | 2 +- eth/sync.go | 5 ++++- 6 files changed, 56 insertions(+), 45 deletions(-) (limited to 'eth') diff --git a/eth/api.go b/eth/api.go index 43e00fc06..ed885805a 100644 --- a/eth/api.go +++ b/eth/api.go @@ -979,7 +979,7 @@ func getTransaction(chainDb ethdb.Database, txPool *core.TxPool, txHash common.H } } else { // pending transaction? - tx = txPool.GetTransaction(txHash) + tx = txPool.Get(txHash) isPending = true } @@ -1391,12 +1391,13 @@ func (s *PublicTransactionPoolAPI) SignTransaction(args SignTransactionArgs) (*S // PendingTransactions returns the transactions that are in the transaction pool and have a from address that is one of // the accounts this node manages. func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction { - pending := s.txPool.GetTransactions() + pending := s.txPool.Pending() transactions := make([]*RPCTransaction, 0, len(pending)) - for _, tx := range pending { - from, _ := tx.FromFrontier() - if s.am.HasAddress(from) { - transactions = append(transactions, newRPCPendingTransaction(tx)) + for addr, txs := range pending { + if s.am.HasAddress(addr) { + for _, tx := range txs { + transactions = append(transactions, newRPCPendingTransaction(tx)) + } } } return transactions @@ -1430,35 +1431,36 @@ func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) ( // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the // pool and reinsert it with the new gas price and limit. func (s *PublicTransactionPoolAPI) Resend(tx Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) { + pending := s.txPool.Pending() + for addr, txs := range pending { + for _, p := range txs { + if addr == tx.From && p.SigHash() == tx.tx.SigHash() { + if gasPrice == nil { + gasPrice = rpc.NewHexNumber(tx.tx.GasPrice()) + } + if gasLimit == nil { + gasLimit = rpc.NewHexNumber(tx.tx.Gas()) + } - pending := s.txPool.GetTransactions() - for _, p := range pending { - if pFrom, err := p.FromFrontier(); err == nil && pFrom == tx.From && p.SigHash() == tx.tx.SigHash() { - if gasPrice == nil { - gasPrice = rpc.NewHexNumber(tx.tx.GasPrice()) - } - if gasLimit == nil { - gasLimit = rpc.NewHexNumber(tx.tx.Gas()) - } + var newTx *types.Transaction + if tx.tx.To() == nil { + newTx = types.NewContractCreation(tx.tx.Nonce(), tx.tx.Value(), gasPrice.BigInt(), gasLimit.BigInt(), tx.tx.Data()) + } else { + newTx = types.NewTransaction(tx.tx.Nonce(), *tx.tx.To(), tx.tx.Value(), gasPrice.BigInt(), gasLimit.BigInt(), tx.tx.Data()) + } - var newTx *types.Transaction - if tx.tx.To() == nil { - newTx = types.NewContractCreation(tx.tx.Nonce(), tx.tx.Value(), gasPrice.BigInt(), gasLimit.BigInt(), tx.tx.Data()) - } else { - newTx = types.NewTransaction(tx.tx.Nonce(), *tx.tx.To(), tx.tx.Value(), gasPrice.BigInt(), gasLimit.BigInt(), tx.tx.Data()) - } + signedTx, err := s.sign(tx.From, newTx) + if err != nil { + return common.Hash{}, err + } - signedTx, err := s.sign(tx.From, newTx) - if err != nil { - return common.Hash{}, err - } + s.txPool.Remove(tx.Hash) + if err = s.txPool.Add(signedTx); err != nil { + return common.Hash{}, err + } - s.txPool.RemoveTx(tx.Hash) - if err = s.txPool.Add(signedTx); err != nil { - return common.Hash{}, err + return signedTx.Hash(), nil } - - return signedTx.Hash(), nil } } diff --git a/eth/handler.go b/eth/handler.go index ca73a7d54..89418c0df 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -689,7 +689,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkTransaction(tx.Hash()) } - pm.txpool.AddTransactions(txs) + pm.txpool.AddBatch(txs) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) diff --git a/eth/helper_test.go b/eth/helper_test.go index 28ff69b17..732fe89ee 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -23,6 +23,7 @@ import ( "crypto/ecdsa" "crypto/rand" "math/big" + "sort" "sync" "testing" @@ -89,9 +90,9 @@ type testTxPool struct { lock sync.RWMutex // Protects the transaction pool } -// AddTransactions appends a batch of transactions to the pool, and notifies any +// AddBatch appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) AddTransactions(txs []*types.Transaction) { +func (p *testTxPool) AddBatch(txs []*types.Transaction) { p.lock.Lock() defer p.lock.Unlock() @@ -101,15 +102,20 @@ func (p *testTxPool) AddTransactions(txs []*types.Transaction) { } } -// GetTransactions returns all the transactions known to the pool -func (p *testTxPool) GetTransactions() types.Transactions { +// Pending returns all the transactions known to the pool +func (p *testTxPool) Pending() map[common.Address]types.Transactions { p.lock.RLock() defer p.lock.RUnlock() - txs := make([]*types.Transaction, len(p.pool)) - copy(txs, p.pool) - - return txs + batches := make(map[common.Address]types.Transactions) + for _, tx := range p.pool { + from, _ := tx.From() + batches[from] = append(batches[from], tx) + } + for _, batch := range batches { + sort.Sort(types.TxByNonce(batch)) + } + return batches } // newTestTransaction create a new dummy transaction. diff --git a/eth/protocol.go b/eth/protocol.go index 69b3be578..3f65c204b 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -97,12 +97,12 @@ var errorToString = map[int]string{ } type txPool interface { - // AddTransactions should add the given transactions to the pool. - AddTransactions([]*types.Transaction) + // AddBatch should add the given transactions to the pool. + AddBatch([]*types.Transaction) - // GetTransactions should return pending transactions. + // Pending should return pending transactions. // The slice should be modifiable by the caller. - GetTransactions() types.Transactions + Pending() map[common.Address]types.Transactions } // statusData is the network packet for the status message. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 4633344da..0aac19f43 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) { for nonce := range alltxs { alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize) } - pm.txpool.AddTransactions(alltxs) + pm.txpool.AddBatch(alltxs) // Connect several peers. They should all receive the pending transactions. var wg sync.WaitGroup diff --git a/eth/sync.go b/eth/sync.go index e0418d4d8..c78f98b40 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -45,7 +45,10 @@ type txsync struct { // syncTransactions starts sending all currently pending transactions to the given peer. func (pm *ProtocolManager) syncTransactions(p *peer) { - txs := pm.txpool.GetTransactions() + var txs types.Transactions + for _, batch := range pm.txpool.Pending() { + txs = append(txs, batch...) + } if len(txs) == 0 { return } -- cgit v1.2.3