aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/geth/admin.go13
-rw-r--r--core/helper_test.go5
-rw-r--r--core/transaction_pool.go63
-rw-r--r--eth/backend.go5
-rw-r--r--eth/downloader/downloader.go46
-rw-r--r--rpc/args_test.go8
6 files changed, 91 insertions, 49 deletions
diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go
index 13d10de32..4f22110ad 100644
--- a/cmd/geth/admin.go
+++ b/cmd/geth/admin.go
@@ -51,7 +51,7 @@ func (js *jsre) adminBindings() {
admin.Set("import", js.importChain)
admin.Set("export", js.exportChain)
admin.Set("verbosity", js.verbosity)
- admin.Set("progress", js.downloadProgress)
+ admin.Set("progress", js.syncProgress)
admin.Set("setSolc", js.setSolc)
admin.Set("contractInfo", struct{}{})
@@ -324,9 +324,14 @@ func (js *jsre) setHead(call otto.FunctionCall) otto.Value {
return otto.UndefinedValue()
}
-func (js *jsre) downloadProgress(call otto.FunctionCall) otto.Value {
- pending, cached := js.ethereum.Downloader().Stats()
- v, _ := call.Otto.ToValue(map[string]interface{}{"pending": pending, "cached": cached})
+func (js *jsre) syncProgress(call otto.FunctionCall) otto.Value {
+ pending, cached, importing, eta := js.ethereum.Downloader().Stats()
+ v, _ := call.Otto.ToValue(map[string]interface{}{
+ "pending": pending,
+ "cached": cached,
+ "importing": importing,
+ "estimate": (eta / time.Second * time.Second).String(),
+ })
return v
}
diff --git a/core/helper_test.go b/core/helper_test.go
index 1e0ed178b..a308153aa 100644
--- a/core/helper_test.go
+++ b/core/helper_test.go
@@ -6,8 +6,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
// "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
)
@@ -76,8 +76,5 @@ func NewTestManager() *TestManager {
// testManager.blockChain = NewChainManager(testManager)
// testManager.stateManager = NewStateManager(testManager)
- // Start the tx pool
- testManager.txPool.Start()
-
return testManager
}
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index a2f970195..4a0594228 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -50,7 +50,7 @@ type TxPool struct {
}
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
- return &TxPool{
+ pool := &TxPool{
pending: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]map[common.Hash]*types.Transaction),
quit: make(chan bool),
@@ -58,14 +58,17 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
currentState: currentStateFn,
gasLimit: gasLimitFn,
pendingState: state.ManageState(currentStateFn()),
+ events: eventMux.Subscribe(ChainEvent{}),
}
+ go pool.eventLoop()
+
+ return pool
}
-func (pool *TxPool) Start() {
+func (pool *TxPool) eventLoop() {
// Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
- pool.events = pool.eventMux.Subscribe(ChainEvent{})
for _ = range pool.events.Chan() {
pool.mu.Lock()
@@ -100,7 +103,6 @@ func (pool *TxPool) resetState() {
}
func (pool *TxPool) Stop() {
- pool.pending = make(map[common.Hash]*types.Transaction)
close(pool.quit)
pool.events.Unsubscribe()
glog.V(logger.Info).Infoln("TX Pool stopped")
@@ -169,15 +171,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
return nil
}
+// validate and queue transactions.
func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash()
- /* XXX I'm unsure about this. This is extremely dangerous and may result
- in total black listing of certain transactions
- if self.invalidHashes.Has(hash) {
- return fmt.Errorf("Invalid transaction (%x)", hash[:4])
- }
- */
if self.pending[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4])
}
@@ -207,6 +204,30 @@ func (self *TxPool) add(tx *types.Transaction) error {
return nil
}
+// queueTx will queue an unknown transaction
+func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
+ from, _ := tx.From() // already validated
+ if self.queue[from] == nil {
+ self.queue[from] = make(map[common.Hash]*types.Transaction)
+ }
+ self.queue[from][hash] = tx
+}
+
+// addTx will add a transaction to the pending (processable queue) list of transactions
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
+ if _, ok := pool.pending[hash]; !ok {
+ pool.pending[hash] = tx
+
+ // Increment the nonce on the pending state. This can only happen if
+ // the nonce is +1 to the previous one.
+ pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
+ // Notify the subscribers. This event is posted in a goroutine
+ // because it's possible that somewhere during the post "Remove transaction"
+ // gets called which will then wait for the global tx pool lock and deadlock.
+ go pool.eventMux.Post(TxPreEvent{tx})
+ }
+}
+
// Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock()
@@ -290,28 +311,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
}
}
-func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
- from, _ := tx.From() // already validated
- if self.queue[from] == nil {
- self.queue[from] = make(map[common.Hash]*types.Transaction)
- }
- self.queue[from][hash] = tx
-}
-
-func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
- if _, ok := pool.pending[hash]; !ok {
- pool.pending[hash] = tx
-
- // Increment the nonce on the pending state. This can only happen if
- // the nonce is +1 to the previous one.
- pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
- // Notify the subscribers. This event is posted in a goroutine
- // because it's possible that somewhere during the post "Remove transaction"
- // gets called which will then wait for the global tx pool lock and deadlock.
- go pool.eventMux.Post(TxPreEvent{tx})
- }
-}
-
// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
state := pool.pendingState
diff --git a/eth/backend.go b/eth/backend.go
index fcbea04a2..60e9359dc 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -466,8 +466,6 @@ func (s *Ethereum) Start() error {
s.StartAutoDAG()
}
- // Start services
- go s.txPool.Start()
s.protocolManager.Start()
if s.whisper != nil {
@@ -513,9 +511,6 @@ func (s *Ethereum) StartForTest() {
ClientString: s.net.Name,
ProtocolVersion: ProtocolVersion,
})
-
- // Start services
- s.txPool.Start()
}
// AddPeer connects to the given node and maintains the connection until the
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 29b627771..f0a515d12 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -78,6 +78,12 @@ type Downloader struct {
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
banned *set.Set // Set of hashes we've received and banned
+ // Statistics
+ importStart time.Time // Instance when the last blocks were taken from the cache
+ importQueue []*Block // Previously taken blocks to check import progress
+ importDone int // Number of taken blocks already imported from the last batch
+ importLock sync.Mutex
+
// Callbacks
hasBlock hashCheckFn
getBlock getBlockFn
@@ -121,8 +127,27 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa
return downloader
}
-func (d *Downloader) Stats() (current int, max int) {
- return d.queue.Size()
+// Stats retrieves the current status of the downloader.
+func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
+ // Fetch the download status
+ pending, cached = d.queue.Size()
+
+ // Figure out the import progress
+ d.importLock.Lock()
+ defer d.importLock.Unlock()
+
+ for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
+ d.importQueue = d.importQueue[1:]
+ d.importDone++
+ }
+ importing = len(d.importQueue)
+
+ // Make an estimate on the total sync
+ estimate = 0
+ if d.importDone > 0 {
+ estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
+ }
+ return
}
// Synchronising returns the state of the downloader
@@ -202,7 +227,15 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// TakeBlocks takes blocks from the queue and yields them to the caller.
func (d *Downloader) TakeBlocks() []*Block {
- return d.queue.TakeBlocks()
+ blocks := d.queue.TakeBlocks()
+ if len(blocks) > 0 {
+ d.importLock.Lock()
+ d.importStart = time.Now()
+ d.importQueue = blocks
+ d.importDone = 0
+ d.importLock.Unlock()
+ }
+ return blocks
}
// Has checks if the downloader knows about a particular hash, meaning that its
@@ -255,9 +288,14 @@ func (d *Downloader) Cancel() bool {
}
d.cancelLock.Unlock()
- // reset the queue
+ // Reset the queue and import statistics
d.queue.Reset()
+ d.importLock.Lock()
+ d.importQueue = nil
+ d.importDone = 0
+ d.importLock.Unlock()
+
return true
}
diff --git a/rpc/args_test.go b/rpc/args_test.go
index fc10d68cf..81a2972cd 100644
--- a/rpc/args_test.go
+++ b/rpc/args_test.go
@@ -2519,6 +2519,14 @@ func TestSigArgs(t *testing.T) {
if err := json.Unmarshal([]byte(input), &args); err != nil {
t.Error(err)
}
+
+ if expected.From != args.From {
+ t.Errorf("From should be %v but is %v", expected.From, args.From)
+ }
+
+ if expected.Data != args.Data {
+ t.Errorf("Data should be %v but is %v", expected.Data, args.Data)
+ }
}
func TestSigArgsEmptyData(t *testing.T) {