aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--transaction_pool.go161
1 files changed, 161 insertions, 0 deletions
diff --git a/transaction_pool.go b/transaction_pool.go
new file mode 100644
index 000000000..f645afd06
--- /dev/null
+++ b/transaction_pool.go
@@ -0,0 +1,161 @@
+package main
+
+import (
+ "container/list"
+ "errors"
+ "github.com/ethereum/ethutil-go"
+ "log"
+ "math/big"
+ "sync"
+)
+
+const (
+ txPoolQueueSize = 50
+)
+
+func FindTx(pool *list.List, finder func(*ethutil.Transaction, *list.Element) bool) *ethutil.Transaction {
+ for e := pool.Front(); e != nil; e = e.Next() {
+ if tx, ok := e.Value.(*ethutil.Transaction); ok {
+ if finder(tx, e) {
+ return tx
+ }
+ }
+ }
+
+ return nil
+}
+
+// The tx pool a thread safe transaction pool handler. In order to
+// guarantee a non blocking pool we use a queue channel which can be
+// independently read without needing access to the actual pool. If the
+// pool is being drained or synced for whatever reason the transactions
+// will simple queue up and handled when the mutex is freed.
+type TxPool struct {
+ server *Server
+ // The mutex for accessing the Tx pool.
+ mutex sync.Mutex
+ // Queueing channel for reading and writing incoming
+ // transactions to
+ queueChan chan *ethutil.Transaction
+ // Quiting channel
+ quit chan bool
+
+ pool *list.List
+}
+
+func NewTxPool(s *Server) *TxPool {
+ return &TxPool{
+ server: s,
+ mutex: sync.Mutex{},
+ pool: list.New(),
+ queueChan: make(chan *ethutil.Transaction, txPoolQueueSize),
+ quit: make(chan bool),
+ }
+}
+
+// Blocking function. Don't use directly. Use QueueTransaction instead
+func (pool *TxPool) addTransaction(tx *ethutil.Transaction) {
+ pool.mutex.Lock()
+ defer pool.mutex.Unlock()
+
+ pool.pool.PushBack(tx)
+}
+
+// Process transaction validates the Tx and processes funds from the
+// sender to the recipient.
+func (pool *TxPool) processTransaction(tx *ethutil.Transaction) error {
+ // Get the last block so we can retrieve the sender and receiver from
+ // the merkle trie
+ block := pool.server.blockManager.bc.LastBlock
+ // Something has gone horribly wrong if this happens
+ if block == nil {
+ return errors.New("No last block on the block chain")
+ }
+
+ var sender, receiver *ethutil.Ether
+
+ // Get the sender
+ data := block.State().Get(string(tx.Sender()))
+ // If it doesn't exist create a new account. Of course trying to send funds
+ // from this account will fail since it will hold 0 Wei
+ if data == "" {
+ sender = ethutil.NewEther(big.NewInt(0))
+ } else {
+ sender = ethutil.NewEtherFromData([]byte(data))
+ }
+ // Defer the update. Whatever happens it should be persisted
+ defer block.State().Update(string(tx.Sender()), string(sender.RlpEncode()))
+
+ // Make sure there's enough in the sender's account. Having insufficient
+ // funds won't invalidate this transaction but simple ignores it.
+ if sender.Amount.Cmp(tx.Value) < 0 {
+ return errors.New("Insufficient amount in sender's account")
+ }
+
+ // Subtract the amount from the senders account
+ sender.Amount.Sub(sender.Amount, tx.Value)
+ // Increment the nonce making each tx valid only once to prevent replay
+ // attacks
+ sender.Nonce += 1
+
+ // Get the receiver
+ data = block.State().Get(tx.Recipient)
+ // If the receiver doesn't exist yet, create a new account to which the
+ // funds will be send.
+ if data == "" {
+ receiver = ethutil.NewEther(big.NewInt(0))
+ } else {
+ receiver = ethutil.NewEtherFromData([]byte(data))
+ }
+ // Defer the update
+ defer block.State().Update(tx.Recipient, string(receiver.RlpEncode()))
+
+ // Add the amount to receivers account which should conclude this transaction
+ receiver.Amount.Add(receiver.Amount, tx.Value)
+
+ return nil
+}
+
+func (pool *TxPool) queueHandler() {
+out:
+ for {
+ select {
+ case tx := <-pool.queueChan:
+ // Process the transaction
+ err := pool.processTransaction(tx)
+ if err != nil {
+ log.Println("Error processing Tx", err)
+ } else {
+ // Call blocking version. At this point it
+ // doesn't matter since this is a goroutine
+ pool.addTransaction(tx)
+ }
+ case <-pool.quit:
+ break out
+ }
+ }
+}
+
+func (pool *TxPool) QueueTransaction(tx *ethutil.Transaction) {
+ pool.queueChan <- tx
+}
+
+func (pool *TxPool) Flush() {
+ pool.mutex.Lock()
+
+ defer pool.mutex.Unlock()
+
+ pool.mutex.Unlock()
+}
+
+func (pool *TxPool) Start() {
+ go pool.queueHandler()
+}
+
+func (pool *TxPool) Stop() {
+ log.Println("[TXP] Stopping...")
+
+ close(pool.quit)
+
+ pool.Flush()
+}