diff options
-rw-r--r-- | core/chain_manager.go | 21 | ||||
-rw-r--r-- | core/transaction_pool.go | 88 | ||||
-rw-r--r-- | core/transaction_pool_test.go | 20 | ||||
-rw-r--r-- | eth/backend.go | 29 | ||||
-rw-r--r-- | miner/worker.go | 4 | ||||
-rw-r--r-- | xeth/xeth.go | 8 |
6 files changed, 66 insertions, 104 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index d58c0d504..d14a19fea 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB { return self.transState } -func (self *ChainManager) TxState() *state.ManagedState { - self.tsmu.RLock() - defer self.tsmu.RUnlock() - - return self.txState -} - -func (self *ChainManager) setTxState(statedb *state.StateDB) { - self.tsmu.Lock() - defer self.tsmu.Unlock() - self.txState = state.ManageState(statedb) -} - func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } @@ -751,7 +738,7 @@ out: case ev := <-events.Chan(): switch ev := ev.(type) { case queueEvent: - for i, event := range ev.queue { + for _, event := range ev.queue { switch event := event.(type) { case ChainEvent: // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long @@ -760,12 +747,6 @@ out: self.currentGasLimit = CalcGasLimit(event.Block) self.eventMux.Post(ChainHeadEvent{event.Block}) } - case ChainSplitEvent: - // On chain splits we need to reset the transaction state. We can't be sure whether the actual - // state of the accounts are still valid. - if i == ev.splitCount { - self.setTxState(state.New(event.Block.Root(), self.stateDb)) - } } self.eventMux.Post(event) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7d58ffbd9..5e6f2c6a4 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -6,7 +6,6 @@ import ( "math/big" "sort" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -38,10 +37,12 @@ type stateFn func() *state.StateDB // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - quit chan bool // Quiting channel - currentState stateFn // The state function which will allow us to do some pre checkes + quit chan bool // Quiting channel + currentState stateFn // The state function which will allow us to do some pre checkes + state *state.ManagedState gasLimit func() *big.Int // The current gas limit function callback eventMux *event.TypeMux + events event.Subscription mu sync.RWMutex txs map[common.Hash]*types.Transaction // processable transactions @@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, + state: state.ManageState(currentStateFn()), } } func (pool *TxPool) Start() { - // Queue timer will tick so we can attempt to move items from the queue to the - // main transaction pool. - queueTimer := time.NewTicker(300 * time.Millisecond) - // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) - removalTimer := time.NewTicker(1 * time.Second) -done: - for { - select { - case <-queueTimer.C: - pool.checkQueue() - case <-removalTimer.C: - pool.validatePool() - case <-pool.quit: - break done + pool.events = pool.eventMux.Subscribe(ChainEvent{}) + for _ = range pool.events.Chan() { + pool.mu.Lock() + pool.state = state.ManageState(pool.currentState()) + + for _, tx := range pool.txs { + if addr, err := tx.From(); err == nil { + pool.state.SetNonce(addr, tx.Nonce()) + } } + + pool.checkQueue() + pool.mu.Unlock() } } +func (pool *TxPool) Stop() { + pool.txs = make(map[common.Hash]*types.Transaction) + close(pool.quit) + pool.events.Unsubscribe() + glog.V(logger.Info).Infoln("TX Pool stopped") +} + +func (pool *TxPool) State() *state.ManagedState { + pool.mu.RLock() + defer pool.mu.RUnlock() + + return pool.state +} + // validateTx checks whether a transaction is valid according // to the consensus rules. func (pool *TxPool) validateTx(tx *types.Transaction) error { @@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) } + // check and validate the queueue + self.checkQueue() + return nil } @@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { // GetTransactions returns all currently processable transactions. func (self *TxPool) GetTransactions() (txs types.Transactions) { - self.mu.RLock() - defer self.mu.RUnlock() + self.mu.Lock() + defer self.mu.Unlock() + + // check queue first + self.checkQueue() + // invalidate any txs + self.validatePool() txs = make(types.Transactions, len(self.txs)) i := 0 @@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { } } -func (pool *TxPool) Stop() { - pool.txs = make(map[common.Hash]*types.Transaction) - close(pool.quit) - glog.V(logger.Info).Infoln("TX Pool stopped") -} - func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { from, _ := tx.From() // already validated if self.queue[from] == nil { @@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { self.queue[from][hash] = tx } -func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { if _, ok := pool.txs[hash]; !ok { pool.txs[hash] = tx + + pool.state.SetNonce(addr, tx.AccountNonce) // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. @@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { // checkQueue moves transactions that have become processable to main pool. func (pool *TxPool) checkQueue() { - pool.mu.Lock() - defer pool.mu.Unlock() + state := pool.state - statedb := pool.currentState() var addq txQueue for address, txs := range pool.queue { - curnonce := statedb.GetNonce(address) + curnonce := state.GetNonce(address) addq := addq[:0] for hash, tx := range txs { if tx.AccountNonce < curnonce { + fmt.Println("delete the tx", tx.AccountNonce, curnonce) // Drop queued transactions whose nonce is lower than // the account nonce because they have been processed. delete(txs, hash) } else { // Collect the remaining transactions for the next pass. - addq = append(addq, txQueueEntry{hash, tx}) + addq = append(addq, txQueueEntry{hash, address, tx}) } } // Find the next consecutive nonce range starting at the // current account nonce. sort.Sort(addq) for _, e := range addq { - if e.AccountNonce != curnonce { + if e.AccountNonce > curnonce+1 { break } - curnonce++ delete(txs, e.hash) - pool.addTx(e.hash, e.Transaction) + pool.addTx(e.hash, address, e.Transaction) } // Delete the entire queue entry if it became empty. if len(txs) == 0 { @@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) { // validatePool removes invalid and processed transactions from the main pool. func (pool *TxPool) validatePool() { - pool.mu.Lock() - defer pool.mu.Unlock() - for hash, tx := range pool.txs { if err := pool.validateTx(tx); err != nil { if glog.V(logger.Info) { @@ -330,6 +343,7 @@ type txQueue []txQueueEntry type txQueueEntry struct { hash common.Hash + addr common.Address *types.Transaction } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index 600fd9b4f..170bdfa72 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) { } from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) err = pool.Add(tx) if err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) } balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) - pool.currentState().AddBalance(from, balance) + pool.state.AddBalance(from, balance) err = pool.Add(tx) if err != ErrIntrinsicGas { t.Error("expected", ErrIntrinsicGas, "got", err) } - pool.currentState().SetNonce(from, 1) - pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff)) + pool.state.SetNonce(from, 1) + pool.state.AddBalance(from, big.NewInt(0xffffffffffffff)) tx.GasLimit = big.NewInt(100000) tx.Price = big.NewInt(1) tx.SignECDSA(key) @@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) { tx := transaction() tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) pool.queueTx(tx.Hash(), tx) pool.checkQueue() @@ -76,17 +76,17 @@ func TestTransactionQueue(t *testing.T) { } tx = transaction() + tx.SetNonce(1) tx.SignECDSA(key) from, _ = tx.From() - pool.currentState().SetNonce(from, 10) - tx.SetNonce(1) + pool.state.SetNonce(from, 2) pool.queueTx(tx.Hash(), tx) pool.checkQueue() if _, ok := pool.txs[tx.Hash()]; ok { t.Error("expected transaction to be in tx pool") } - if len(pool.queue[from]) != 0 { + if len(pool.queue[from]) > 0 { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } @@ -117,7 +117,7 @@ func TestRemoveTx(t *testing.T) { tx := transaction() tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) pool.queueTx(tx.Hash(), tx) pool.addTx(tx.Hash(), tx) if len(pool.queue) != 1 { @@ -146,7 +146,7 @@ func TestNegativeValue(t *testing.T) { tx.Value().Set(big.NewInt(-1)) tx.SignECDSA(key) from, _ := tx.From() - pool.currentState().AddBalance(from, big.NewInt(1)) + pool.state.AddBalance(from, big.NewInt(1)) err := pool.Add(tx) if err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) diff --git a/eth/backend.go b/eth/backend.go index 724763a52..3956dfcaa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -198,7 +198,6 @@ type Ethereum struct { net *p2p.Server eventMux *event.TypeMux - txSub event.Subscription miner *miner.Miner // logger logger.LogSystem @@ -470,10 +469,6 @@ func (s *Ethereum) Start() error { s.whisper.Start() } - // broadcast transactions - s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) - go s.txBroadcastLoop() - glog.V(logger.Info).Infoln("Server started") return nil } @@ -531,8 +526,6 @@ func (self *Ethereum) AddPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - s.txSub.Unsubscribe() // quits txBroadcastLoop - s.net.Stop() s.protocolManager.Stop() s.chainManager.Stop() @@ -552,28 +545,6 @@ func (s *Ethereum) WaitForShutdown() { <-s.shutdownChan } -func (self *Ethereum) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.(core.TxPreEvent) - self.syncAccounts(event.Tx) - } -} - -// keep accounts synced up -func (self *Ethereum) syncAccounts(tx *types.Transaction) { - from, err := tx.From() - if err != nil { - return - } - - if self.accountManager.HasAccount(from) { - if self.chainManager.TxState().GetNonce(from) < tx.Nonce() { - self.chainManager.TxState().SetNonce(from, tx.Nonce()) - } - } -} - // StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval // by default that is 10 times per epoch // in epoch n, if we past autoDAGepochHeight within-epoch blocks, diff --git a/miner/worker.go b/miner/worker.go index 58efd61db..1580d4d42 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -494,10 +494,6 @@ func (self *worker) commitTransactions(transactions types.Transactions) { err := self.commitTransaction(tx) switch { case core.IsNonceErr(err) || core.IsInvalidTxErr(err): - // Remove invalid transactions - from, _ := tx.From() - - self.chain.TxState().RemoveNonce(from, tx.Nonce()) current.remove.Add(tx.Hash()) if glog.V(logger.Detail) { diff --git a/xeth/xeth.go b/xeth/xeth.go index 157fe76c7..187892a49 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -936,22 +936,22 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS tx = types.NewTransactionMessage(to, value, gas, price, data) } - state := self.backend.ChainManager().TxState() + state := self.backend.TxPool().State() var nonce uint64 if len(nonceStr) != 0 { nonce = common.Big(nonceStr).Uint64() } else { - nonce = state.NewNonce(from) + nonce = state.GetNonce(from) + 1 //state.NewNonce(from) } tx.SetNonce(nonce) if err := self.sign(tx, from, false); err != nil { - state.RemoveNonce(from, tx.Nonce()) + //state.RemoveNonce(from, tx.Nonce()) return "", err } if err := self.backend.TxPool().Add(tx); err != nil { - state.RemoveNonce(from, tx.Nonce()) + //state.RemoveNonce(from, tx.Nonce()) return "", err } |