diff options
author | obscuren <geffobscura@gmail.com> | 2014-12-20 09:34:12 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-12-20 09:34:12 +0800 |
commit | 3983dd2428137211f84f299f9ce8690c22f50afd (patch) | |
tree | 3a2dc53b365e6f377fc82a3514150d1297fe549c /core/transaction_pool.go | |
parent | 7daa8c2f6eb25511c6a54ad420709af911fc6748 (diff) | |
parent | 0a9dc1536c5d776844d6947a0090ff7e1a7c6ab4 (diff) | |
download | go-tangerine-vv0.7.10.tar go-tangerine-vv0.7.10.tar.gz go-tangerine-vv0.7.10.tar.bz2 go-tangerine-vv0.7.10.tar.lz go-tangerine-vv0.7.10.tar.xz go-tangerine-vv0.7.10.tar.zst go-tangerine-vv0.7.10.zip |
Merge branch 'release/v0.7.10'vv0.7.10
Diffstat (limited to 'core/transaction_pool.go')
-rw-r--r-- | core/transaction_pool.go | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/core/transaction_pool.go b/core/transaction_pool.go new file mode 100644 index 000000000..58c2255a4 --- /dev/null +++ b/core/transaction_pool.go @@ -0,0 +1,233 @@ +package core + +import ( + "bytes" + "container/list" + "fmt" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/state" + "github.com/ethereum/go-ethereum/wire" +) + +var txplogger = logger.NewLogger("TXP") + +const txPoolQueueSize = 50 + +type TxPoolHook chan *types.Transaction +type TxMsgTy byte + +const ( + minGasPrice = 1000000 +) + +var MinGasPrice = big.NewInt(10000000000000) + +type TxMsg struct { + Tx *types.Transaction + Type TxMsgTy +} + +func EachTx(pool *list.List, it func(*types.Transaction, *list.Element) bool) { + for e := pool.Front(); e != nil; e = e.Next() { + if it(e.Value.(*types.Transaction), e) { + break + } + } +} + +func FindTx(pool *list.List, finder func(*types.Transaction, *list.Element) bool) *types.Transaction { + for e := pool.Front(); e != nil; e = e.Next() { + if tx, ok := e.Value.(*types.Transaction); ok { + if finder(tx, e) { + return tx + } + } + } + + return nil +} + +type TxProcessor interface { + ProcessTransaction(tx *types.Transaction) +} + +// The tx pool a thread safe transaction pool handler. In order to +// guarantee a non blocking pool we use a queue channel which can be +// independently read without needing access to the actual pool. If the +// pool is being drained or synced for whatever reason the transactions +// will simple queue up and handled when the mutex is freed. +type TxPool struct { + // The mutex for accessing the Tx pool. + mutex sync.Mutex + // Queueing channel for reading and writing incoming + // transactions to + queueChan chan *types.Transaction + // Quiting channel + quit chan bool + // The actual pool + pool *list.List + + SecondaryProcessor TxProcessor + + subscribers []chan TxMsg + + broadcaster types.Broadcaster + chainManager *ChainManager + eventMux *event.TypeMux +} + +func NewTxPool(chainManager *ChainManager, broadcaster types.Broadcaster, eventMux *event.TypeMux) *TxPool { + return &TxPool{ + pool: list.New(), + queueChan: make(chan *types.Transaction, txPoolQueueSize), + quit: make(chan bool), + chainManager: chainManager, + eventMux: eventMux, + broadcaster: broadcaster, + } +} + +// Blocking function. Don't use directly. Use QueueTransaction instead +func (pool *TxPool) addTransaction(tx *types.Transaction) { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + pool.pool.PushBack(tx) + + // Broadcast the transaction to the rest of the peers + pool.broadcaster.Broadcast(wire.MsgTxTy, []interface{}{tx.RlpData()}) +} + +func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { + // Get the last block so we can retrieve the sender and receiver from + // the merkle trie + block := pool.chainManager.CurrentBlock + // Something has gone horribly wrong if this happens + if block == nil { + return fmt.Errorf("No last block on the block chain") + } + + if len(tx.To()) != 0 && len(tx.To()) != 20 { + return fmt.Errorf("Invalid recipient. len = %d", len(tx.To())) + } + + v, _, _ := tx.Curve() + if v > 28 || v < 27 { + return fmt.Errorf("tx.v != (28 || 27)") + } + + // Get the sender + sender := pool.chainManager.State().GetAccount(tx.Sender()) + + totAmount := new(big.Int).Set(tx.Value()) + // Make sure there's enough in the sender's account. Having insufficient + // funds won't invalidate this transaction but simple ignores it. + if sender.Balance().Cmp(totAmount) < 0 { + return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From()) + } + + // Increment the nonce making each tx valid only once to prevent replay + // attacks + + return nil +} + +func (self *TxPool) Add(tx *types.Transaction) error { + hash := tx.Hash() + foundTx := FindTx(self.pool, func(tx *types.Transaction, e *list.Element) bool { + return bytes.Compare(tx.Hash(), hash) == 0 + }) + + if foundTx != nil { + return fmt.Errorf("Known transaction (%x)", hash[0:4]) + } + + err := self.ValidateTransaction(tx) + if err != nil { + return err + } + + self.addTransaction(tx) + + txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.From()[:4], tx.To()[:4], tx.Value, tx.Hash()) + + // Notify the subscribers + go self.eventMux.Post(TxPreEvent{tx}) + + return nil +} + +func (self *TxPool) Size() int { + return self.pool.Len() +} + +func (pool *TxPool) CurrentTransactions() []*types.Transaction { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + txList := make([]*types.Transaction, pool.pool.Len()) + i := 0 + for e := pool.pool.Front(); e != nil; e = e.Next() { + tx := e.Value.(*types.Transaction) + + txList[i] = tx + + i++ + } + + return txList +} + +func (pool *TxPool) RemoveInvalid(state *state.StateDB) { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + for e := pool.pool.Front(); e != nil; e = e.Next() { + tx := e.Value.(*types.Transaction) + sender := state.GetAccount(tx.Sender()) + err := pool.ValidateTransaction(tx) + if err != nil || sender.Nonce >= tx.Nonce() { + pool.pool.Remove(e) + } + } +} + +func (self *TxPool) RemoveSet(txs types.Transactions) { + self.mutex.Lock() + defer self.mutex.Unlock() + + for _, tx := range txs { + EachTx(self.pool, func(t *types.Transaction, element *list.Element) bool { + if t == tx { + self.pool.Remove(element) + return true // To stop the loop + } + return false + }) + } +} + +func (pool *TxPool) Flush() []*types.Transaction { + txList := pool.CurrentTransactions() + + // Recreate a new list all together + // XXX Is this the fastest way? + pool.pool = list.New() + + return txList +} + +func (pool *TxPool) Start() { + //go pool.queueHandler() +} + +func (pool *TxPool) Stop() { + pool.Flush() + + txplogger.Infoln("Stopped") +} |