aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-04-22 04:01:04 +0800
committerobscuren <geffobscura@gmail.com>2015-04-23 17:50:11 +0800
commit498b24270a9c301a9251150afb7f3889c929765c (patch)
treea7bf2d902fef5a0558363a84b0903fec07ed5b7b
parent2fe54ab233c0cd1bf09b49085477c961dcc1980f (diff)
downloaddexon-498b24270a9c301a9251150afb7f3889c929765c.tar
dexon-498b24270a9c301a9251150afb7f3889c929765c.tar.gz
dexon-498b24270a9c301a9251150afb7f3889c929765c.tar.bz2
dexon-498b24270a9c301a9251150afb7f3889c929765c.tar.lz
dexon-498b24270a9c301a9251150afb7f3889c929765c.tar.xz
dexon-498b24270a9c301a9251150afb7f3889c929765c.tar.zst
dexon-498b24270a9c301a9251150afb7f3889c929765c.zip
core: implemented a queued approach processing transactions
Implemented a new transaction queue. Transactions with a holes in their nonce sequence are also not propagated over the network. N: 0,1,2,5,6,7 = propagate 0..2 -- 5..N is kept in the tx pool
-rw-r--r--core/block_processor.go2
-rw-r--r--core/transaction_pool.go92
-rw-r--r--core/transaction_pool_test.go54
3 files changed, 123 insertions, 25 deletions
diff --git a/core/block_processor.go b/core/block_processor.go
index f33f0d433..af47069ad 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
state.Sync()
// Remove transactions from the pool
- sm.txpool.RemoveSet(block.Transactions())
+ sm.txpool.RemoveTransactions(block.Transactions())
// This puts transactions in a extra db for rpc
for i, tx := range block.Transactions() {
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index eaddcfa09..92a2462c6 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -4,7 +4,9 @@ import (
"errors"
"fmt"
"math/big"
+ "sort"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -17,7 +19,7 @@ import (
var (
ErrInvalidSender = errors.New("Invalid sender")
- ErrImpossibleNonce = errors.New("Impossible nonce")
+ ErrNonce = errors.New("Nonce too low")
ErrNonExistentAccount = errors.New("Account does not exist")
ErrInsufficientFunds = errors.New("Insufficient funds")
ErrIntrinsicGas = errors.New("Intrinsic gas too low")
@@ -54,20 +56,37 @@ type TxPool struct {
txs map[common.Hash]*types.Transaction
invalidHashes *set.Set
+ queue map[common.Address]types.Transactions
+
subscribers []chan TxMsg
eventMux *event.TypeMux
}
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
- return &TxPool{
+ txPool := &TxPool{
txs: make(map[common.Hash]*types.Transaction),
+ queue: make(map[common.Address]types.Transactions),
queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool),
eventMux: eventMux,
invalidHashes: set.New(),
currentState: currentStateFn,
}
+ return txPool
+}
+
+func (pool *TxPool) Start() {
+ ticker := time.NewTicker(300 * time.Millisecond)
+done:
+ for {
+ select {
+ case <-ticker.C:
+ pool.checkQueue()
+ case <-pool.quit:
+ break done
+ }
+ }
}
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
@@ -100,14 +119,15 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
}
if pool.currentState().GetNonce(from) > tx.Nonce() {
- return ErrImpossibleNonce
+ return ErrNonce
}
return nil
}
func (self *TxPool) addTx(tx *types.Transaction) {
- self.txs[tx.Hash()] = tx
+ from, _ := tx.From()
+ self.queue[from] = append(self.queue[from], tx)
}
func (self *TxPool) add(tx *types.Transaction) error {
@@ -144,9 +164,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
}
- // Notify the subscribers
- go self.eventMux.Post(TxPreEvent{tx})
-
return nil
}
@@ -189,34 +206,65 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
return
}
-func (self *TxPool) RemoveSet(txs types.Transactions) {
+func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
+
for _, tx := range txs {
delete(self.txs, tx.Hash())
}
}
-func (self *TxPool) InvalidateSet(hashes *set.Set) {
- self.mu.Lock()
- defer self.mu.Unlock()
-
- hashes.Each(func(v interface{}) bool {
- delete(self.txs, v.(common.Hash))
- return true
- })
- self.invalidHashes.Merge(hashes)
-}
-
func (pool *TxPool) Flush() {
pool.txs = make(map[common.Hash]*types.Transaction)
}
-func (pool *TxPool) Start() {
-}
-
func (pool *TxPool) Stop() {
pool.Flush()
+ close(pool.quit)
glog.V(logger.Info).Infoln("TX Pool stopped")
}
+
+// check queue will attempt to insert
+func (pool *TxPool) checkQueue() {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ for address, txs := range pool.queue {
+ sort.Sort(types.TxByNonce{txs})
+
+ var (
+ nonce = pool.currentState().GetNonce(address)
+ start int
+ )
+ // Clean up the transactions first and determine the start of the nonces
+ for _, tx := range txs {
+ if tx.Nonce() >= nonce {
+ break
+ }
+ start++
+ }
+ pool.queue[address] = txs[start:]
+
+ // expected nonce
+ enonce := nonce
+ for _, tx := range pool.queue[address] {
+ // If the expected nonce does not match up with the next one
+ // (i.e. a nonce gap), we stop the loop
+ if enonce != tx.Nonce() {
+ break
+ }
+ enonce++
+
+ pool.txs[tx.Hash()] = tx
+ // Notify the subscribers
+ go pool.eventMux.Post(TxPreEvent{tx})
+ }
+ //pool.queue[address] = txs[i:]
+ // delete the entire queue entry if it's empty. There's no need to keep it
+ if len(pool.queue[address]) == 0 {
+ delete(pool.queue, address)
+ }
+ }
+}
diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go
index b7486adb3..5a5cd866f 100644
--- a/core/transaction_pool_test.go
+++ b/core/transaction_pool_test.go
@@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) {
tx.SignECDSA(key)
err = pool.Add(tx)
- if err != ErrImpossibleNonce {
- t.Error("expected", ErrImpossibleNonce)
+ if err != ErrNonce {
+ t.Error("expected", ErrNonce)
+ }
+}
+
+func TestTransactionQueue(t *testing.T) {
+ pool, key := setupTxPool()
+ tx := transaction()
+ tx.SignECDSA(key)
+ from, _ := tx.From()
+ pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.addTx(tx)
+
+ pool.checkQueue()
+ if len(pool.txs) != 1 {
+ t.Error("expected valid txs to be 1 is", len(pool.txs))
+ }
+
+ tx = transaction()
+ tx.SignECDSA(key)
+ from, _ = tx.From()
+ pool.currentState().SetNonce(from, 10)
+ tx.SetNonce(1)
+ pool.addTx(tx)
+ pool.checkQueue()
+ if _, ok := pool.txs[tx.Hash()]; ok {
+ t.Error("expected transaction to be in tx pool")
+ }
+
+ if len(pool.queue[from]) != 0 {
+ t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
+ }
+
+ pool, key = setupTxPool()
+ tx1, tx2, tx3 := transaction(), transaction(), transaction()
+ tx2.SetNonce(10)
+ tx3.SetNonce(11)
+ tx1.SignECDSA(key)
+ tx2.SignECDSA(key)
+ tx3.SignECDSA(key)
+ pool.addTx(tx1)
+ pool.addTx(tx2)
+ pool.addTx(tx3)
+ from, _ = tx1.From()
+ pool.checkQueue()
+
+ if len(pool.txs) != 1 {
+ t.Error("expected tx pool to be 1 =")
+ }
+
+ if len(pool.queue[from]) != 2 {
+ t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
}