diff options
Diffstat (limited to 'ethchain/transaction_pool.go')
-rw-r--r-- | ethchain/transaction_pool.go | 245 |
1 files changed, 0 insertions, 245 deletions
diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go deleted file mode 100644 index 7bd3e9ffd..000000000 --- a/ethchain/transaction_pool.go +++ /dev/null @@ -1,245 +0,0 @@ -package ethchain - -import ( - "bytes" - "container/list" - "fmt" - "math/big" - "sync" - - "github.com/ethereum/go-ethereum/ethlog" - "github.com/ethereum/go-ethereum/ethstate" - "github.com/ethereum/go-ethereum/ethwire" -) - -var txplogger = ethlog.NewLogger("TXP") - -const txPoolQueueSize = 50 - -type TxPoolHook chan *Transaction -type TxMsgTy byte - -const ( - minGasPrice = 1000000 -) - -var MinGasPrice = big.NewInt(10000000000000) - -type TxMsg struct { - Tx *Transaction - Type TxMsgTy -} - -func EachTx(pool *list.List, it func(*Transaction, *list.Element) bool) { - for e := pool.Front(); e != nil; e = e.Next() { - if it(e.Value.(*Transaction), e) { - break - } - } -} - -func FindTx(pool *list.List, finder func(*Transaction, *list.Element) bool) *Transaction { - for e := pool.Front(); e != nil; e = e.Next() { - if tx, ok := e.Value.(*Transaction); ok { - if finder(tx, e) { - return tx - } - } - } - - return nil -} - -type TxProcessor interface { - ProcessTransaction(tx *Transaction) -} - -// The tx pool a thread safe transaction pool handler. In order to -// guarantee a non blocking pool we use a queue channel which can be -// independently read without needing access to the actual pool. If the -// pool is being drained or synced for whatever reason the transactions -// will simple queue up and handled when the mutex is freed. -type TxPool struct { - Ethereum EthManager - // The mutex for accessing the Tx pool. - mutex sync.Mutex - // Queueing channel for reading and writing incoming - // transactions to - queueChan chan *Transaction - // Quiting channel - quit chan bool - // The actual pool - pool *list.List - - SecondaryProcessor TxProcessor - - subscribers []chan TxMsg -} - -func NewTxPool(ethereum EthManager) *TxPool { - return &TxPool{ - pool: list.New(), - queueChan: make(chan *Transaction, txPoolQueueSize), - quit: make(chan bool), - Ethereum: ethereum, - } -} - -// Blocking function. Don't use directly. Use QueueTransaction instead -func (pool *TxPool) addTransaction(tx *Transaction) { - pool.mutex.Lock() - defer pool.mutex.Unlock() - - pool.pool.PushBack(tx) - - // Broadcast the transaction to the rest of the peers - pool.Ethereum.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()}) -} - -func (pool *TxPool) ValidateTransaction(tx *Transaction) error { - // Get the last block so we can retrieve the sender and receiver from - // the merkle trie - block := pool.Ethereum.ChainManager().CurrentBlock - // Something has gone horribly wrong if this happens - if block == nil { - return fmt.Errorf("[TXPL] No last block on the block chain") - } - - if len(tx.Recipient) != 0 && len(tx.Recipient) != 20 { - return fmt.Errorf("[TXPL] Invalid recipient. len = %d", len(tx.Recipient)) - } - - if tx.GasPrice.Cmp(MinGasPrice) < 0 { - return fmt.Errorf("Gas price to low. Require %v > Got %v", MinGasPrice, tx.GasPrice) - } - - // Get the sender - //sender := pool.Ethereum.StateManager().procState.GetAccount(tx.Sender()) - sender := pool.Ethereum.StateManager().CurrentState().GetAccount(tx.Sender()) - - totAmount := new(big.Int).Set(tx.Value) - // Make sure there's enough in the sender's account. Having insufficient - // funds won't invalidate this transaction but simple ignores it. - if sender.Balance().Cmp(totAmount) < 0 { - return fmt.Errorf("[TXPL] Insufficient amount in sender's (%x) account", tx.Sender()) - } - - if tx.IsContract() { - if tx.GasPrice.Cmp(big.NewInt(minGasPrice)) < 0 { - return fmt.Errorf("[TXPL] Gasprice too low, %s given should be at least %d.", tx.GasPrice, minGasPrice) - } - } - - // Increment the nonce making each tx valid only once to prevent replay - // attacks - - return nil -} - -func (pool *TxPool) queueHandler() { -out: - for { - select { - case tx := <-pool.queueChan: - hash := tx.Hash() - foundTx := FindTx(pool.pool, func(tx *Transaction, e *list.Element) bool { - return bytes.Compare(tx.Hash(), hash) == 0 - }) - - if foundTx != nil { - break - } - - // Validate the transaction - err := pool.ValidateTransaction(tx) - if err != nil { - txplogger.Debugln("Validating Tx failed", err) - } else { - // Call blocking version. - pool.addTransaction(tx) - - tmp := make([]byte, 4) - copy(tmp, tx.Recipient) - - txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash()) - - // Notify the subscribers - pool.Ethereum.EventMux().Post(TxPreEvent{tx}) - } - case <-pool.quit: - break out - } - } -} - -func (pool *TxPool) QueueTransaction(tx *Transaction) { - pool.queueChan <- tx -} - -func (pool *TxPool) CurrentTransactions() []*Transaction { - pool.mutex.Lock() - defer pool.mutex.Unlock() - - txList := make([]*Transaction, pool.pool.Len()) - i := 0 - for e := pool.pool.Front(); e != nil; e = e.Next() { - tx := e.Value.(*Transaction) - - txList[i] = tx - - i++ - } - - return txList -} - -func (pool *TxPool) RemoveInvalid(state *ethstate.State) { - pool.mutex.Lock() - defer pool.mutex.Unlock() - - for e := pool.pool.Front(); e != nil; e = e.Next() { - tx := e.Value.(*Transaction) - sender := state.GetAccount(tx.Sender()) - err := pool.ValidateTransaction(tx) - if err != nil || sender.Nonce >= tx.Nonce { - pool.pool.Remove(e) - } - } -} - -func (self *TxPool) RemoveSet(txs Transactions) { - self.mutex.Lock() - defer self.mutex.Unlock() - - for _, tx := range txs { - EachTx(self.pool, func(t *Transaction, element *list.Element) bool { - if t == tx { - self.pool.Remove(element) - return true // To stop the loop - } - return false - }) - } -} - -func (pool *TxPool) Flush() []*Transaction { - txList := pool.CurrentTransactions() - - // Recreate a new list all together - // XXX Is this the fastest way? - pool.pool = list.New() - - return txList -} - -func (pool *TxPool) Start() { - go pool.queueHandler() -} - -func (pool *TxPool) Stop() { - close(pool.quit) - - pool.Flush() - - txplogger.Infoln("Stopped") -} |