diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-23 17:59:56 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-23 17:59:56 +0800 |
commit | 2f8809df404b83824ffadd61ff766cbf9e4c5419 (patch) | |
tree | d3c168060e3efffb945079be1d221d14b4ec79d1 | |
parent | 2fe54ab233c0cd1bf09b49085477c961dcc1980f (diff) | |
parent | 7f14fbd57936cf74627572da4a06585d35161ea9 (diff) | |
download | go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.gz go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.bz2 go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.lz go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.xz go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.zst go-tangerine-2f8809df404b83824ffadd61ff766cbf9e4c5419.zip |
Merge pull request #769 from obscuren/develop
core: transaction queue
-rw-r--r-- | common/natspec/natspec_e2e_test.go | 2 | ||||
-rw-r--r-- | core/block_processor.go | 2 | ||||
-rw-r--r-- | core/chain_makers.go | 4 | ||||
-rw-r--r-- | core/chain_manager.go | 2 | ||||
-rw-r--r-- | core/error.go | 2 | ||||
-rw-r--r-- | core/transaction_pool.go | 142 | ||||
-rw-r--r-- | core/transaction_pool_test.go | 54 | ||||
-rw-r--r-- | eth/backend.go | 36 | ||||
-rw-r--r-- | eth/handler.go | 66 | ||||
-rw-r--r-- | eth/peer.go | 6 | ||||
-rw-r--r-- | miner/worker.go | 7 | ||||
-rw-r--r-- | xeth/xeth.go | 4 |
12 files changed, 261 insertions, 66 deletions
diff --git a/common/natspec/natspec_e2e_test.go b/common/natspec/natspec_e2e_test.go index 147abe162..6bdaec8a1 100644 --- a/common/natspec/natspec_e2e_test.go +++ b/common/natspec/natspec_e2e_test.go @@ -220,7 +220,7 @@ func (self *testFrontend) applyTxs() { block := self.ethereum.ChainManager().NewBlock(cb) coinbase := self.stateDb.GetStateObject(cb) coinbase.SetGasPool(big.NewInt(10000000)) - txs := self.ethereum.TxPool().GetTransactions() + txs := self.ethereum.TxPool().GetQueuedTransactions() for i := 0; i < len(txs); i++ { for _, tx := range txs { diff --git a/core/block_processor.go b/core/block_processor.go index f33f0d433..af47069ad 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st state.Sync() // Remove transactions from the pool - sm.txpool.RemoveSet(block.Transactions()) + sm.txpool.RemoveTransactions(block.Transactions()) // This puts transactions in a extra db for rpc for i, tx := range block.Transactions() { diff --git a/core/chain_makers.go b/core/chain_makers.go index 250671ef8..9b4911fba 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat // Create a new chain manager starting from given block // Effectively a fork factory func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager { - bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} + genesis := GenesisBlock(db) + bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux} + bc.txState = state.ManageState(state.New(genesis.Root(), db)) bc.futureBlocks = NewBlockCache(1000) if block == nil { bc.Reset() diff --git a/core/chain_manager.go b/core/chain_manager.go index 1df56b27f..47f84b80a 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { }) self.setTransState(state.New(block.Root(), self.stateDb)) - self.setTxState(state.New(block.Root(), self.stateDb)) + self.txState.SetState(state.New(block.Root(), self.stateDb)) queue[i] = ChainEvent{block, logs} queueEvent.canonicalCount++ diff --git a/core/error.go b/core/error.go index 0642948cd..40db99ecd 100644 --- a/core/error.go +++ b/core/error.go @@ -81,7 +81,7 @@ func (err *NonceErr) Error() string { } func NonceError(is, exp uint64) *NonceErr { - return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp} + return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp} } func IsNonceErr(err error) bool { diff --git a/core/transaction_pool.go b/core/transaction_pool.go index eaddcfa09..392e17856 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" "math/big" + "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -17,7 +19,7 @@ import ( var ( ErrInvalidSender = errors.New("Invalid sender") - ErrImpossibleNonce = errors.New("Impossible nonce") + ErrNonce = errors.New("Nonce too low") ErrNonExistentAccount = errors.New("Account does not exist") ErrInsufficientFunds = errors.New("Insufficient funds") ErrIntrinsicGas = errors.New("Intrinsic gas too low") @@ -54,20 +56,43 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set + queue map[common.Address]types.Transactions + subscribers []chan TxMsg eventMux *event.TypeMux } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { - return &TxPool{ + txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]types.Transactions), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, invalidHashes: set.New(), currentState: currentStateFn, } + return txPool +} + +func (pool *TxPool) Start() { + // Queue timer will tick so we can attempt to move items from the queue to the + // main transaction pool. + queueTimer := time.NewTicker(300 * time.Millisecond) + // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) + removalTimer := time.NewTicker(1 * time.Second) +done: + for { + select { + case <-queueTimer.C: + pool.checkQueue() + case <-removalTimer.C: + pool.validatePool() + case <-pool.quit: + break done + } + } } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { @@ -100,16 +125,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { } if pool.currentState().GetNonce(from) > tx.Nonce() { - return ErrImpossibleNonce + return ErrNonce } return nil } -func (self *TxPool) addTx(tx *types.Transaction) { - self.txs[tx.Hash()] = tx -} - func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() @@ -127,7 +148,7 @@ func (self *TxPool) add(tx *types.Transaction) error { return err } - self.addTx(tx) + self.queueTx(tx) var toname string if to := tx.To(); to != nil { @@ -144,9 +165,6 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) } - // Notify the subscribers - go self.eventMux.Post(TxPreEvent{tx}) - return nil } @@ -189,34 +207,108 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } -func (self *TxPool) RemoveSet(txs types.Transactions) { - self.mu.Lock() - defer self.mu.Unlock() - for _, tx := range txs { - delete(self.txs, tx.Hash()) +func (self *TxPool) GetQueuedTransactions() types.Transactions { + self.mu.RLock() + defer self.mu.RUnlock() + + var txs types.Transactions + for _, ts := range self.queue { + txs = append(txs, ts...) } + + return txs } -func (self *TxPool) InvalidateSet(hashes *set.Set) { +func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() - hashes.Each(func(v interface{}) bool { - delete(self.txs, v.(common.Hash)) - return true - }) - self.invalidHashes.Merge(hashes) + for _, tx := range txs { + delete(self.txs, tx.Hash()) + } } func (pool *TxPool) Flush() { pool.txs = make(map[common.Hash]*types.Transaction) } -func (pool *TxPool) Start() { -} - func (pool *TxPool) Stop() { pool.Flush() + close(pool.quit) glog.V(logger.Info).Infoln("TX Pool stopped") } + +func (self *TxPool) queueTx(tx *types.Transaction) { + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) +} + +func (pool *TxPool) addTx(tx *types.Transaction) { + if _, ok := pool.txs[tx.Hash()]; !ok { + pool.txs[tx.Hash()] = tx + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) + } +} + +// check queue will attempt to insert +func (pool *TxPool) checkQueue() { + pool.mu.Lock() + defer pool.mu.Unlock() + + statedb := pool.currentState() + for address, txs := range pool.queue { + sort.Sort(types.TxByNonce{txs}) + + var ( + nonce = statedb.GetNonce(address) + start int + ) + // Clean up the transactions first and determine the start of the nonces + for _, tx := range txs { + if tx.Nonce() >= nonce { + break + } + start++ + } + pool.queue[address] = txs[start:] + + // expected nonce + enonce := nonce + for _, tx := range pool.queue[address] { + // If the expected nonce does not match up with the next one + // (i.e. a nonce gap), we stop the loop + if enonce != tx.Nonce() { + break + } + enonce++ + + pool.addTx(tx) + } + //pool.queue[address] = txs[i:] + // delete the entire queue entry if it's empty. There's no need to keep it + if len(pool.queue[address]) == 0 { + delete(pool.queue, address) + } + } +} + +func (pool *TxPool) validatePool() { + pool.mu.Lock() + defer pool.mu.Unlock() + + statedb := pool.currentState() + for hash, tx := range pool.txs { + from, _ := tx.From() + if nonce := statedb.GetNonce(from); nonce > tx.Nonce() { + if glog.V(logger.Debug) { + glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce()) + } + + delete(pool.txs, hash) + } + } +} diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index b7486adb3..0e049139e 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) { tx.SignECDSA(key) err = pool.Add(tx) - if err != ErrImpossibleNonce { - t.Error("expected", ErrImpossibleNonce) + if err != ErrNonce { + t.Error("expected", ErrNonce) + } +} + +func TestTransactionQueue(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.queueTx(tx) + + pool.checkQueue() + if len(pool.txs) != 1 { + t.Error("expected valid txs to be 1 is", len(pool.txs)) + } + + tx = transaction() + tx.SignECDSA(key) + from, _ = tx.From() + pool.currentState().SetNonce(from, 10) + tx.SetNonce(1) + pool.queueTx(tx) + pool.checkQueue() + if _, ok := pool.txs[tx.Hash()]; ok { + t.Error("expected transaction to be in tx pool") + } + + if len(pool.queue[from]) != 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + } + + pool, key = setupTxPool() + tx1, tx2, tx3 := transaction(), transaction(), transaction() + tx2.SetNonce(10) + tx3.SetNonce(11) + tx1.SignECDSA(key) + tx2.SignECDSA(key) + tx3.SignECDSA(key) + pool.queueTx(tx1) + pool.queueTx(tx2) + pool.queueTx(tx3) + from, _ = tx1.From() + pool.checkQueue() + + if len(pool.txs) != 1 { + t.Error("expected tx pool to be 1 =") + } + + if len(pool.queue[from]) != 3 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } diff --git a/eth/backend.go b/eth/backend.go index 88456e448..646a4eaf2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -3,7 +3,6 @@ package eth import ( "crypto/ecdsa" "fmt" - "math" "path" "strings" @@ -136,11 +135,10 @@ type Ethereum struct { protocolManager *ProtocolManager downloader *downloader.Downloader - net *p2p.Server - eventMux *event.TypeMux - txSub event.Subscription - minedBlockSub event.Subscription - miner *miner.Miner + net *p2p.Server + eventMux *event.TypeMux + txSub event.Subscription + miner *miner.Miner // logger logger.LogSystem @@ -222,7 +220,7 @@ func New(config *Config) (*Ethereum, error) { eth.whisper = whisper.New() eth.shhVersionId = int(eth.whisper.Version()) eth.miner = miner.New(eth, eth.pow, config.MinerThreads) - eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.downloader) + eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) netprv, err := config.nodeKey() if err != nil { @@ -379,7 +377,8 @@ func (s *Ethereum) Start() error { } // Start services - s.txPool.Start() + go s.txPool.Start() + s.protocolManager.Start() if s.whisper != nil { s.whisper.Start() @@ -389,10 +388,6 @@ func (s *Ethereum) Start() error { s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) go s.txBroadcastLoop() - // broadcast mined blocks - s.minedBlockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go s.minedBroadcastLoop() - glog.V(logger.Info).Infoln("Server started") return nil } @@ -422,9 +417,9 @@ func (s *Ethereum) Stop() { defer s.stateDb.Close() defer s.extraDb.Close() - s.txSub.Unsubscribe() // quits txBroadcastLoop - s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + s.txSub.Unsubscribe() // quits txBroadcastLoop + s.protocolManager.Stop() s.txPool.Stop() s.eventMux.Stop() if s.whisper != nil { @@ -440,13 +435,10 @@ func (s *Ethereum) WaitForShutdown() { <-s.shutdownChan } -// now tx broadcasting is taken out of txPool -// handled here via subscription, efficiency? func (self *Ethereum) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { event := obj.(core.TxPreEvent) - self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx}) self.syncAccounts(event.Tx) } } @@ -465,16 +457,6 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) { } } -func (self *Ethereum) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.minedBlockSub.Chan() { - switch ev := obj.(type) { - case core.NewMinedBlockEvent: - self.protocolManager.BroadcastBlock(ev.Block.Hash(), ev.Block) - } - } -} - func saveProtocolVersion(db common.Database, protov int) { d, _ := db.Get([]byte("ProtocolVersion")) protocolVersion := common.NewValue(d).Uint() diff --git a/eth/handler.go b/eth/handler.go index 622f22132..d466dbfee 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -44,6 +44,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -77,12 +78,17 @@ type ProtocolManager struct { peers map[string]*peer SubProtocol p2p.Protocol + + eventMux *event.TypeMux + txSub event.Subscription + minedBlockSub event.Subscription } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { manager := &ProtocolManager{ + eventMux: mux, txpool: txpool, chainman: chainman, downloader: downloader, @@ -105,6 +111,21 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman return manager } +func (pm *ProtocolManager) Start() { + // broadcast transactions + pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() +} + +func (pm *ProtocolManager) Stop() { + pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop +} + func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { td, current, genesis := pm.chainman.Status() @@ -326,10 +347,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) } } // Broadcast block to peer set - // XXX due to the current shit state of the network disable the limit peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) } glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") } + +// BroadcastTx will propagate the block to its connected peers. It will sort +// out which peers do not contain the block in their block set and will do a +// sqrt(peers) to determine the amount of peers we broadcast to. +func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { + pm.pmu.Lock() + defer pm.pmu.Unlock() + + // Find peers who don't know anything about the given hash. Peers that + // don't know about the hash will be a candidate for the broadcast loop + var peers []*peer + for _, peer := range pm.peers { + if !peer.txHashes.Has(hash) { + peers = append(peers, peer) + } + } + // Broadcast block to peer set + peers = peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range peers { + peer.sendTransaction(tx) + } + glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") +} + +// Mined broadcast loop +func (self *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.minedBlockSub.Chan() { + switch ev := obj.(type) { + case core.NewMinedBlockEvent: + self.BroadcastBlock(ev.Block.Hash(), ev.Block) + } + } +} + +func (self *ProtocolManager) txBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.TxPreEvent) + self.BroadcastTx(event.Tx.Hash(), event.Tx) + } +} diff --git a/eth/peer.go b/eth/peer.go index 972880845..ec0c4b1f3 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -86,6 +86,12 @@ func (p *peer) sendNewBlock(block *types.Block) error { return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, block.Td}) } +func (p *peer) sendTransaction(tx *types.Transaction) error { + p.txHashes.Add(tx.Hash()) + + return p2p.Send(p.rw, TxMsg, []*types.Transaction{tx}) +} + func (p *peer) requestHashes(from common.Hash) error { glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) diff --git a/miner/worker.go b/miner/worker.go index dc1f04d87..19ede3c93 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -258,7 +258,7 @@ func (self *worker) commitNewWork() { tcount = 0 ignoredTransactors = set.New() ) - //gasLimit: + for _, tx := range transactions { // We can skip err. It has already been validated in the tx pool from, _ := tx.From() @@ -290,13 +290,12 @@ func (self *worker) commitNewWork() { // ignore the transactor so no nonce errors will be thrown for this account // next time the worker is run, they'll be picked up again. ignoredTransactors.Add(from) - //glog.V(logger.Debug).Infof("Gas limit reached for block. %d TXs included in this block\n", i) - //break gasLimit + + glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) default: tcount++ } } - //self.eth.TxPool().InvalidateSet(remove) var ( uncles []*types.Header diff --git a/xeth/xeth.go b/xeth/xeth.go index afcb33e4c..251b070e4 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -682,9 +682,11 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt if contractCreation { addr := core.AddressFromMessage(tx) - glog.V(logger.Info).Infof("Contract addr %x\n", addr) + glog.V(logger.Info).Infof("Tx(%x) created: %x\n", tx.Hash(), addr) return core.AddressFromMessage(tx).Hex(), nil + } else { + glog.V(logger.Info).Infof("Tx(%x) to: %x\n", tx.Hash(), tx.To()) } return tx.Hash().Hex(), nil } |