aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--build/ci.go1
-rw-r--r--build/nsis.geth.nsi5
-rw-r--r--build/nsis.install.nsh5
-rw-r--r--build/nsis.pathupdate.nsh153
-rw-r--r--build/nsis.uninstall.nsh3
-rw-r--r--core/tx_pool.go65
-rw-r--r--core/tx_pool_test.go95
-rw-r--r--eth/api_backend.go11
-rw-r--r--eth/helper_test.go8
-rw-r--r--eth/protocol.go4
-rw-r--r--eth/sync.go3
-rw-r--r--ethstats/ethstats.go235
-rw-r--r--internal/ethapi/api.go16
-rw-r--r--internal/ethapi/backend.go2
-rw-r--r--les/api_backend.go2
-rw-r--r--les/handler.go8
-rw-r--r--light/txpool.go4
-rw-r--r--miner/worker.go9
18 files changed, 532 insertions, 97 deletions
diff --git a/build/ci.go b/build/ci.go
index c985e2da6..602eb8239 100644
--- a/build/ci.go
+++ b/build/ci.go
@@ -638,6 +638,7 @@ func doWindowsInstaller(cmdline []string) {
build.Render("build/nsis.geth.nsi", filepath.Join(*workdir, "geth.nsi"), 0644, nil)
build.Render("build/nsis.install.nsh", filepath.Join(*workdir, "install.nsh"), 0644, templateData)
build.Render("build/nsis.uninstall.nsh", filepath.Join(*workdir, "uninstall.nsh"), 0644, allTools)
+ build.Render("build/nsis.pathupdate.nsh", filepath.Join(*workdir, "PathUpdate.nsh"), 0644, nil)
build.Render("build/nsis.envvarupdate.nsh", filepath.Join(*workdir, "EnvVarUpdate.nsh"), 0644, nil)
build.CopyFile(filepath.Join(*workdir, "SimpleFC.dll"), "build/nsis.simplefc.dll", 0755)
build.CopyFile(filepath.Join(*workdir, "COPYING"), "COPYING", 0755)
diff --git a/build/nsis.geth.nsi b/build/nsis.geth.nsi
index dbeb9319c..1034f3023 100644
--- a/build/nsis.geth.nsi
+++ b/build/nsis.geth.nsi
@@ -17,8 +17,12 @@
#
# Requirements:
# - NSIS, http://nsis.sourceforge.net/Main_Page
+# - NSIS Large Strings build, http://nsis.sourceforge.net/Special_Builds
# - SFP, http://nsis.sourceforge.net/NSIS_Simple_Firewall_Plugin (put dll in NSIS\Plugins\x86-ansi)
#
+# After intalling NSIS extra the NSIS Large Strings build zip and replace the makensis.exe and the
+# files found in Stub.
+#
# based on: http://nsis.sourceforge.net/A_simple_installer_with_start_menu_shortcut_and_uninstaller
#
# TODO:
@@ -37,6 +41,7 @@ RequestExecutionLevel admin
SetCompressor /SOLID lzma
!include LogicLib.nsh
+!include PathUpdate.nsh
!include EnvVarUpdate.nsh
!macro VerifyUserIsAdmin
diff --git a/build/nsis.install.nsh b/build/nsis.install.nsh
index f9ad8e95e..57ef5a37c 100644
--- a/build/nsis.install.nsh
+++ b/build/nsis.install.nsh
@@ -37,8 +37,9 @@ Section "Geth" GETH_IDX
${EnvVarUpdate} $0 "ETHEREUM_SOCKET" "R" "HKLM" "\\.\pipe\geth.ipc"
${EnvVarUpdate} $0 "ETHEREUM_SOCKET" "A" "HKLM" "\\.\pipe\geth.ipc"
- # Add geth to PATH
- ${EnvVarUpdate} $0 "PATH" "A" "HKLM" $INSTDIR
+ # Add instdir to PATH
+ Push "$INSTDIR"
+ Call AddToPath
SectionEnd
# Install optional develop tools.
diff --git a/build/nsis.pathupdate.nsh b/build/nsis.pathupdate.nsh
new file mode 100644
index 000000000..f54b7e3e1
--- /dev/null
+++ b/build/nsis.pathupdate.nsh
@@ -0,0 +1,153 @@
+!include "WinMessages.nsh"
+
+; see https://support.microsoft.com/en-us/kb/104011
+!define Environ 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+; HKEY_LOCAL_MACHINE = 0x80000002
+
+; AddToPath - Appends dir to PATH
+; (does not work on Win9x/ME)
+;
+; Usage:
+; Push "dir"
+; Call AddToPath
+Function AddToPath
+ Exch $0
+ Push $1
+ Push $2
+ Push $3
+ Push $4
+
+ ; NSIS ReadRegStr returns empty string on string overflow
+ ; Native calls are used here to check actual length of PATH
+ ; $4 = RegOpenKey(HKEY_LOCAL_MACHINE, "SYSTEM\CurrentControlSet\Control\Session Manager\Environment", &$3)
+ System::Call "advapi32::RegOpenKey(i 0x80000002, t'SYSTEM\CurrentControlSet\Control\Session Manager\Environment', *i.r3) i.r4"
+ IntCmp $4 0 0 done done
+
+ ; $4 = RegQueryValueEx($3, "PATH", (DWORD*)0, (DWORD*)0, &$1, ($2=NSIS_MAX_STRLEN, &$2))
+ ; RegCloseKey($3)
+ System::Call "advapi32::RegQueryValueEx(i $3, t'PATH', i 0, i 0, t.r1, *i ${NSIS_MAX_STRLEN} r2) i.r4"
+ System::Call "advapi32::RegCloseKey(i $3)"
+
+ IntCmp $4 234 0 +4 +4 ; $4 == ERROR_MORE_DATA
+ DetailPrint "AddToPath: original length $2 > ${NSIS_MAX_STRLEN}"
+ MessageBox MB_OK "PATH not updated, original length $2 > ${NSIS_MAX_STRLEN}"
+ Goto done
+
+ IntCmp $4 0 +5 ; $4 != NO_ERROR
+ IntCmp $4 2 +3 ; $4 != ERROR_FILE_NOT_FOUND
+ DetailPrint "AddToPath: unexpected error code $4"
+ Goto done
+ StrCpy $1 ""
+
+ ; Check if already in PATH
+ Push "$1;"
+ Push "$0;"
+ Call StrStr
+ Pop $2
+ StrCmp $2 "" 0 done
+ Push "$1;"
+ Push "$0\;"
+ Call StrStr
+ Pop $2
+ StrCmp $2 "" 0 done
+
+ ; Prevent NSIS string overflow
+ StrLen $2 $0
+ StrLen $3 $1
+ IntOp $2 $2 + $3
+ IntOp $2 $2 + 2 ; $2 = strlen(dir) + strlen(PATH) + sizeof(";")
+ IntCmp $2 ${NSIS_MAX_STRLEN} +4 +4 0
+ DetailPrint "AddToPath: new length $2 > ${NSIS_MAX_STRLEN}"
+ MessageBox MB_OK "PATH not updated, new length $2 > ${NSIS_MAX_STRLEN}."
+ Goto done
+
+ ; Append dir to PATH
+ DetailPrint "Add to PATH: $0"
+ StrCpy $2 $1 1 -1
+ StrCmp $2 ";" 0 +2
+ StrCpy $1 $1 -1 ; remove trailing ';'
+ StrCmp $1 "" +2 ; no leading ';'
+ StrCpy $0 "$1;$0"
+
+ WriteRegExpandStr ${Environ} "PATH" $0
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+done:
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Pop $0
+FunctionEnd
+
+
+; RemoveFromPath - Removes dir from PATH
+;
+; Usage:
+; Push "dir"
+; Call RemoveFromPath
+Function un.RemoveFromPath
+ Exch $0
+ Push $1
+ Push $2
+ Push $3
+ Push $4
+ Push $5
+ Push $6
+
+ ; NSIS ReadRegStr returns empty string on string overflow
+ ; Native calls are used here to check actual length of PATH
+ ; $4 = RegOpenKey(HKEY_LOCAL_MACHINE, "SYSTEM\CurrentControlSet\Control\Session Manager\Environment", &$3)
+ System::Call "advapi32::RegOpenKey(i 0x80000002, t'SYSTEM\CurrentControlSet\Control\Session Manager\Environment', *i.r3) i.r4"
+ IntCmp $4 0 0 done done
+
+ ; $4 = RegQueryValueEx($3, "PATH", (DWORD*)0, (DWORD*)0, &$1, ($2=NSIS_MAX_STRLEN, &$2))
+ ; RegCloseKey($3)
+ System::Call "advapi32::RegQueryValueEx(i $3, t'PATH', i 0, i 0, t.r1, *i ${NSIS_MAX_STRLEN} r2) i.r4"
+ System::Call "advapi32::RegCloseKey(i $3)"
+
+ IntCmp $4 234 0 +4 +4 ; $4 == ERROR_MORE_DATA
+ DetailPrint "RemoveFromPath: original length $2 > ${NSIS_MAX_STRLEN}"
+ MessageBox MB_OK "PATH not updated, original length $2 > ${NSIS_MAX_STRLEN}"
+ Goto done
+
+ IntCmp $4 0 +5 ; $4 != NO_ERROR
+ IntCmp $4 2 +3 ; $4 != ERROR_FILE_NOT_FOUND
+ DetailPrint "RemoveFromPath: unexpected error code $4"
+ Goto done
+ StrCpy $1 ""
+
+ ; length < ${NSIS_MAX_STRLEN} -> ReadRegStr can be used
+ ReadRegStr $1 ${Environ} "PATH"
+ StrCpy $5 $1 1 -1
+ StrCmp $5 ";" +2
+ StrCpy $1 "$1;" ; ensure trailing ';'
+ Push $1
+ Push "$0;"
+ Call un.StrStr
+ Pop $2 ; pos of our dir
+ StrCmp $2 "" done
+
+ DetailPrint "Remove from PATH: $0"
+ StrLen $3 "$0;"
+ StrLen $4 $2
+ StrCpy $5 $1 -$4 ; $5 is now the part before the path to remove
+ StrCpy $6 $2 "" $3 ; $6 is now the part after the path to remove
+ StrCpy $3 "$5$6"
+ StrCpy $5 $3 1 -1
+ StrCmp $5 ";" 0 +2
+ StrCpy $3 $3 -1 ; remove trailing ';'
+ WriteRegExpandStr ${Environ} "PATH" $3
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+done:
+ Pop $6
+ Pop $5
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Pop $0
+FunctionEnd
+
+
diff --git a/build/nsis.uninstall.nsh b/build/nsis.uninstall.nsh
index ea7d5e298..6358faa74 100644
--- a/build/nsis.uninstall.nsh
+++ b/build/nsis.uninstall.nsh
@@ -25,7 +25,8 @@ Section "Uninstall"
${un.EnvVarUpdate} $0 "ETHEREUM_SOCKET" "R" "HKLM" "\\.\pipe\geth.ipc"
# Remove install directory from PATH
- ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" $INSTDIR
+ Push "$INSTDIR"
+ Call un.RemoveFromPath
# Cleanup registry (deletes all sub keys)
DeleteRegKey HKLM "Software\Microsoft\Windows\CurrentVersion\Uninstall\${GROUPNAME} ${APPNAME}"
diff --git a/core/tx_pool.go b/core/tx_pool.go
index edcbc21eb..b805cf226 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -176,7 +176,7 @@ func (pool *TxPool) resetState() {
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
- pool.demoteUnexecutables()
+ pool.demoteUnexecutables(currentState)
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
@@ -185,7 +185,7 @@ func (pool *TxPool) resetState() {
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
}
func (pool *TxPool) Stop() {
@@ -196,8 +196,12 @@ func (pool *TxPool) Stop() {
}
func (pool *TxPool) State() *state.ManagedState {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ if pool.pendingState == nil {
+ pool.resetState()
+ }
return pool.pendingState
}
@@ -237,21 +241,26 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
-func (pool *TxPool) Pending() map[common.Address]types.Transactions {
+func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
pool.mu.Lock()
defer pool.mu.Unlock()
+ state, err := pool.currentState()
+ if err != nil {
+ return nil, err
+ }
+
// check queue first
- pool.promoteExecutables()
+ pool.promoteExecutables(state)
// invalidate any txs
- pool.demoteUnexecutables()
+ pool.demoteUnexecutables(state)
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
- return pending
+ return pending, nil
}
// SetLocal marks a transaction as local, skipping gas price
@@ -410,13 +419,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
if err := pool.add(tx); err != nil {
return err
}
- pool.promoteExecutables()
+
+ state, err := pool.currentState()
+ if err != nil {
+ return err
+ }
+
+ pool.promoteExecutables(state)
return nil
}
// AddBatch attempts to queue a batch of transactions.
-func (pool *TxPool) AddBatch(txs []*types.Transaction) {
+func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -425,7 +440,15 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) {
glog.V(logger.Debug).Infoln("tx error:", err)
}
}
- pool.promoteExecutables()
+
+ state, err := pool.currentState()
+ if err != nil {
+ return err
+ }
+
+ pool.promoteExecutables(state)
+
+ return nil
}
// Get returns a transaction if it is contained in the pool
@@ -499,17 +522,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
-func (pool *TxPool) promoteExecutables() {
- // Init delayed since tx pool could have been started before any state sync
- if pool.pendingState == nil {
- pool.resetState()
- }
- // Retrieve the current state to allow nonce and balance checking
- state, err := pool.currentState()
- if err != nil {
- glog.Errorf("Could not get current state: %v", err)
- return
- }
+func (pool *TxPool) promoteExecutables(state *state.StateDB) {
// Iterate over all accounts and promote any executable transactions
queued := uint64(0)
for addr, list := range pool.queue {
@@ -645,13 +658,7 @@ func (pool *TxPool) promoteExecutables() {
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
-func (pool *TxPool) demoteUnexecutables() {
- // Retrieve the current state to allow nonce and balance checking
- state, err := pool.currentState()
- if err != nil {
- glog.V(logger.Info).Infoln("failed to get current state: %v", err)
- return
- }
+func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := state.GetNonce(addr)
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 009d19886..3e516735b 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -51,6 +51,80 @@ func deriveSender(tx *types.Transaction) (common.Address, error) {
return types.Sender(types.HomesteadSigner{}, tx)
}
+// This test simulates a scenario where a new block is imported during a
+// state reset and tests whether the pending state is in sync with the
+// block head event that initiated the resetState().
+func TestStateChangeDuringPoolReset(t *testing.T) {
+ var (
+ db, _ = ethdb.NewMemDatabase()
+ key, _ = crypto.GenerateKey()
+ address = crypto.PubkeyToAddress(key.PublicKey)
+ mux = new(event.TypeMux)
+ statedb, _ = state.New(common.Hash{}, db)
+ trigger = false
+ )
+
+ // setup pool with 2 transaction in it
+ statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
+
+ tx0 := transaction(0, big.NewInt(100000), key)
+ tx1 := transaction(1, big.NewInt(100000), key)
+
+ // stateFunc is used multiple times to reset the pending state.
+ // when simulate is true it will create a state that indicates
+ // that tx0 and tx1 are included in the chain.
+ stateFunc := func() (*state.StateDB, error) {
+ // delay "state change" by one. The tx pool fetches the
+ // state multiple times and by delaying it a bit we simulate
+ // a state change between those fetches.
+ stdb := statedb
+ if trigger {
+ statedb, _ = state.New(common.Hash{}, db)
+ // simulate that the new head block included tx0 and tx1
+ statedb.SetNonce(address, 2)
+ statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
+ trigger = false
+ }
+ return stdb, nil
+ }
+
+ gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) }
+
+ txpool := NewTxPool(testChainConfig(), mux, stateFunc, gasLimitFunc)
+ txpool.resetState()
+
+ nonce := txpool.State().GetNonce(address)
+ if nonce != 0 {
+ t.Fatalf("Invalid nonce, want 0, got %d", nonce)
+ }
+
+ txpool.AddBatch(types.Transactions{tx0, tx1})
+
+ nonce = txpool.State().GetNonce(address)
+ if nonce != 2 {
+ t.Fatalf("Invalid nonce, want 2, got %d", nonce)
+ }
+
+ // trigger state change in the background
+ trigger = true
+
+ txpool.resetState()
+
+ pendingTx, err := txpool.Pending()
+ if err != nil {
+ t.Fatalf("Could not fetch pending transactions: %v", err)
+ }
+
+ for addr, txs := range pendingTx {
+ t.Logf("%0x: %d\n", addr, len(txs))
+ }
+
+ nonce = txpool.State().GetNonce(address)
+ if nonce != 2 {
+ t.Fatalf("Invalid nonce, want 2, got %d", nonce)
+ }
+}
+
func TestInvalidTransactions(t *testing.T) {
pool, key := setupTxPool()
@@ -97,9 +171,10 @@ func TestTransactionQueue(t *testing.T) {
from, _ := deriveSender(tx)
currentState, _ := pool.currentState()
currentState.AddBalance(from, big.NewInt(1000))
+ pool.resetState()
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
}
@@ -108,7 +183,7 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx)
currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx)
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
@@ -124,11 +199,13 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx1)
currentState, _ = pool.currentState()
currentState.AddBalance(from, big.NewInt(1000))
+ pool.resetState()
+
pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3)
- pool.promoteExecutables()
+ pool.promoteExecutables(currentState)
if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending))
@@ -225,7 +302,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
if err := pool.add(tx2); err != nil {
t.Error("didn't expect error", err)
}
- pool.promoteExecutables()
+ state, _ := pool.currentState()
+ pool.promoteExecutables(state)
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@@ -236,7 +314,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
if err := pool.add(tx3); err != nil {
t.Error("didn't expect error", err)
}
- pool.promoteExecutables()
+ pool.promoteExecutables(state)
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@@ -295,6 +373,7 @@ func TestRemovedTxEvent(t *testing.T) {
from, _ := deriveSender(tx)
currentState, _ := pool.currentState()
currentState.AddBalance(from, big.NewInt(1000000000000))
+ pool.resetState()
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
pool.eventMux.Post(ChainHeadEvent{nil})
if pool.pending[from].Len() != 1 {
@@ -452,6 +531,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
state, _ := pool.currentState()
state.AddBalance(account, big.NewInt(1000000))
+ pool.resetState()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= maxQueuedPerAccount+5; i++ {
@@ -564,6 +644,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
state, _ := pool.currentState()
state.AddBalance(account, big.NewInt(1000000))
+ pool.resetState()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
@@ -733,7 +814,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
- pool.demoteUnexecutables()
+ pool.demoteUnexecutables(state)
}
}
@@ -757,7 +838,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
- pool.promoteExecutables()
+ pool.promoteExecutables(state)
}
}
diff --git a/eth/api_backend.go b/eth/api_backend.go
index b95ef79c5..f33b6f7e1 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -131,15 +131,20 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.Remove(txHash)
}
-func (b *EthApiBackend) GetPoolTransactions() types.Transactions {
+func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
+ pending, err := b.eth.txPool.Pending()
+ if err != nil {
+ return nil, err
+ }
+
var txs types.Transactions
- for _, batch := range b.eth.txPool.Pending() {
+ for _, batch := range pending {
txs = append(txs, batch...)
}
- return txs
+ return txs, nil
}
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
diff --git a/eth/helper_test.go b/eth/helper_test.go
index f23976785..bd6b2d0da 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -93,7 +93,7 @@ type testTxPool struct {
// AddBatch appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
-func (p *testTxPool) AddBatch(txs []*types.Transaction) {
+func (p *testTxPool) AddBatch(txs []*types.Transaction) error {
p.lock.Lock()
defer p.lock.Unlock()
@@ -101,10 +101,12 @@ func (p *testTxPool) AddBatch(txs []*types.Transaction) {
if p.added != nil {
p.added <- txs
}
+
+ return nil
}
// Pending returns all the transactions known to the pool
-func (p *testTxPool) Pending() map[common.Address]types.Transactions {
+func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -116,7 +118,7 @@ func (p *testTxPool) Pending() map[common.Address]types.Transactions {
for _, batch := range batches {
sort.Sort(types.TxByNonce(batch))
}
- return batches
+ return batches, nil
}
// newTestTransaction create a new dummy transaction.
diff --git a/eth/protocol.go b/eth/protocol.go
index 3f65c204b..7d22b33de 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -98,11 +98,11 @@ var errorToString = map[int]string{
type txPool interface {
// AddBatch should add the given transactions to the pool.
- AddBatch([]*types.Transaction)
+ AddBatch([]*types.Transaction) error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
- Pending() map[common.Address]types.Transactions
+ Pending() (map[common.Address]types.Transactions, error)
}
// statusData is the network packet for the status message.
diff --git a/eth/sync.go b/eth/sync.go
index b6918c2be..234534b4f 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -46,7 +46,8 @@ type txsync struct {
// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
var txs types.Transactions
- for _, batch := range pm.txpool.Pending() {
+ pending, _ := pm.txpool.Pending()
+ for _, batch := range pending {
txs = append(txs, batch...)
}
if len(txs) == 0 {
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index a5fa84468..716beef69 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -25,6 +25,7 @@ import (
"regexp"
"runtime"
"strconv"
+ "strings"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -41,6 +42,10 @@ import (
"golang.org/x/net/websocket"
)
+// historyUpdateRange is the number of blocks a node should report upon login or
+// history request.
+const historyUpdateRange = 50
+
// Service implements an Ethereum netstats reporting daemon that pushes local
// chain statistics up to a monitoring server.
type Service struct {
@@ -53,6 +58,9 @@ type Service struct {
node string // Name of the node to display on the monitoring page
pass string // Password to authorize access to the monitoring page
host string // Remote address of the monitoring service
+
+ pongCh chan struct{} // Pong notifications are fed into this channel
+ histCh chan []uint64 // History request block numbers are fed into this channel
}
// New returns a monitoring service ready for stats reporting.
@@ -65,11 +73,13 @@ func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Servic
}
// Assemble and return the stats service
return &Service{
- eth: ethServ,
- les: lesServ,
- node: parts[1],
- pass: parts[3],
- host: parts[4],
+ eth: ethServ,
+ les: lesServ,
+ node: parts[1],
+ pass: parts[3],
+ host: parts[4],
+ pongCh: make(chan struct{}),
+ histCh: make(chan []uint64, 1),
}, nil
}
@@ -115,7 +125,11 @@ func (s *Service) loop() {
// Loop reporting until termination
for {
// Establish a websocket connection to the server and authenticate the node
- conn, err := websocket.Dial(fmt.Sprintf("wss://%s/api", s.host), "", "http://localhost/")
+ url := fmt.Sprintf("%s/api", s.host)
+ if !strings.Contains(url, "://") {
+ url = "wss://" + url
+ }
+ conn, err := websocket.Dial(url, "", "http://localhost/")
if err != nil {
glog.V(logger.Warn).Infof("Stats server unreachable: %v", err)
time.Sleep(10 * time.Second)
@@ -130,22 +144,34 @@ func (s *Service) loop() {
time.Sleep(10 * time.Second)
continue
}
- if err = s.report(in, out); err != nil {
+ go s.readLoop(conn, in)
+
+ // Send the initial stats so our node looks decent from the get go
+ if err = s.report(out); err != nil {
glog.V(logger.Warn).Infof("Initial stats report failed: %v", err)
conn.Close()
continue
}
+ if err = s.reportHistory(out, nil); err != nil {
+ glog.V(logger.Warn).Infof("History report failed: %v", err)
+ conn.Close()
+ continue
+ }
// Keep sending status updates until the connection breaks
fullReport := time.NewTicker(15 * time.Second)
for err == nil {
select {
case <-fullReport.C:
- if err = s.report(in, out); err != nil {
+ if err = s.report(out); err != nil {
glog.V(logger.Warn).Infof("Full stats report failed: %v", err)
}
- case head := <-headSub.Chan():
- if head == nil { // node stopped
+ case list := <-s.histCh:
+ if err = s.reportHistory(out, list); err != nil {
+ glog.V(logger.Warn).Infof("Block history report failed: %v", err)
+ }
+ case head, ok := <-headSub.Chan():
+ if !ok { // node stopped
conn.Close()
return
}
@@ -155,8 +181,8 @@ func (s *Service) loop() {
if err = s.reportPending(out); err != nil {
glog.V(logger.Warn).Infof("Post-block transaction stats report failed: %v", err)
}
- case ev := <-txSub.Chan():
- if ev == nil { // node stopped
+ case _, ok := <-txSub.Chan():
+ if !ok { // node stopped
conn.Close()
return
}
@@ -178,6 +204,76 @@ func (s *Service) loop() {
}
}
+// readLoop loops as long as the connection is alive and retrieves data packets
+// from the network socket. If any of them match an active request, it forwards
+// it, if they themselves are requests it initiates a reply, and lastly it drops
+// unknown packets.
+func (s *Service) readLoop(conn *websocket.Conn, in *json.Decoder) {
+ // If the read loop exists, close the connection
+ defer conn.Close()
+
+ for {
+ // Retrieve the next generic network packet and bail out on error
+ var msg map[string][]interface{}
+ if err := in.Decode(&msg); err != nil {
+ glog.V(logger.Warn).Infof("Failed to decode stats server message: %v", err)
+ return
+ }
+ if len(msg["emit"]) == 0 {
+ glog.V(logger.Warn).Infof("Stats server sent non-broadcast: %v", msg)
+ return
+ }
+ command, ok := msg["emit"][0].(string)
+ if !ok {
+ glog.V(logger.Warn).Infof("Invalid stats server message type: %v", msg["emit"][0])
+ return
+ }
+ // If the message is a ping reply, deliver (someone must be listening!)
+ if len(msg["emit"]) == 2 && command == "node-pong" {
+ select {
+ case s.pongCh <- struct{}{}:
+ // Pong delivered, continue listening
+ continue
+ default:
+ // Ping routine dead, abort
+ glog.V(logger.Warn).Infof("Stats server pinger seems to have died")
+ return
+ }
+ }
+ // If the message is a history request, forward to the event processor
+ if len(msg["emit"]) == 2 && command == "history" {
+ // Make sure the request is valid and doesn't crash us
+ request, ok := msg["emit"][1].(map[string]interface{})
+ if !ok {
+ glog.V(logger.Warn).Infof("Invalid history request: %v", msg["emit"][1])
+ return
+ }
+ list, ok := request["list"].([]interface{})
+ if !ok {
+ glog.V(logger.Warn).Infof("Invalid history block list: %v", request["list"])
+ return
+ }
+ // Convert the block number list to an integer list
+ numbers := make([]uint64, len(list))
+ for i, num := range list {
+ n, ok := num.(float64)
+ if !ok {
+ glog.V(logger.Warn).Infof("Invalid history block number: %v", num)
+ return
+ }
+ numbers[i] = uint64(n)
+ }
+ select {
+ case s.histCh <- numbers:
+ continue
+ default:
+ }
+ }
+ // Report anything else and continue
+ glog.V(logger.Info).Infof("Unknown stats message: %v", msg)
+ }
+}
+
// nodeInfo is the collection of metainformation about a node that is displayed
// on the monitoring page.
type nodeInfo struct {
@@ -190,6 +286,7 @@ type nodeInfo struct {
Os string `json:"os"`
OsVer string `json:"os_v"`
Client string `json:"client"`
+ History bool `json:"canUpdateHistory"`
}
// authMsg is the authentication infos needed to login to a monitoring server.
@@ -224,6 +321,7 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
Os: runtime.GOOS,
OsVer: runtime.GOARCH,
Client: "0.1.1",
+ History: true,
},
Secret: s.pass,
}
@@ -244,8 +342,8 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
// report collects all possible data to report and send it to the stats server.
// This should only be used on reconnects or rarely to avoid overloading the
// server. Use the individual methods for reporting subscribed events.
-func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
- if err := s.reportLatency(in, out); err != nil {
+func (s *Service) report(out *json.Encoder) error {
+ if err := s.reportLatency(out); err != nil {
return err
}
if err := s.reportBlock(out, nil); err != nil {
@@ -262,7 +360,7 @@ func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
// reportLatency sends a ping request to the server, measures the RTT time and
// finally sends a latency update.
-func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
+func (s *Service) reportLatency(out *json.Encoder) error {
// Send the current time to the ethstats server
start := time.Now()
@@ -276,9 +374,12 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
return err
}
// Wait for the pong request to arrive back
- var pong map[string][]interface{}
- if err := in.Decode(&pong); err != nil || len(pong["emit"]) != 2 || pong["emit"][0].(string) != "node-pong" {
- return errors.New("unexpected ping reply")
+ select {
+ case <-s.pongCh:
+ // Pong delivered, report the latency
+ case <-time.After(3 * time.Second):
+ // Ping timeout, abort
+ return errors.New("ping timed out")
}
// Send back the measured latency
latency := map[string][]interface{}{
@@ -297,6 +398,7 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
type blockStats struct {
Number *big.Int `json:"number"`
Hash common.Hash `json:"hash"`
+ Timestamp *big.Int `json:"timestamp"`
Miner common.Address `json:"miner"`
GasUsed *big.Int `json:"gasUsed"`
GasLimit *big.Int `json:"gasLimit"`
@@ -330,9 +432,26 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
// reportBlock retrieves the current chain head and repors it to the stats server.
func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
- // Gather the head block infos from the local blockchain
+ // Assemble the block stats report and send it to the server
+ stats := map[string]interface{}{
+ "id": s.node,
+ "block": s.assembleBlockStats(block),
+ }
+ report := map[string][]interface{}{
+ "emit": []interface{}{"block", stats},
+ }
+ if err := out.Encode(report); err != nil {
+ return err
+ }
+ return nil
+}
+
+// assembleBlockStats retrieves any required metadata to report a single block
+// and assembles the block stats. If block is nil, the current head is processed.
+func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
+ // Gather the block infos from the local blockchain
var (
- head *types.Header
+ header *types.Header
td *big.Int
txs []*types.Transaction
uncles []*types.Header
@@ -342,37 +461,77 @@ func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
if block == nil {
block = s.eth.BlockChain().CurrentBlock()
}
- head = block.Header()
- td = s.eth.BlockChain().GetTd(head.Hash(), head.Number.Uint64())
+ header = block.Header()
+ td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
txs = block.Transactions()
uncles = block.Uncles()
} else {
// Light nodes would need on-demand lookups for transactions/uncles, skip
if block != nil {
- head = block.Header()
+ header = block.Header()
+ } else {
+ header = s.les.BlockChain().CurrentHeader()
+ }
+ td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
+ }
+ // Assemble and return the block stats
+ return &blockStats{
+ Number: header.Number,
+ Hash: header.Hash(),
+ Timestamp: header.Time,
+ Miner: header.Coinbase,
+ GasUsed: new(big.Int).Set(header.GasUsed),
+ GasLimit: new(big.Int).Set(header.GasLimit),
+ Diff: header.Difficulty.String(),
+ TotalDiff: td.String(),
+ Txs: txs,
+ Uncles: uncles,
+ }
+}
+
+// reportHistory retrieves the most recent batch of blocks and reports it to the
+// stats server.
+func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
+ // Figure out the indexes that need reporting
+ indexes := make([]uint64, 0, historyUpdateRange)
+ if len(list) > 0 {
+ // Specific indexes requested, send them back in particular
+ for _, idx := range list {
+ indexes = append(indexes, idx)
+ }
+ } else {
+ // No indexes requested, send back the top ones
+ var head *types.Header
+ if s.eth != nil {
+ head = s.eth.BlockChain().CurrentHeader()
} else {
head = s.les.BlockChain().CurrentHeader()
}
- td = s.les.BlockChain().GetTd(head.Hash(), head.Number.Uint64())
+ start := head.Number.Int64() - historyUpdateRange
+ if start < 0 {
+ start = 0
+ }
+ for i := uint64(start); i <= head.Number.Uint64(); i++ {
+ indexes = append(indexes, i)
+ }
}
- // Assemble the block stats report and send it to the server
+ // Gather the batch of blocks to report
+ history := make([]*blockStats, len(indexes))
+ for i, number := range indexes {
+ if s.eth != nil {
+ history[i] = s.assembleBlockStats(s.eth.BlockChain().GetBlockByNumber(number))
+ } else {
+ history[i] = s.assembleBlockStats(types.NewBlockWithHeader(s.les.BlockChain().GetHeaderByNumber(number)))
+ }
+ }
+ // Assemble the history report and send it to the server
stats := map[string]interface{}{
- "id": s.node,
- "block": &blockStats{
- Number: head.Number,
- Hash: head.Hash(),
- Miner: head.Coinbase,
- GasUsed: new(big.Int).Set(head.GasUsed),
- GasLimit: new(big.Int).Set(head.GasLimit),
- Diff: head.Difficulty.String(),
- TotalDiff: td.String(),
- Txs: txs,
- Uncles: uncles,
- },
+ "id": s.node,
+ "history": history,
}
report := map[string][]interface{}{
- "emit": []interface{}{"block", stats},
+ "emit": []interface{}{"history", stats},
}
if err := out.Encode(report); err != nil {
return err
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index a25eff5ed..fd86b6465 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -1273,8 +1273,12 @@ func (s *PublicTransactionPoolAPI) SignTransaction(ctx context.Context, args Sig
// PendingTransactions returns the transactions that are in the transaction pool and have a from address that is one of
// the accounts this node manages.
-func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
- pending := s.b.GetPoolTransactions()
+func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) {
+ pending, err := s.b.GetPoolTransactions()
+ if err != nil {
+ return nil, err
+ }
+
transactions := make([]*RPCTransaction, 0, len(pending))
for _, tx := range pending {
var signer types.Signer = types.HomesteadSigner{}
@@ -1286,13 +1290,17 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
transactions = append(transactions, newRPCPendingTransaction(tx))
}
}
- return transactions
+ return transactions, nil
}
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
// pool and reinsert it with the new gas price and limit.
func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) {
- pending := s.b.GetPoolTransactions()
+ pending, err := s.b.GetPoolTransactions()
+ if err != nil {
+ return common.Hash{}, err
+ }
+
for _, p := range pending {
var signer types.Signer = types.HomesteadSigner{}
if p.Protected() {
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index 77df7eb8d..36d7e754b 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -55,7 +55,7 @@ type Backend interface {
// TxPool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
RemoveTx(txHash common.Hash)
- GetPoolTransactions() types.Transactions
+ GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
Stats() (pending int, queued int)
diff --git a/les/api_backend.go b/les/api_backend.go
index 8df963f6e..7dc548ec3 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -110,7 +110,7 @@ func (b *LesApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.RemoveTx(txHash)
}
-func (b *LesApiBackend) GetPoolTransactions() types.Transactions {
+func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) {
return b.eth.txPool.GetTransactions()
}
diff --git a/les/handler.go b/les/handler.go
index 048f30217..b024841f2 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -88,7 +88,7 @@ type BlockChain interface {
type txPool interface {
// AddTransactions should add the given transactions to the pool.
- AddBatch([]*types.Transaction)
+ AddBatch([]*types.Transaction) error
}
type ProtocolManager struct {
@@ -876,7 +876,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if reqCnt > maxReqs || reqCnt > MaxTxSend {
return errResp(ErrRequestRejected, "")
}
- pm.txpool.AddBatch(txs)
+
+ if err := pm.txpool.AddBatch(txs); err != nil {
+ return errResp(ErrUnexpectedResponse, "msg: %v", err)
+ }
+
_, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
diff --git a/light/txpool.go b/light/txpool.go
index 309bc3a32..4a06d317d 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -500,7 +500,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions.
// The returned slice may be modified by the caller.
-func (self *TxPool) GetTransactions() (txs types.Transactions) {
+func (self *TxPool) GetTransactions() (txs types.Transactions, err error) {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -510,7 +510,7 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
txs[i] = tx
i++
}
- return txs
+ return txs, nil
}
// Content retrieves the data content of the transaction pool, returning all the
diff --git a/miner/worker.go b/miner/worker.go
index edbd502c1..5fa7c4115 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -519,7 +519,14 @@ func (self *worker) commitNewWork() {
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
core.ApplyDAOHardFork(work.state)
}
- txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending())
+
+ pending, err := self.eth.TxPool().Pending()
+ if err != nil {
+ glog.Errorf("Could not fetch pending transactions: %v", err)
+ return
+ }
+
+ txs := types.NewTransactionsByPriceAndNonce(pending)
work.commitTransactions(self.mux, txs, self.gasPrice, self.chain)
self.eth.TxPool().RemoveBatch(work.lowGasTxs)