diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/block_processor.go | 2 | ||||
-rw-r--r-- | core/chain_makers.go | 4 | ||||
-rw-r--r-- | core/chain_manager.go | 2 | ||||
-rw-r--r-- | core/error.go | 2 | ||||
-rw-r--r-- | core/transaction_pool.go | 142 | ||||
-rw-r--r-- | core/transaction_pool_test.go | 54 |
6 files changed, 175 insertions, 31 deletions
diff --git a/core/block_processor.go b/core/block_processor.go index f33f0d433..af47069ad 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st state.Sync() // Remove transactions from the pool - sm.txpool.RemoveSet(block.Transactions()) + sm.txpool.RemoveTransactions(block.Transactions()) // This puts transactions in a extra db for rpc for i, tx := range block.Transactions() { diff --git a/core/chain_makers.go b/core/chain_makers.go index 250671ef8..9b4911fba 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat // Create a new chain manager starting from given block // Effectively a fork factory func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager { - bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} + genesis := GenesisBlock(db) + bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux} + bc.txState = state.ManageState(state.New(genesis.Root(), db)) bc.futureBlocks = NewBlockCache(1000) if block == nil { bc.Reset() diff --git a/core/chain_manager.go b/core/chain_manager.go index 1df56b27f..47f84b80a 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { }) self.setTransState(state.New(block.Root(), self.stateDb)) - self.setTxState(state.New(block.Root(), self.stateDb)) + self.txState.SetState(state.New(block.Root(), self.stateDb)) queue[i] = ChainEvent{block, logs} queueEvent.canonicalCount++ diff --git a/core/error.go b/core/error.go index 0642948cd..40db99ecd 100644 --- a/core/error.go +++ b/core/error.go @@ -81,7 +81,7 @@ func (err *NonceErr) Error() string { } func NonceError(is, exp uint64) *NonceErr { - return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp} + return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp} } func IsNonceErr(err error) bool { diff --git a/core/transaction_pool.go b/core/transaction_pool.go index eaddcfa09..392e17856 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" "math/big" + "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -17,7 +19,7 @@ import ( var ( ErrInvalidSender = errors.New("Invalid sender") - ErrImpossibleNonce = errors.New("Impossible nonce") + ErrNonce = errors.New("Nonce too low") ErrNonExistentAccount = errors.New("Account does not exist") ErrInsufficientFunds = errors.New("Insufficient funds") ErrIntrinsicGas = errors.New("Intrinsic gas too low") @@ -54,20 +56,43 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set + queue map[common.Address]types.Transactions + subscribers []chan TxMsg eventMux *event.TypeMux } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { - return &TxPool{ + txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]types.Transactions), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, invalidHashes: set.New(), currentState: currentStateFn, } + return txPool +} + +func (pool *TxPool) Start() { + // Queue timer will tick so we can attempt to move items from the queue to the + // main transaction pool. + queueTimer := time.NewTicker(300 * time.Millisecond) + // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) + removalTimer := time.NewTicker(1 * time.Second) +done: + for { + select { + case <-queueTimer.C: + pool.checkQueue() + case <-removalTimer.C: + pool.validatePool() + case <-pool.quit: + break done + } + } } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { @@ -100,16 +125,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { } if pool.currentState().GetNonce(from) > tx.Nonce() { - return ErrImpossibleNonce + return ErrNonce } return nil } -func (self *TxPool) addTx(tx *types.Transaction) { - self.txs[tx.Hash()] = tx -} - func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() @@ -127,7 +148,7 @@ func (self *TxPool) add(tx *types.Transaction) error { return err } - self.addTx(tx) + self.queueTx(tx) var toname string if to := tx.To(); to != nil { @@ -144,9 +165,6 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) } - // Notify the subscribers - go self.eventMux.Post(TxPreEvent{tx}) - return nil } @@ -189,34 +207,108 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } -func (self *TxPool) RemoveSet(txs types.Transactions) { - self.mu.Lock() - defer self.mu.Unlock() - for _, tx := range txs { - delete(self.txs, tx.Hash()) +func (self *TxPool) GetQueuedTransactions() types.Transactions { + self.mu.RLock() + defer self.mu.RUnlock() + + var txs types.Transactions + for _, ts := range self.queue { + txs = append(txs, ts...) } + + return txs } -func (self *TxPool) InvalidateSet(hashes *set.Set) { +func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() - hashes.Each(func(v interface{}) bool { - delete(self.txs, v.(common.Hash)) - return true - }) - self.invalidHashes.Merge(hashes) + for _, tx := range txs { + delete(self.txs, tx.Hash()) + } } func (pool *TxPool) Flush() { pool.txs = make(map[common.Hash]*types.Transaction) } -func (pool *TxPool) Start() { -} - func (pool *TxPool) Stop() { pool.Flush() + close(pool.quit) glog.V(logger.Info).Infoln("TX Pool stopped") } + +func (self *TxPool) queueTx(tx *types.Transaction) { + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) +} + +func (pool *TxPool) addTx(tx *types.Transaction) { + if _, ok := pool.txs[tx.Hash()]; !ok { + pool.txs[tx.Hash()] = tx + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) + } +} + +// check queue will attempt to insert +func (pool *TxPool) checkQueue() { + pool.mu.Lock() + defer pool.mu.Unlock() + + statedb := pool.currentState() + for address, txs := range pool.queue { + sort.Sort(types.TxByNonce{txs}) + + var ( + nonce = statedb.GetNonce(address) + start int + ) + // Clean up the transactions first and determine the start of the nonces + for _, tx := range txs { + if tx.Nonce() >= nonce { + break + } + start++ + } + pool.queue[address] = txs[start:] + + // expected nonce + enonce := nonce + for _, tx := range pool.queue[address] { + // If the expected nonce does not match up with the next one + // (i.e. a nonce gap), we stop the loop + if enonce != tx.Nonce() { + break + } + enonce++ + + pool.addTx(tx) + } + //pool.queue[address] = txs[i:] + // delete the entire queue entry if it's empty. There's no need to keep it + if len(pool.queue[address]) == 0 { + delete(pool.queue, address) + } + } +} + +func (pool *TxPool) validatePool() { + pool.mu.Lock() + defer pool.mu.Unlock() + + statedb := pool.currentState() + for hash, tx := range pool.txs { + from, _ := tx.From() + if nonce := statedb.GetNonce(from); nonce > tx.Nonce() { + if glog.V(logger.Debug) { + glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce()) + } + + delete(pool.txs, hash) + } + } +} diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index b7486adb3..0e049139e 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) { tx.SignECDSA(key) err = pool.Add(tx) - if err != ErrImpossibleNonce { - t.Error("expected", ErrImpossibleNonce) + if err != ErrNonce { + t.Error("expected", ErrNonce) + } +} + +func TestTransactionQueue(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.queueTx(tx) + + pool.checkQueue() + if len(pool.txs) != 1 { + t.Error("expected valid txs to be 1 is", len(pool.txs)) + } + + tx = transaction() + tx.SignECDSA(key) + from, _ = tx.From() + pool.currentState().SetNonce(from, 10) + tx.SetNonce(1) + pool.queueTx(tx) + pool.checkQueue() + if _, ok := pool.txs[tx.Hash()]; ok { + t.Error("expected transaction to be in tx pool") + } + + if len(pool.queue[from]) != 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + } + + pool, key = setupTxPool() + tx1, tx2, tx3 := transaction(), transaction(), transaction() + tx2.SetNonce(10) + tx3.SetNonce(11) + tx1.SignECDSA(key) + tx2.SignECDSA(key) + tx3.SignECDSA(key) + pool.queueTx(tx1) + pool.queueTx(tx2) + pool.queueTx(tx3) + from, _ = tx1.From() + pool.checkQueue() + + if len(pool.txs) != 1 { + t.Error("expected tx pool to be 1 =") + } + + if len(pool.queue[from]) != 3 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } |