diff options
-rw-r--r-- | cmd/geth/admin.go | 13 | ||||
-rw-r--r-- | core/helper_test.go | 5 | ||||
-rw-r--r-- | core/transaction_pool.go | 63 | ||||
-rw-r--r-- | eth/backend.go | 5 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 46 | ||||
-rw-r--r-- | rpc/args_test.go | 8 |
6 files changed, 91 insertions, 49 deletions
diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index 13d10de32..4f22110ad 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -51,7 +51,7 @@ func (js *jsre) adminBindings() { admin.Set("import", js.importChain) admin.Set("export", js.exportChain) admin.Set("verbosity", js.verbosity) - admin.Set("progress", js.downloadProgress) + admin.Set("progress", js.syncProgress) admin.Set("setSolc", js.setSolc) admin.Set("contractInfo", struct{}{}) @@ -324,9 +324,14 @@ func (js *jsre) setHead(call otto.FunctionCall) otto.Value { return otto.UndefinedValue() } -func (js *jsre) downloadProgress(call otto.FunctionCall) otto.Value { - pending, cached := js.ethereum.Downloader().Stats() - v, _ := call.Otto.ToValue(map[string]interface{}{"pending": pending, "cached": cached}) +func (js *jsre) syncProgress(call otto.FunctionCall) otto.Value { + pending, cached, importing, eta := js.ethereum.Downloader().Stats() + v, _ := call.Otto.ToValue(map[string]interface{}{ + "pending": pending, + "cached": cached, + "importing": importing, + "estimate": (eta / time.Second * time.Second).String(), + }) return v } diff --git a/core/helper_test.go b/core/helper_test.go index 1e0ed178b..a308153aa 100644 --- a/core/helper_test.go +++ b/core/helper_test.go @@ -6,8 +6,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" // "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" ) @@ -76,8 +76,5 @@ func NewTestManager() *TestManager { // testManager.blockChain = NewChainManager(testManager) // testManager.stateManager = NewStateManager(testManager) - // Start the tx pool - testManager.txPool.Start() - return testManager } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index a2f970195..4a0594228 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -50,7 +50,7 @@ type TxPool struct { } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { - return &TxPool{ + pool := &TxPool{ pending: make(map[common.Hash]*types.Transaction), queue: make(map[common.Address]map[common.Hash]*types.Transaction), quit: make(chan bool), @@ -58,14 +58,17 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( currentState: currentStateFn, gasLimit: gasLimitFn, pendingState: state.ManageState(currentStateFn()), + events: eventMux.Subscribe(ChainEvent{}), } + go pool.eventLoop() + + return pool } -func (pool *TxPool) Start() { +func (pool *TxPool) eventLoop() { // Track chain events. When a chain events occurs (new chain canon block) // we need to know the new state. The new state will help us determine // the nonces in the managed state - pool.events = pool.eventMux.Subscribe(ChainEvent{}) for _ = range pool.events.Chan() { pool.mu.Lock() @@ -100,7 +103,6 @@ func (pool *TxPool) resetState() { } func (pool *TxPool) Stop() { - pool.pending = make(map[common.Hash]*types.Transaction) close(pool.quit) pool.events.Unsubscribe() glog.V(logger.Info).Infoln("TX Pool stopped") @@ -169,15 +171,10 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { return nil } +// validate and queue transactions. func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() - /* XXX I'm unsure about this. This is extremely dangerous and may result - in total black listing of certain transactions - if self.invalidHashes.Has(hash) { - return fmt.Errorf("Invalid transaction (%x)", hash[:4]) - } - */ if self.pending[hash] != nil { return fmt.Errorf("Known transaction (%x)", hash[:4]) } @@ -207,6 +204,30 @@ func (self *TxPool) add(tx *types.Transaction) error { return nil } +// queueTx will queue an unknown transaction +func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { + from, _ := tx.From() // already validated + if self.queue[from] == nil { + self.queue[from] = make(map[common.Hash]*types.Transaction) + } + self.queue[from][hash] = tx +} + +// addTx will add a transaction to the pending (processable queue) list of transactions +func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { + if _, ok := pool.pending[hash]; !ok { + pool.pending[hash] = tx + + // Increment the nonce on the pending state. This can only happen if + // the nonce is +1 to the previous one. + pool.pendingState.SetNonce(addr, tx.AccountNonce+1) + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) + } +} + // Add queues a single transaction in the pool if it is valid. func (self *TxPool) Add(tx *types.Transaction) error { self.mu.Lock() @@ -290,28 +311,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { } } -func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { - from, _ := tx.From() // already validated - if self.queue[from] == nil { - self.queue[from] = make(map[common.Hash]*types.Transaction) - } - self.queue[from][hash] = tx -} - -func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { - if _, ok := pool.pending[hash]; !ok { - pool.pending[hash] = tx - - // Increment the nonce on the pending state. This can only happen if - // the nonce is +1 to the previous one. - pool.pendingState.SetNonce(addr, tx.AccountNonce+1) - // Notify the subscribers. This event is posted in a goroutine - // because it's possible that somewhere during the post "Remove transaction" - // gets called which will then wait for the global tx pool lock and deadlock. - go pool.eventMux.Post(TxPreEvent{tx}) - } -} - // checkQueue moves transactions that have become processable to main pool. func (pool *TxPool) checkQueue() { state := pool.pendingState diff --git a/eth/backend.go b/eth/backend.go index fcbea04a2..60e9359dc 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -466,8 +466,6 @@ func (s *Ethereum) Start() error { s.StartAutoDAG() } - // Start services - go s.txPool.Start() s.protocolManager.Start() if s.whisper != nil { @@ -513,9 +511,6 @@ func (s *Ethereum) StartForTest() { ClientString: s.net.Name, ProtocolVersion: ProtocolVersion, }) - - // Start services - s.txPool.Start() } // AddPeer connects to the given node and maintains the connection until the diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 29b627771..f0a515d12 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -78,6 +78,12 @@ type Downloader struct { checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain banned *set.Set // Set of hashes we've received and banned + // Statistics + importStart time.Time // Instance when the last blocks were taken from the cache + importQueue []*Block // Previously taken blocks to check import progress + importDone int // Number of taken blocks already imported from the last batch + importLock sync.Mutex + // Callbacks hasBlock hashCheckFn getBlock getBlockFn @@ -121,8 +127,27 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa return downloader } -func (d *Downloader) Stats() (current int, max int) { - return d.queue.Size() +// Stats retrieves the current status of the downloader. +func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) { + // Fetch the download status + pending, cached = d.queue.Size() + + // Figure out the import progress + d.importLock.Lock() + defer d.importLock.Unlock() + + for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) { + d.importQueue = d.importQueue[1:] + d.importDone++ + } + importing = len(d.importQueue) + + // Make an estimate on the total sync + estimate = 0 + if d.importDone > 0 { + estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing) + } + return } // Synchronising returns the state of the downloader @@ -202,7 +227,15 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // TakeBlocks takes blocks from the queue and yields them to the caller. func (d *Downloader) TakeBlocks() []*Block { - return d.queue.TakeBlocks() + blocks := d.queue.TakeBlocks() + if len(blocks) > 0 { + d.importLock.Lock() + d.importStart = time.Now() + d.importQueue = blocks + d.importDone = 0 + d.importLock.Unlock() + } + return blocks } // Has checks if the downloader knows about a particular hash, meaning that its @@ -255,9 +288,14 @@ func (d *Downloader) Cancel() bool { } d.cancelLock.Unlock() - // reset the queue + // Reset the queue and import statistics d.queue.Reset() + d.importLock.Lock() + d.importQueue = nil + d.importDone = 0 + d.importLock.Unlock() + return true } diff --git a/rpc/args_test.go b/rpc/args_test.go index fc10d68cf..81a2972cd 100644 --- a/rpc/args_test.go +++ b/rpc/args_test.go @@ -2519,6 +2519,14 @@ func TestSigArgs(t *testing.T) { if err := json.Unmarshal([]byte(input), &args); err != nil { t.Error(err) } + + if expected.From != args.From { + t.Errorf("From should be %v but is %v", expected.From, args.From) + } + + if expected.Data != args.Data { + t.Errorf("Data should be %v but is %v", expected.Data, args.Data) + } } func TestSigArgsEmptyData(t *testing.T) { |