aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.travis.yml1
-rw-r--r--Makefile7
-rw-r--r--README.md18
-rw-r--r--VERSION2
-rw-r--r--cmd/evm/json_logger.go33
-rw-r--r--cmd/evm/main.go10
-rw-r--r--cmd/evm/runner.go10
-rw-r--r--cmd/geth/accountcmd_test.go72
-rw-r--r--cmd/geth/consolecmd_test.go54
-rw-r--r--cmd/geth/dao_test.go4
-rw-r--r--cmd/geth/genesis_test.go6
-rw-r--r--cmd/geth/main.go4
-rw-r--r--cmd/geth/run_test.go250
-rw-r--r--cmd/swarm/run_test.go255
-rw-r--r--cmd/swarm/upload_test.go78
-rw-r--r--consensus/clique/clique.go38
-rw-r--r--consensus/clique/snapshot.go10
-rw-r--r--core/state/sync.go12
-rw-r--r--core/state/sync_test.go25
-rw-r--r--core/tx_pool.go105
-rw-r--r--core/tx_pool_test.go135
-rw-r--r--core/vm/gen_structlog.go26
-rw-r--r--core/vm/logger.go36
-rw-r--r--eth/backend.go5
-rw-r--r--eth/downloader/downloader.go334
-rw-r--r--eth/downloader/metrics.go6
-rw-r--r--eth/downloader/peer.go25
-rw-r--r--eth/downloader/queue.go293
-rw-r--r--eth/downloader/statesync.go449
-rw-r--r--ethclient/ethclient.go4
-rw-r--r--ethdb/memory_database.go7
-rw-r--r--internal/cmdtest/test_cmd.go270
-rw-r--r--les/backend.go40
-rw-r--r--les/distributor.go58
-rw-r--r--les/distributor_test.go10
-rw-r--r--les/fetcher.go11
-rw-r--r--les/handler.go207
-rw-r--r--les/handler_test.go16
-rw-r--r--les/helper_test.go52
-rw-r--r--les/odr.go195
-rw-r--r--les/odr_test.go28
-rw-r--r--les/peer.go38
-rw-r--r--les/request_test.go27
-rw-r--r--les/retrieve.go395
-rw-r--r--les/server.go21
-rw-r--r--les/serverpool.go26
-rw-r--r--les/txrelay.go16
-rw-r--r--light/txpool.go2
-rw-r--r--node/service.go6
-rw-r--r--params/version.go2
-rw-r--r--rpc/http.go4
-rw-r--r--swarm/fuse/swarmfs_test.go283
-rw-r--r--swarm/fuse/swarmfs_unix.go13
-rw-r--r--swarm/fuse/swarmfs_util.go42
-rw-r--r--swarm/storage/dbstore.go2
-rw-r--r--trie/errors.go21
-rw-r--r--trie/iterator.go243
-rw-r--r--trie/iterator_test.go107
-rw-r--r--trie/proof.go2
-rw-r--r--trie/sync.go74
-rw-r--r--trie/sync_test.go30
-rw-r--r--trie/trie.go23
-rw-r--r--trie/trie_test.go57
-rw-r--r--vendor/github.com/docker/docker/LICENSE191
-rw-r--r--vendor/github.com/docker/docker/NOTICE19
-rw-r--r--vendor/github.com/docker/docker/pkg/reexec/README.md5
-rw-r--r--vendor/github.com/docker/docker/pkg/reexec/command_linux.go28
-rw-r--r--vendor/github.com/docker/docker/pkg/reexec/command_unix.go23
-rw-r--r--vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go12
-rw-r--r--vendor/github.com/docker/docker/pkg/reexec/command_windows.go23
-rw-r--r--vendor/github.com/docker/docker/pkg/reexec/reexec.go47
-rw-r--r--vendor/vendor.json6
72 files changed, 3280 insertions, 1709 deletions
diff --git a/.travis.yml b/.travis.yml
index 96e429959..703ed0cb1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -53,6 +53,7 @@ matrix:
- debhelper
- dput
- gcc-multilib
+ - fakeroot
script:
# Build for the primary platforms that Trusty can manage
- go run build/ci.go debsrc -signer "Go Ethereum Linux Builder <geth-ci@ethereum.org>" -upload ppa:ethereum/ethereum
diff --git a/Makefile b/Makefile
index 2b5d84f28..b6e2ddd5e 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@
# with Go source code. If you know what GOPATH is then you probably
# don't need to bother with make.
-.PHONY: geth android ios geth-cross evm all test clean
+.PHONY: geth android ios geth-cross swarm evm all test clean
.PHONY: geth-linux geth-linux-386 geth-linux-amd64 geth-linux-mips64 geth-linux-mips64le
.PHONY: geth-linux-arm geth-linux-arm-5 geth-linux-arm-6 geth-linux-arm-7 geth-linux-arm64
.PHONY: geth-darwin geth-darwin-386 geth-darwin-amd64
@@ -16,6 +16,11 @@ geth:
@echo "Done building."
@echo "Run \"$(GOBIN)/geth\" to launch geth."
+swarm:
+ build/env.sh go run build/ci.go install ./cmd/swarm
+ @echo "Done building."
+ @echo "Run \"$(GOBIN)/swarm\" to launch swarm."
+
evm:
build/env.sh go run build/ci.go install ./cmd/evm
@echo "Done building."
diff --git a/README.md b/README.md
index 7758d694f..8d4f37f5e 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-## Ethereum Go
+## Go Ethereum
Official golang implementation of the Ethereum protocol.
@@ -102,6 +102,22 @@ over between the main network and test network, you should make sure to always u
for play-money and real-money. Unless you manually move accounts, Geth will by default correctly
separate the two networks and will not make any accounts available between them.*
+### Configuration
+
+As an alternative to passing the numerous flags to the `geth` binary, you can also pass a configuration file via:
+
+```
+$ geth --config /path/to/your_config.toml
+```
+
+To get an idea how the file should look like you can use the `dumpconfig` subcommand to export your existing configuration:
+
+```
+$ geth --your-favourite-flags dumpconfig
+```
+
+*Note: This works only with geth v1.6.0 and above*
+
#### Docker quick start
One of the quickest ways to get Ethereum up and running on your machine is by using Docker:
diff --git a/VERSION b/VERSION
index ec70f7556..400084b1b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.6.6
+1.6.7
diff --git a/cmd/evm/json_logger.go b/cmd/evm/json_logger.go
index a84d5daeb..d61981062 100644
--- a/cmd/evm/json_logger.go
+++ b/cmd/evm/json_logger.go
@@ -28,25 +28,32 @@ import (
type JSONLogger struct {
encoder *json.Encoder
+ cfg *vm.LogConfig
}
-func NewJSONLogger(writer io.Writer) *JSONLogger {
- return &JSONLogger{json.NewEncoder(writer)}
+func NewJSONLogger(cfg *vm.LogConfig, writer io.Writer) *JSONLogger {
+ return &JSONLogger{json.NewEncoder(writer), cfg}
}
// CaptureState outputs state information on the logger.
func (l *JSONLogger) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, contract *vm.Contract, depth int, err error) error {
- return l.encoder.Encode(vm.StructLog{
- Pc: pc,
- Op: op,
- Gas: gas + cost,
- GasCost: cost,
- Memory: memory.Data(),
- Stack: stack.Data(),
- Storage: nil,
- Depth: depth,
- Err: err,
- })
+ log := vm.StructLog{
+ Pc: pc,
+ Op: op,
+ Gas: gas + cost,
+ GasCost: cost,
+ MemorySize: memory.Len(),
+ Storage: nil,
+ Depth: depth,
+ Err: err,
+ }
+ if !l.cfg.DisableMemory {
+ log.Memory = memory.Data()
+ }
+ if !l.cfg.DisableStack {
+ log.Stack = stack.Data()
+ }
+ return l.encoder.Encode(log)
}
// CaptureEnd is triggered at end of execution.
diff --git a/cmd/evm/main.go b/cmd/evm/main.go
index 48a1b92cb..1892ae3d3 100644
--- a/cmd/evm/main.go
+++ b/cmd/evm/main.go
@@ -102,6 +102,14 @@ var (
Name: "sender",
Usage: "The transaction origin",
}
+ DisableMemoryFlag = cli.BoolFlag{
+ Name: "nomemory",
+ Usage: "disable memory output",
+ }
+ DisableStackFlag = cli.BoolFlag{
+ Name: "nostack",
+ Usage: "disable stack output",
+ }
)
func init() {
@@ -123,6 +131,8 @@ func init() {
GenesisFlag,
MachineFlag,
SenderFlag,
+ DisableMemoryFlag,
+ DisableStackFlag,
}
app.Commands = []cli.Command{
compileCommand,
diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go
index b1fb8998f..2ce0920f6 100644
--- a/cmd/evm/runner.go
+++ b/cmd/evm/runner.go
@@ -73,6 +73,10 @@ func runCmd(ctx *cli.Context) error {
glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false)))
glogger.Verbosity(log.Lvl(ctx.GlobalInt(VerbosityFlag.Name)))
log.Root().SetHandler(glogger)
+ logconfig := &vm.LogConfig{
+ DisableMemory: ctx.GlobalBool(DisableMemoryFlag.Name),
+ DisableStack: ctx.GlobalBool(DisableStackFlag.Name),
+ }
var (
tracer vm.Tracer
@@ -82,12 +86,12 @@ func runCmd(ctx *cli.Context) error {
sender = common.StringToAddress("sender")
)
if ctx.GlobalBool(MachineFlag.Name) {
- tracer = NewJSONLogger(os.Stdout)
+ tracer = NewJSONLogger(logconfig, os.Stdout)
} else if ctx.GlobalBool(DebugFlag.Name) {
- debugLogger = vm.NewStructLogger(nil)
+ debugLogger = vm.NewStructLogger(logconfig)
tracer = debugLogger
} else {
- debugLogger = vm.NewStructLogger(nil)
+ debugLogger = vm.NewStructLogger(logconfig)
}
if ctx.GlobalString(GenesisFlag.Name) != "" {
gen := readGenesis(ctx.GlobalString(GenesisFlag.Name))
diff --git a/cmd/geth/accountcmd_test.go b/cmd/geth/accountcmd_test.go
index 5f9f67677..e146323ee 100644
--- a/cmd/geth/accountcmd_test.go
+++ b/cmd/geth/accountcmd_test.go
@@ -44,21 +44,21 @@ func tmpDatadirWithKeystore(t *testing.T) string {
func TestAccountListEmpty(t *testing.T) {
geth := runGeth(t, "account", "list")
- geth.expectExit()
+ geth.ExpectExit()
}
func TestAccountList(t *testing.T) {
datadir := tmpDatadirWithKeystore(t)
geth := runGeth(t, "account", "list", "--datadir", datadir)
- defer geth.expectExit()
+ defer geth.ExpectExit()
if runtime.GOOS == "windows" {
- geth.expect(`
+ geth.Expect(`
Account #0: {7ef5a6135f1fd6a02593eedc869c6d41d934aef8} keystore://{{.Datadir}}\keystore\UTC--2016-03-22T12-57-55.920751759Z--7ef5a6135f1fd6a02593eedc869c6d41d934aef8
Account #1: {f466859ead1932d743d622cb74fc058882e8648a} keystore://{{.Datadir}}\keystore\aaa
Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}\keystore\zzz
`)
} else {
- geth.expect(`
+ geth.Expect(`
Account #0: {7ef5a6135f1fd6a02593eedc869c6d41d934aef8} keystore://{{.Datadir}}/keystore/UTC--2016-03-22T12-57-55.920751759Z--7ef5a6135f1fd6a02593eedc869c6d41d934aef8
Account #1: {f466859ead1932d743d622cb74fc058882e8648a} keystore://{{.Datadir}}/keystore/aaa
Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}/keystore/zzz
@@ -68,20 +68,20 @@ Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}/k
func TestAccountNew(t *testing.T) {
geth := runGeth(t, "account", "new", "--lightkdf")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
Your new account is locked with a password. Please give a password. Do not forget this password.
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "foobar"}}
Repeat passphrase: {{.InputLine "foobar"}}
`)
- geth.expectRegexp(`Address: \{[0-9a-f]{40}\}\n`)
+ geth.ExpectRegexp(`Address: \{[0-9a-f]{40}\}\n`)
}
func TestAccountNewBadRepeat(t *testing.T) {
geth := runGeth(t, "account", "new", "--lightkdf")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
Your new account is locked with a password. Please give a password. Do not forget this password.
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "something"}}
@@ -95,8 +95,8 @@ func TestAccountUpdate(t *testing.T) {
geth := runGeth(t, "account", "update",
"--datadir", datadir, "--lightkdf",
"f466859ead1932d743d622cb74fc058882e8648a")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "foobar"}}
@@ -108,8 +108,8 @@ Repeat passphrase: {{.InputLine "foobar2"}}
func TestWalletImport(t *testing.T) {
geth := runGeth(t, "wallet", "import", "--lightkdf", "testdata/guswallet.json")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "foo"}}
Address: {d4584b5f6229b7be90727b0fc8c6b91bb427821f}
@@ -123,8 +123,8 @@ Address: {d4584b5f6229b7be90727b0fc8c6b91bb427821f}
func TestWalletImportBadPassword(t *testing.T) {
geth := runGeth(t, "wallet", "import", "--lightkdf", "testdata/guswallet.json")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "wrong"}}
Fatal: could not decrypt key with given passphrase
@@ -137,19 +137,19 @@ func TestUnlockFlag(t *testing.T) {
"--datadir", datadir, "--nat", "none", "--nodiscover", "--dev",
"--unlock", "f466859ead1932d743d622cb74fc058882e8648a",
"js", "testdata/empty.js")
- geth.expect(`
+ geth.Expect(`
Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "foobar"}}
`)
- geth.expectExit()
+ geth.ExpectExit()
wantMessages := []string{
"Unlocked account",
"=0xf466859ead1932d743d622cb74fc058882e8648a",
}
for _, m := range wantMessages {
- if !strings.Contains(geth.stderrText(), m) {
+ if !strings.Contains(geth.StderrText(), m) {
t.Errorf("stderr text does not contain %q", m)
}
}
@@ -160,8 +160,8 @@ func TestUnlockFlagWrongPassword(t *testing.T) {
geth := runGeth(t,
"--datadir", datadir, "--nat", "none", "--nodiscover", "--dev",
"--unlock", "f466859ead1932d743d622cb74fc058882e8648a")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "wrong1"}}
@@ -180,14 +180,14 @@ func TestUnlockFlagMultiIndex(t *testing.T) {
"--datadir", datadir, "--nat", "none", "--nodiscover", "--dev",
"--unlock", "0,2",
"js", "testdata/empty.js")
- geth.expect(`
+ geth.Expect(`
Unlocking account 0 | Attempt 1/3
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "foobar"}}
Unlocking account 2 | Attempt 1/3
Passphrase: {{.InputLine "foobar"}}
`)
- geth.expectExit()
+ geth.ExpectExit()
wantMessages := []string{
"Unlocked account",
@@ -195,7 +195,7 @@ Passphrase: {{.InputLine "foobar"}}
"=0x289d485d9771714cce91d3393d764e1311907acc",
}
for _, m := range wantMessages {
- if !strings.Contains(geth.stderrText(), m) {
+ if !strings.Contains(geth.StderrText(), m) {
t.Errorf("stderr text does not contain %q", m)
}
}
@@ -207,7 +207,7 @@ func TestUnlockFlagPasswordFile(t *testing.T) {
"--datadir", datadir, "--nat", "none", "--nodiscover", "--dev",
"--password", "testdata/passwords.txt", "--unlock", "0,2",
"js", "testdata/empty.js")
- geth.expectExit()
+ geth.ExpectExit()
wantMessages := []string{
"Unlocked account",
@@ -215,7 +215,7 @@ func TestUnlockFlagPasswordFile(t *testing.T) {
"=0x289d485d9771714cce91d3393d764e1311907acc",
}
for _, m := range wantMessages {
- if !strings.Contains(geth.stderrText(), m) {
+ if !strings.Contains(geth.StderrText(), m) {
t.Errorf("stderr text does not contain %q", m)
}
}
@@ -226,8 +226,8 @@ func TestUnlockFlagPasswordFileWrongPassword(t *testing.T) {
geth := runGeth(t,
"--datadir", datadir, "--nat", "none", "--nodiscover", "--dev",
"--password", "testdata/wrong-passwords.txt", "--unlock", "0,2")
- defer geth.expectExit()
- geth.expect(`
+ defer geth.ExpectExit()
+ geth.Expect(`
Fatal: Failed to unlock account 0 (could not decrypt key with given passphrase)
`)
}
@@ -238,14 +238,14 @@ func TestUnlockFlagAmbiguous(t *testing.T) {
"--keystore", store, "--nat", "none", "--nodiscover", "--dev",
"--unlock", "f466859ead1932d743d622cb74fc058882e8648a",
"js", "testdata/empty.js")
- defer geth.expectExit()
+ defer geth.ExpectExit()
// Helper for the expect template, returns absolute keystore path.
- geth.setTemplateFunc("keypath", func(file string) string {
+ geth.SetTemplateFunc("keypath", func(file string) string {
abs, _ := filepath.Abs(filepath.Join(store, file))
return abs
})
- geth.expect(`
+ geth.Expect(`
Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "foobar"}}
@@ -257,14 +257,14 @@ Your passphrase unlocked keystore://{{keypath "1"}}
In order to avoid this warning, you need to remove the following duplicate key files:
keystore://{{keypath "2"}}
`)
- geth.expectExit()
+ geth.ExpectExit()
wantMessages := []string{
"Unlocked account",
"=0xf466859ead1932d743d622cb74fc058882e8648a",
}
for _, m := range wantMessages {
- if !strings.Contains(geth.stderrText(), m) {
+ if !strings.Contains(geth.StderrText(), m) {
t.Errorf("stderr text does not contain %q", m)
}
}
@@ -275,14 +275,14 @@ func TestUnlockFlagAmbiguousWrongPassword(t *testing.T) {
geth := runGeth(t,
"--keystore", store, "--nat", "none", "--nodiscover", "--dev",
"--unlock", "f466859ead1932d743d622cb74fc058882e8648a")
- defer geth.expectExit()
+ defer geth.ExpectExit()
// Helper for the expect template, returns absolute keystore path.
- geth.setTemplateFunc("keypath", func(file string) string {
+ geth.SetTemplateFunc("keypath", func(file string) string {
abs, _ := filepath.Abs(filepath.Join(store, file))
return abs
})
- geth.expect(`
+ geth.Expect(`
Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3
!! Unsupported terminal, password will be echoed.
Passphrase: {{.InputLine "wrong"}}
@@ -292,5 +292,5 @@ Multiple key files exist for address f466859ead1932d743d622cb74fc058882e8648a:
Testing your passphrase against all of them...
Fatal: None of the listed files could be unlocked.
`)
- geth.expectExit()
+ geth.ExpectExit()
}
diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go
index e5472836c..258b9e6dd 100644
--- a/cmd/geth/consolecmd_test.go
+++ b/cmd/geth/consolecmd_test.go
@@ -47,15 +47,15 @@ func TestConsoleWelcome(t *testing.T) {
"console")
// Gather all the infos the welcome message needs to contain
- geth.setTemplateFunc("goos", func() string { return runtime.GOOS })
- geth.setTemplateFunc("goarch", func() string { return runtime.GOARCH })
- geth.setTemplateFunc("gover", runtime.Version)
- geth.setTemplateFunc("gethver", func() string { return params.Version })
- geth.setTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) })
- geth.setTemplateFunc("apis", func() string { return ipcAPIs })
+ geth.SetTemplateFunc("goos", func() string { return runtime.GOOS })
+ geth.SetTemplateFunc("goarch", func() string { return runtime.GOARCH })
+ geth.SetTemplateFunc("gover", runtime.Version)
+ geth.SetTemplateFunc("gethver", func() string { return params.Version })
+ geth.SetTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) })
+ geth.SetTemplateFunc("apis", func() string { return ipcAPIs })
// Verify the actual welcome message to the required template
- geth.expect(`
+ geth.Expect(`
Welcome to the Geth JavaScript console!
instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}}
@@ -66,7 +66,7 @@ at block: 0 ({{niltime}})
> {{.InputLine "exit"}}
`)
- geth.expectExit()
+ geth.ExpectExit()
}
// Tests that a console can be attached to a running node via various means.
@@ -90,8 +90,8 @@ func TestIPCAttachWelcome(t *testing.T) {
time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open
testAttachWelcome(t, geth, "ipc:"+ipc, ipcAPIs)
- geth.interrupt()
- geth.expectExit()
+ geth.Interrupt()
+ geth.ExpectExit()
}
func TestHTTPAttachWelcome(t *testing.T) {
@@ -104,8 +104,8 @@ func TestHTTPAttachWelcome(t *testing.T) {
time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open
testAttachWelcome(t, geth, "http://localhost:"+port, httpAPIs)
- geth.interrupt()
- geth.expectExit()
+ geth.Interrupt()
+ geth.ExpectExit()
}
func TestWSAttachWelcome(t *testing.T) {
@@ -119,29 +119,29 @@ func TestWSAttachWelcome(t *testing.T) {
time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open
testAttachWelcome(t, geth, "ws://localhost:"+port, httpAPIs)
- geth.interrupt()
- geth.expectExit()
+ geth.Interrupt()
+ geth.ExpectExit()
}
func testAttachWelcome(t *testing.T, geth *testgeth, endpoint, apis string) {
// Attach to a running geth note and terminate immediately
attach := runGeth(t, "attach", endpoint)
- defer attach.expectExit()
- attach.stdin.Close()
+ defer attach.ExpectExit()
+ attach.CloseStdin()
// Gather all the infos the welcome message needs to contain
- attach.setTemplateFunc("goos", func() string { return runtime.GOOS })
- attach.setTemplateFunc("goarch", func() string { return runtime.GOARCH })
- attach.setTemplateFunc("gover", runtime.Version)
- attach.setTemplateFunc("gethver", func() string { return params.Version })
- attach.setTemplateFunc("etherbase", func() string { return geth.Etherbase })
- attach.setTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) })
- attach.setTemplateFunc("ipc", func() bool { return strings.HasPrefix(endpoint, "ipc") })
- attach.setTemplateFunc("datadir", func() string { return geth.Datadir })
- attach.setTemplateFunc("apis", func() string { return apis })
+ attach.SetTemplateFunc("goos", func() string { return runtime.GOOS })
+ attach.SetTemplateFunc("goarch", func() string { return runtime.GOARCH })
+ attach.SetTemplateFunc("gover", runtime.Version)
+ attach.SetTemplateFunc("gethver", func() string { return params.Version })
+ attach.SetTemplateFunc("etherbase", func() string { return geth.Etherbase })
+ attach.SetTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) })
+ attach.SetTemplateFunc("ipc", func() bool { return strings.HasPrefix(endpoint, "ipc") })
+ attach.SetTemplateFunc("datadir", func() string { return geth.Datadir })
+ attach.SetTemplateFunc("apis", func() string { return apis })
// Verify the actual welcome message to the required template
- attach.expect(`
+ attach.Expect(`
Welcome to the Geth JavaScript console!
instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}}
@@ -152,7 +152,7 @@ at block: 0 ({{niltime}}){{if ipc}}
> {{.InputLine "exit" }}
`)
- attach.expectExit()
+ attach.ExpectExit()
}
// trulyRandInt generates a crypto random integer used by the console tests to
diff --git a/cmd/geth/dao_test.go b/cmd/geth/dao_test.go
index ec7802ada..8cc66aabf 100644
--- a/cmd/geth/dao_test.go
+++ b/cmd/geth/dao_test.go
@@ -112,12 +112,12 @@ func testDAOForkBlockNewChain(t *testing.T, test int, genesis string, expectBloc
if err := ioutil.WriteFile(json, []byte(genesis), 0600); err != nil {
t.Fatalf("test %d: failed to write genesis file: %v", test, err)
}
- runGeth(t, "--datadir", datadir, "init", json).cmd.Wait()
+ runGeth(t, "--datadir", datadir, "init", json).WaitExit()
} else {
// Force chain initialization
args := []string{"--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", "--ipcdisable", "--datadir", datadir}
geth := runGeth(t, append(args, []string{"--exec", "2+2", "console"}...)...)
- geth.cmd.Wait()
+ geth.WaitExit()
}
// Retrieve the DAO config flag from the database
path := filepath.Join(datadir, "geth", "chaindata")
diff --git a/cmd/geth/genesis_test.go b/cmd/geth/genesis_test.go
index 6c3ca7298..a00ae00c1 100644
--- a/cmd/geth/genesis_test.go
+++ b/cmd/geth/genesis_test.go
@@ -97,14 +97,14 @@ func TestCustomGenesis(t *testing.T) {
if err := ioutil.WriteFile(json, []byte(tt.genesis), 0600); err != nil {
t.Fatalf("test %d: failed to write genesis file: %v", i, err)
}
- runGeth(t, "--datadir", datadir, "init", json).cmd.Wait()
+ runGeth(t, "--datadir", datadir, "init", json).WaitExit()
// Query the custom genesis block
geth := runGeth(t,
"--datadir", datadir, "--maxpeers", "0", "--port", "0",
"--nodiscover", "--nat", "none", "--ipcdisable",
"--exec", tt.query, "console")
- geth.expectRegexp(tt.result)
- geth.expectExit()
+ geth.ExpectRegexp(tt.result)
+ geth.ExpectExit()
}
}
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index cc481796f..b5cdd712d 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -252,10 +252,12 @@ func startNode(ctx *cli.Context, stack *node.Node) {
}()
// Start auxiliary services if enabled
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) {
+ // Mining only makes sense if a full Ethereum node is running
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
utils.Fatalf("ethereum service not running: %v", err)
}
+ // Use a reduced number of threads if requested
if threads := ctx.GlobalInt(utils.MinerThreadsFlag.Name); threads > 0 {
type threaded interface {
SetThreads(threads int)
@@ -264,6 +266,8 @@ func startNode(ctx *cli.Context, stack *node.Node) {
th.SetThreads(threads)
}
}
+ // Set the gas price to the limits from the CLI and start mining
+ ethereum.TxPool().SetGasPrice(utils.GlobalBig(ctx, utils.GasPriceFlag.Name))
if err := ethereum.StartMining(true); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
diff --git a/cmd/geth/run_test.go b/cmd/geth/run_test.go
index e26b4509a..da82facac 100644
--- a/cmd/geth/run_test.go
+++ b/cmd/geth/run_test.go
@@ -17,18 +17,13 @@
package main
import (
- "bufio"
- "bytes"
"fmt"
- "io"
"io/ioutil"
"os"
- "os/exec"
- "regexp"
- "sync"
"testing"
- "text/template"
- "time"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/internal/cmdtest"
)
func tmpdir(t *testing.T) string {
@@ -40,36 +35,37 @@ func tmpdir(t *testing.T) string {
}
type testgeth struct {
- // For total convenience, all testing methods are available.
- *testing.T
- // template variables for expect
- Datadir string
- Executable string
- Etherbase string
- Func template.FuncMap
+ *cmdtest.TestCmd
- removeDatadir bool
- cmd *exec.Cmd
- stdout *bufio.Reader
- stdin io.WriteCloser
- stderr *testlogger
+ // template variables for expect
+ Datadir string
+ Etherbase string
}
func init() {
- // Run the app if we're the child process for runGeth.
- if os.Getenv("GETH_TEST_CHILD") != "" {
+ // Run the app if we've been exec'd as "geth-test" in runGeth.
+ reexec.Register("geth-test", func() {
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
os.Exit(0)
+ })
+}
+
+func TestMain(m *testing.M) {
+ // check if we have been reexec'd
+ if reexec.Init() {
+ return
}
+ os.Exit(m.Run())
}
// spawns geth with the given command line args. If the args don't set --datadir, the
// child g gets a temporary data directory.
func runGeth(t *testing.T, args ...string) *testgeth {
- tt := &testgeth{T: t, Executable: os.Args[0]}
+ tt := &testgeth{}
+ tt.TestCmd = cmdtest.NewTestCmd(t, tt)
for i, arg := range args {
switch {
case arg == "-datadir" || arg == "--datadir":
@@ -84,215 +80,19 @@ func runGeth(t *testing.T, args ...string) *testgeth {
}
if tt.Datadir == "" {
tt.Datadir = tmpdir(t)
- tt.removeDatadir = true
+ tt.Cleanup = func() { os.RemoveAll(tt.Datadir) }
args = append([]string{"-datadir", tt.Datadir}, args...)
// Remove the temporary datadir if something fails below.
defer func() {
if t.Failed() {
- os.RemoveAll(tt.Datadir)
+ tt.Cleanup()
}
}()
}
- // Boot "geth". This actually runs the test binary but the init function
- // will prevent any tests from running.
- tt.stderr = &testlogger{t: t}
- tt.cmd = exec.Command(os.Args[0], args...)
- tt.cmd.Env = append(os.Environ(), "GETH_TEST_CHILD=1")
- tt.cmd.Stderr = tt.stderr
- stdout, err := tt.cmd.StdoutPipe()
- if err != nil {
- t.Fatal(err)
- }
- tt.stdout = bufio.NewReader(stdout)
- if tt.stdin, err = tt.cmd.StdinPipe(); err != nil {
- t.Fatal(err)
- }
- if err := tt.cmd.Start(); err != nil {
- t.Fatal(err)
- }
- return tt
-}
-
-// InputLine writes the given text to the childs stdin.
-// This method can also be called from an expect template, e.g.:
-//
-// geth.expect(`Passphrase: {{.InputLine "password"}}`)
-func (tt *testgeth) InputLine(s string) string {
- io.WriteString(tt.stdin, s+"\n")
- return ""
-}
-
-func (tt *testgeth) setTemplateFunc(name string, fn interface{}) {
- if tt.Func == nil {
- tt.Func = make(map[string]interface{})
- }
- tt.Func[name] = fn
-}
-
-// expect runs its argument as a template, then expects the
-// child process to output the result of the template within 5s.
-//
-// If the template starts with a newline, the newline is removed
-// before matching.
-func (tt *testgeth) expect(tplsource string) {
- // Generate the expected output by running the template.
- tpl := template.Must(template.New("").Funcs(tt.Func).Parse(tplsource))
- wantbuf := new(bytes.Buffer)
- if err := tpl.Execute(wantbuf, tt); err != nil {
- panic(err)
- }
- // Trim exactly one newline at the beginning. This makes tests look
- // much nicer because all expect strings are at column 0.
- want := bytes.TrimPrefix(wantbuf.Bytes(), []byte("\n"))
- if err := tt.matchExactOutput(want); err != nil {
- tt.Fatal(err)
- }
- tt.Logf("Matched stdout text:\n%s", want)
-}
-
-func (tt *testgeth) matchExactOutput(want []byte) error {
- buf := make([]byte, len(want))
- n := 0
- tt.withKillTimeout(func() { n, _ = io.ReadFull(tt.stdout, buf) })
- buf = buf[:n]
- if n < len(want) || !bytes.Equal(buf, want) {
- // Grab any additional buffered output in case of mismatch
- // because it might help with debugging.
- buf = append(buf, make([]byte, tt.stdout.Buffered())...)
- tt.stdout.Read(buf[n:])
- // Find the mismatch position.
- for i := 0; i < n; i++ {
- if want[i] != buf[i] {
- return fmt.Errorf("Output mismatch at â—Š:\n---------------- (stdout text)\n%sâ—Š%s\n---------------- (expected text)\n%s",
- buf[:i], buf[i:n], want)
- }
- }
- if n < len(want) {
- return fmt.Errorf("Not enough output, got until â—Š:\n---------------- (stdout text)\n%s\n---------------- (expected text)\n%sâ—Š%s",
- buf, want[:n], want[n:])
- }
- }
- return nil
-}
-
-// expectRegexp expects the child process to output text matching the
-// given regular expression within 5s.
-//
-// Note that an arbitrary amount of output may be consumed by the
-// regular expression. This usually means that expect cannot be used
-// after expectRegexp.
-func (tt *testgeth) expectRegexp(resource string) (*regexp.Regexp, []string) {
- var (
- re = regexp.MustCompile(resource)
- rtee = &runeTee{in: tt.stdout}
- matches []int
- )
- tt.withKillTimeout(func() { matches = re.FindReaderSubmatchIndex(rtee) })
- output := rtee.buf.Bytes()
- if matches == nil {
- tt.Fatalf("Output did not match:\n---------------- (stdout text)\n%s\n---------------- (regular expression)\n%s",
- output, resource)
- return re, nil
- }
- tt.Logf("Matched stdout text:\n%s", output)
- var submatch []string
- for i := 0; i < len(matches); i += 2 {
- submatch = append(submatch, string(output[i:i+1]))
- }
- return re, submatch
-}
-
-// expectExit expects the child process to exit within 5s without
-// printing any additional text on stdout.
-func (tt *testgeth) expectExit() {
- var output []byte
- tt.withKillTimeout(func() {
- output, _ = ioutil.ReadAll(tt.stdout)
- })
- tt.cmd.Wait()
- if tt.removeDatadir {
- os.RemoveAll(tt.Datadir)
- }
- if len(output) > 0 {
- tt.Errorf("Unmatched stdout text:\n%s", output)
- }
-}
-
-func (tt *testgeth) interrupt() {
- tt.cmd.Process.Signal(os.Interrupt)
-}
-
-// stderrText returns any stderr output written so far.
-// The returned text holds all log lines after expectExit has
-// returned.
-func (tt *testgeth) stderrText() string {
- tt.stderr.mu.Lock()
- defer tt.stderr.mu.Unlock()
- return tt.stderr.buf.String()
-}
-
-func (tt *testgeth) withKillTimeout(fn func()) {
- timeout := time.AfterFunc(5*time.Second, func() {
- tt.Log("killing the child process (timeout)")
- tt.cmd.Process.Kill()
- if tt.removeDatadir {
- os.RemoveAll(tt.Datadir)
- }
- })
- defer timeout.Stop()
- fn()
-}
+ // Boot "geth". This actually runs the test binary but the TestMain
+ // function will prevent any tests from running.
+ tt.Run("geth-test", args...)
-// testlogger logs all written lines via t.Log and also
-// collects them for later inspection.
-type testlogger struct {
- t *testing.T
- mu sync.Mutex
- buf bytes.Buffer
-}
-
-func (tl *testlogger) Write(b []byte) (n int, err error) {
- lines := bytes.Split(b, []byte("\n"))
- for _, line := range lines {
- if len(line) > 0 {
- tl.t.Logf("(stderr) %s", line)
- }
- }
- tl.mu.Lock()
- tl.buf.Write(b)
- tl.mu.Unlock()
- return len(b), err
-}
-
-// runeTee collects text read through it into buf.
-type runeTee struct {
- in interface {
- io.Reader
- io.ByteReader
- io.RuneReader
- }
- buf bytes.Buffer
-}
-
-func (rtee *runeTee) Read(b []byte) (n int, err error) {
- n, err = rtee.in.Read(b)
- rtee.buf.Write(b[:n])
- return n, err
-}
-
-func (rtee *runeTee) ReadRune() (r rune, size int, err error) {
- r, size, err = rtee.in.ReadRune()
- if err == nil {
- rtee.buf.WriteRune(r)
- }
- return r, size, err
-}
-
-func (rtee *runeTee) ReadByte() (b byte, err error) {
- b, err = rtee.in.ReadByte()
- if err == nil {
- rtee.buf.WriteByte(b)
- }
- return b, err
+ return tt
}
diff --git a/cmd/swarm/run_test.go b/cmd/swarm/run_test.go
new file mode 100644
index 000000000..2d32a51c8
--- /dev/null
+++ b/cmd/swarm/run_test.go
@@ -0,0 +1,255 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "os"
+ "path/filepath"
+ "runtime"
+ "testing"
+ "time"
+
+ "github.com/docker/docker/pkg/reexec"
+ "github.com/ethereum/go-ethereum/accounts/keystore"
+ "github.com/ethereum/go-ethereum/internal/cmdtest"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm"
+)
+
+func init() {
+ // Run the app if we've been exec'd as "swarm-test" in runSwarm.
+ reexec.Register("swarm-test", func() {
+ if err := app.Run(os.Args); err != nil {
+ fmt.Fprintln(os.Stderr, err)
+ os.Exit(1)
+ }
+ os.Exit(0)
+ })
+}
+
+func TestMain(m *testing.M) {
+ // check if we have been reexec'd
+ if reexec.Init() {
+ return
+ }
+ os.Exit(m.Run())
+}
+
+func runSwarm(t *testing.T, args ...string) *cmdtest.TestCmd {
+ tt := cmdtest.NewTestCmd(t, nil)
+
+ // Boot "swarm". This actually runs the test binary but the TestMain
+ // function will prevent any tests from running.
+ tt.Run("swarm-test", args...)
+
+ return tt
+}
+
+type testCluster struct {
+ Nodes []*testNode
+ TmpDir string
+}
+
+// newTestCluster starts a test swarm cluster of the given size.
+//
+// A temporary directory is created and each node gets a data directory inside
+// it.
+//
+// Each node listens on 127.0.0.1 with random ports for both the HTTP and p2p
+// ports (assigned by first listening on 127.0.0.1:0 and then passing the ports
+// as flags).
+//
+// When starting more than one node, they are connected together using the
+// admin SetPeer RPC method.
+func newTestCluster(t *testing.T, size int) *testCluster {
+ cluster := &testCluster{}
+ defer func() {
+ if t.Failed() {
+ cluster.Shutdown()
+ }
+ }()
+
+ tmpdir, err := ioutil.TempDir("", "swarm-test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ cluster.TmpDir = tmpdir
+
+ // start the nodes
+ cluster.Nodes = make([]*testNode, 0, size)
+ for i := 0; i < size; i++ {
+ dir := filepath.Join(cluster.TmpDir, fmt.Sprintf("swarm%02d", i))
+ if err := os.Mkdir(dir, 0700); err != nil {
+ t.Fatal(err)
+ }
+
+ node := newTestNode(t, dir)
+ node.Name = fmt.Sprintf("swarm%02d", i)
+
+ cluster.Nodes = append(cluster.Nodes, node)
+ }
+
+ if size == 1 {
+ return cluster
+ }
+
+ // connect the nodes together
+ for _, node := range cluster.Nodes {
+ if err := node.Client.Call(nil, "admin_addPeer", cluster.Nodes[0].Enode); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // wait until all nodes have the correct number of peers
+outer:
+ for _, node := range cluster.Nodes {
+ var peers []*p2p.PeerInfo
+ for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(50 * time.Millisecond) {
+ if err := node.Client.Call(&peers, "admin_peers"); err != nil {
+ t.Fatal(err)
+ }
+ if len(peers) == len(cluster.Nodes)-1 {
+ continue outer
+ }
+ }
+ t.Fatalf("%s only has %d / %d peers", node.Name, len(peers), len(cluster.Nodes)-1)
+ }
+
+ return cluster
+}
+
+func (c *testCluster) Shutdown() {
+ for _, node := range c.Nodes {
+ node.Shutdown()
+ }
+ os.RemoveAll(c.TmpDir)
+}
+
+type testNode struct {
+ Name string
+ Addr string
+ URL string
+ Enode string
+ Dir string
+ Client *rpc.Client
+ Cmd *cmdtest.TestCmd
+}
+
+const testPassphrase = "swarm-test-passphrase"
+
+func newTestNode(t *testing.T, dir string) *testNode {
+ // create key
+ conf := &node.Config{
+ DataDir: dir,
+ IPCPath: "bzzd.ipc",
+ }
+ n, err := node.New(conf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ account, err := n.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore).NewAccount(testPassphrase)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ node := &testNode{Dir: dir}
+
+ // use a unique IPCPath when running tests on Windows
+ if runtime.GOOS == "windows" {
+ conf.IPCPath = fmt.Sprintf("bzzd-%s.ipc", account.Address.String())
+ }
+
+ // assign ports
+ httpPort, err := assignTCPPort()
+ if err != nil {
+ t.Fatal(err)
+ }
+ p2pPort, err := assignTCPPort()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // start the node
+ node.Cmd = runSwarm(t,
+ "--port", p2pPort,
+ "--nodiscover",
+ "--datadir", dir,
+ "--ipcpath", conf.IPCPath,
+ "--ethapi", "",
+ "--bzzaccount", account.Address.String(),
+ "--bzznetworkid", "321",
+ "--bzzport", httpPort,
+ "--verbosity", "6",
+ )
+ node.Cmd.InputLine(testPassphrase)
+ defer func() {
+ if t.Failed() {
+ node.Shutdown()
+ }
+ }()
+
+ // wait for the node to start
+ for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {
+ node.Client, err = rpc.Dial(conf.IPCEndpoint())
+ if err == nil {
+ break
+ }
+ }
+ if node.Client == nil {
+ t.Fatal(err)
+ }
+
+ // load info
+ var info swarm.Info
+ if err := node.Client.Call(&info, "bzz_info"); err != nil {
+ t.Fatal(err)
+ }
+ node.Addr = net.JoinHostPort("127.0.0.1", info.Port)
+ node.URL = "http://" + node.Addr
+
+ var nodeInfo p2p.NodeInfo
+ if err := node.Client.Call(&nodeInfo, "admin_nodeInfo"); err != nil {
+ t.Fatal(err)
+ }
+ node.Enode = fmt.Sprintf("enode://%s@127.0.0.1:%s", nodeInfo.ID, p2pPort)
+
+ return node
+}
+
+func (n *testNode) Shutdown() {
+ if n.Cmd != nil {
+ n.Cmd.Kill()
+ }
+}
+
+func assignTCPPort() (string, error) {
+ l, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return "", err
+ }
+ l.Close()
+ _, port, err := net.SplitHostPort(l.Addr().String())
+ if err != nil {
+ return "", err
+ }
+ return port, nil
+}
diff --git a/cmd/swarm/upload_test.go b/cmd/swarm/upload_test.go
new file mode 100644
index 000000000..5b74dd4f1
--- /dev/null
+++ b/cmd/swarm/upload_test.go
@@ -0,0 +1,78 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "testing"
+)
+
+// TestCLISwarmUp tests that running 'swarm up' makes the resulting file
+// available from all nodes via the HTTP API
+func TestCLISwarmUp(t *testing.T) {
+ t.Skip("flaky test")
+
+ // start 3 node cluster
+ t.Log("starting 3 node cluster")
+ cluster := newTestCluster(t, 3)
+ defer cluster.Shutdown()
+
+ // create a tmp file
+ tmp, err := ioutil.TempFile("", "swarm-test")
+ assertNil(t, err)
+ defer tmp.Close()
+ defer os.Remove(tmp.Name())
+ _, err = io.WriteString(tmp, "data")
+ assertNil(t, err)
+
+ // upload the file with 'swarm up' and expect a hash
+ t.Log("uploading file with 'swarm up'")
+ up := runSwarm(t, "--bzzapi", cluster.Nodes[0].URL, "up", tmp.Name())
+ _, matches := up.ExpectRegexp(`[a-f\d]{64}`)
+ up.ExpectExit()
+ hash := matches[0]
+ t.Logf("file uploaded with hash %s", hash)
+
+ // get the file from the HTTP API of each node
+ for _, node := range cluster.Nodes {
+ t.Logf("getting file from %s", node.Name)
+ res, err := http.Get(node.URL + "/bzz:/" + hash)
+ assertNil(t, err)
+ assertHTTPResponse(t, res, http.StatusOK, "data")
+ }
+}
+
+func assertNil(t *testing.T, err error) {
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func assertHTTPResponse(t *testing.T, res *http.Response, expectedStatus int, expectedBody string) {
+ defer res.Body.Close()
+ if res.StatusCode != expectedStatus {
+ t.Fatalf("expected HTTP status %d, got %s", expectedStatus, res.Status)
+ }
+ data, err := ioutil.ReadAll(res.Body)
+ assertNil(t, err)
+ if string(data) != expectedBody {
+ t.Fatalf("expected HTTP body %q, got %q", expectedBody, data)
+ }
+}
diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go
index 5a50b2eb4..d2fb6934b 100644
--- a/consensus/clique/clique.go
+++ b/consensus/clique/clique.go
@@ -99,10 +99,10 @@ var (
// their extra-data fields.
errExtraSigners = errors.New("non-checkpoint block contains extra signer list")
- // drrInvalidCheckpointSigners is returned if a checkpoint block contains an
+ // errInvalidCheckpointSigners is returned if a checkpoint block contains an
// invalid list of signers (i.e. non divisible by 20 bytes, or not the correct
// ones).
- drrInvalidCheckpointSigners = errors.New("invalid signer list on checkpoint block")
+ errInvalidCheckpointSigners = errors.New("invalid signer list on checkpoint block")
// errInvalidMixDigest is returned if a block's mix digest is non-zero.
errInvalidMixDigest = errors.New("non-zero mix digest")
@@ -297,7 +297,7 @@ func (c *Clique) verifyHeader(chain consensus.ChainReader, header *types.Header,
return errExtraSigners
}
if checkpoint && signersBytes%common.AddressLength != 0 {
- return drrInvalidCheckpointSigners
+ return errInvalidCheckpointSigners
}
// Ensure that the mix digest is zero as we don't have fork protection currently
if header.MixDigest != (common.Hash{}) {
@@ -353,7 +353,7 @@ func (c *Clique) verifyCascadingFields(chain consensus.ChainReader, header *type
}
extraSuffix := len(header.Extra) - extraSeal
if !bytes.Equal(header.Extra[extraVanity:extraSuffix], signers) {
- return drrInvalidCheckpointSigners
+ return errInvalidCheckpointSigners
}
}
// All basic checks passed, verify the seal and return
@@ -467,7 +467,6 @@ func (c *Clique) verifySeal(chain consensus.ChainReader, header *types.Header, p
if err != nil {
return err
}
- c.recents.Add(snap.Hash, snap)
// Resolve the authorization key and check against signers
signer, err := ecrecover(header, c.signatures)
@@ -479,13 +478,13 @@ func (c *Clique) verifySeal(chain consensus.ChainReader, header *types.Header, p
}
for seen, recent := range snap.Recents {
if recent == signer {
- // Signer is among recents, only fail if the current block doens't shift it out
+ // Signer is among recents, only fail if the current block doesn't shift it out
if limit := uint64(len(snap.Signers)/2 + 1); seen > number-limit {
return errUnauthorized
}
}
}
- // Ensure that the difficulty corresponts to the turn-ness of the signer
+ // Ensure that the difficulty corresponds to the turn-ness of the signer
inturn := snap.inturn(header.Number.Uint64(), signer)
if inturn && header.Difficulty.Cmp(diffInTurn) != 0 {
return errInvalidDifficulty
@@ -504,13 +503,24 @@ func (c *Clique) Prepare(chain consensus.ChainReader, header *types.Header) erro
header.Nonce = types.BlockNonce{}
number := header.Number.Uint64()
+
+ // Assemble the voting snapshot to check which votes make sense
+ snap, err := c.snapshot(chain, number-1, header.ParentHash, nil)
+ if err != nil {
+ return err
+ }
if number%c.config.Epoch != 0 {
c.lock.RLock()
- if len(c.proposals) > 0 {
- addresses := make([]common.Address, 0, len(c.proposals))
- for address := range c.proposals {
+
+ // Gather all the proposals that make sense voting on
+ addresses := make([]common.Address, 0, len(c.proposals))
+ for address, authorize := range c.proposals {
+ if snap.validVote(address, authorize) {
addresses = append(addresses, address)
}
+ }
+ // If there's pending proposals, cast a vote on them
+ if len(addresses) > 0 {
header.Coinbase = addresses[rand.Intn(len(addresses))]
if c.proposals[header.Coinbase] {
copy(header.Nonce[:], nonceAuthVote)
@@ -520,11 +530,7 @@ func (c *Clique) Prepare(chain consensus.ChainReader, header *types.Header) erro
}
c.lock.RUnlock()
}
- // Assemble the voting snapshot and set the correct difficulty
- snap, err := c.snapshot(chain, number-1, header.ParentHash, nil)
- if err != nil {
- return err
- }
+ // Set the correct difficulty
header.Difficulty = diffNoTurn
if snap.inturn(header.Number.Uint64(), c.signer) {
header.Difficulty = diffInTurn
@@ -604,7 +610,7 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch
// If we're amongst the recent signers, wait for the next block
for seen, recent := range snap.Recents {
if recent == signer {
- // Signer is among recents, only wait if the current block doens't shift it out
+ // Signer is among recents, only wait if the current block doesn't shift it out
if limit := uint64(len(snap.Signers)/2 + 1); number < limit || seen > number-limit {
log.Info("Signed recently, must wait for others")
<-stop
diff --git a/consensus/clique/snapshot.go b/consensus/clique/snapshot.go
index 8eaf3b62e..32a1191db 100644
--- a/consensus/clique/snapshot.go
+++ b/consensus/clique/snapshot.go
@@ -126,11 +126,17 @@ func (s *Snapshot) copy() *Snapshot {
return cpy
}
+// validVote returns whether it makes sense to cast the specified vote in the
+// given snapshot context (e.g. don't try to add an already authorized signer).
+func (s *Snapshot) validVote(address common.Address, authorize bool) bool {
+ _, signer := s.Signers[address]
+ return (signer && !authorize) || (!signer && authorize)
+}
+
// cast adds a new vote into the tally.
func (s *Snapshot) cast(address common.Address, authorize bool) bool {
// Ensure the vote is meaningful
- _, signer := s.Signers[address]
- if (signer && authorize) || (!signer && !authorize) {
+ if !s.validVote(address, authorize) {
return false
}
// Cast the vote into an existing or new tally
diff --git a/core/state/sync.go b/core/state/sync.go
index 8456a810b..2c29d706a 100644
--- a/core/state/sync.go
+++ b/core/state/sync.go
@@ -59,10 +59,16 @@ func (s *StateSync) Missing(max int) []common.Hash {
}
// Process injects a batch of retrieved trie nodes data, returning if something
-// was committed to the database and also the index of an entry if processing of
+// was committed to the memcache and also the index of an entry if processing of
// it failed.
-func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) {
- return (*trie.TrieSync)(s).Process(list, dbw)
+func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) {
+ return (*trie.TrieSync)(s).Process(list)
+}
+
+// Commit flushes the data stored in the internal memcache out to persistent
+// storage, returning th enumber of items written and any occurred error.
+func (s *StateSync) Commit(dbw trie.DatabaseWriter) (int, error) {
+ return (*trie.TrieSync)(s).Commit(dbw)
}
// Pending returns the number of state entries currently pending for download.
diff --git a/core/state/sync_test.go b/core/state/sync_test.go
index 43d146e3a..108ebb320 100644
--- a/core/state/sync_test.go
+++ b/core/state/sync_test.go
@@ -138,9 +138,12 @@ func testIterativeStateSync(t *testing.T, batch int) {
}
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two states are in sync
@@ -168,9 +171,12 @@ func TestIterativeDelayedStateSync(t *testing.T) {
}
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[len(results):], sched.Missing(0)...)
}
// Cross check that the two states are in sync
@@ -206,9 +212,12 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
results = append(results, trie.SyncResult{Hash: hash, Data: data})
}
// Feed the retrieved results back and queue new tasks
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
@@ -249,9 +258,12 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
@@ -283,9 +295,12 @@ func TestIncompleteStateSync(t *testing.T) {
results[i] = trie.SyncResult{Hash: hash, Data: data}
}
// Process each of the state nodes
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
for _, result := range results {
added = append(added, result.Hash)
}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 04ffa8a98..2f3cd1e93 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -35,16 +35,41 @@ import (
)
var (
- // Transaction Pool Errors
- ErrInvalidSender = errors.New("invalid sender")
- ErrNonce = errors.New("nonce too low")
- ErrUnderpriced = errors.New("transaction underpriced")
+ // ErrInvalidSender is returned if the transaction contains an invalid signature.
+ ErrInvalidSender = errors.New("invalid sender")
+
+ // ErrNonceTooLow is returned if the nonce of a transaction is lower than the
+ // one present in the local chain.
+ ErrNonceTooLow = errors.New("nonce too low")
+
+ // ErrUnderpriced is returned if a transaction's gas price is below the minimum
+ // configured for the transaction pool.
+ ErrUnderpriced = errors.New("transaction underpriced")
+
+ // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
+ // with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
- ErrBalance = errors.New("insufficient balance")
- ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
- ErrIntrinsicGas = errors.New("intrinsic gas too low")
- ErrGasLimit = errors.New("exceeds block gas limit")
- ErrNegativeValue = errors.New("negative value")
+
+ // ErrInsufficientFunds is returned if the total cost of executing a transaction
+ // is higher than the balance of the user's account.
+ ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
+
+ // ErrIntrinsicGas is returned if the transaction is specified to use less gas
+ // than required to start the invocation.
+ ErrIntrinsicGas = errors.New("intrinsic gas too low")
+
+ // ErrGasLimit is returned if a transaction's requested gas limit exceeds the
+ // maximum allowance of the current block.
+ ErrGasLimit = errors.New("exceeds block gas limit")
+
+ // ErrNegativeValue is a sanity error to ensure noone is able to specify a
+ // transaction with a negative value.
+ ErrNegativeValue = errors.New("negative value")
+
+ // ErrOversizedData is returned if the input data of a transaction is greater
+ // than some meaningful limit a user might use. This is not a consensus error
+ // making the transaction invalid, rather a DOS protection.
+ ErrOversizedData = errors.New("oversized data")
)
var (
@@ -54,16 +79,16 @@ var (
var (
// Metrics for the pending pool
- pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
- pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
- pendingRLCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
- pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds
+ pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
+ pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
+ pendingRateLimitCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
+ pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds
// Metrics for the queued pool
- queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
- queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
- queuedRLCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
- queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds
+ queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
+ queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
+ queuedRateLimitCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
+ queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds
// General tx metrics
invalidTxCounter = metrics.NewCounter("txpool/invalid")
@@ -374,7 +399,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
}
// Last but not least check for nonce errors
if currentState.GetNonce(from) > tx.Nonce() {
- return ErrNonce
+ return ErrNonceTooLow
}
// Check the transaction doesn't exceed the current
@@ -395,12 +420,15 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
-
intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if tx.Gas().Cmp(intrGas) < 0 {
return ErrIntrinsicGas
}
+ // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
+ if tx.Size() > 32*1024 {
+ return ErrOversizedData
+ }
return nil
}
@@ -638,8 +666,9 @@ func (pool *TxPool) removeTx(hash common.Hash) {
}
// Update the account nonce if needed
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
- pool.pendingState.SetNonce(addr, tx.Nonce())
+ pool.pendingState.SetNonce(addr, nonce)
}
+ return
}
}
// Transaction is in the future queue
@@ -696,10 +725,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
// Drop all transactions over the allowed limit
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
- log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
- queuedRLCounter.Inc(1)
+ queuedRateLimitCounter.Inc(1)
+ log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queued += uint64(list.Len())
@@ -745,7 +774,18 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
- list.Cap(list.Len() - 1)
+ for _, tx := range list.Cap(list.Len() - 1) {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ delete(pool.all, hash)
+ pool.priced.Removed()
+
+ // Update the account nonce to the dropped transaction
+ if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
+ pool.pendingState.SetNonce(offenders[i], nonce)
+ }
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
pending--
}
}
@@ -756,12 +796,23 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
- list.Cap(list.Len() - 1)
+ for _, tx := range list.Cap(list.Len() - 1) {
+ // Drop the transaction from the global pools too
+ hash := tx.Hash()
+ delete(pool.all, hash)
+ pool.priced.Removed()
+
+ // Update the account nonce to the dropped transaction
+ if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
+ pool.pendingState.SetNonce(addr, nonce)
+ }
+ log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
+ }
pending--
}
}
}
- pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
+ pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
if queued > pool.config.GlobalQueue {
@@ -785,7 +836,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
pool.removeTx(tx.Hash())
}
drop -= size
- queuedRLCounter.Inc(int64(size))
+ queuedRateLimitCounter.Inc(int64(size))
continue
}
// Otherwise drop only last few transactions
@@ -793,7 +844,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash())
drop--
- queuedRLCounter.Inc(1)
+ queuedRateLimitCounter.Inc(1)
}
}
}
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 94b07170d..4e28522e9 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -18,6 +18,7 @@ package core
import (
"crypto/ecdsa"
+ "fmt"
"math/big"
"math/rand"
"testing"
@@ -52,6 +53,35 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
return newPool, key
}
+// validateTxPoolInternals checks various consistency invariants within the pool.
+func validateTxPoolInternals(pool *TxPool) error {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ // Ensure the total transaction set is consistent with pending + queued
+ pending, queued := pool.stats()
+ if total := len(pool.all); total != pending+queued {
+ return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
+ }
+ if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
+ return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
+ }
+ // Ensure the next nonce to assign is the correct one
+ for addr, txs := range pool.pending {
+ // Find the last transaction
+ var last uint64
+ for nonce, _ := range txs.txs.items {
+ if last < nonce {
+ last = nonce
+ }
+ }
+ if nonce := pool.pendingState.GetNonce(addr); nonce != last+1 {
+ return fmt.Errorf("pending nonce mismatch: have %v, want %v", nonce, last+1)
+ }
+ }
+ return nil
+}
+
func deriveSender(tx *types.Transaction) (common.Address, error) {
return types.Sender(types.HomesteadSigner{}, tx)
}
@@ -150,8 +180,8 @@ func TestInvalidTransactions(t *testing.T) {
currentState.SetNonce(from, 1)
currentState.AddBalance(from, big.NewInt(0xffffffffffffff))
tx = transaction(0, big.NewInt(100000), key)
- if err := pool.Add(tx); err != ErrNonce {
- t.Error("expected", ErrNonce)
+ if err := pool.Add(tx); err != ErrNonceTooLow {
+ t.Error("expected", ErrNonceTooLow)
}
tx = transaction(1, big.NewInt(100000), key)
@@ -218,20 +248,25 @@ func TestTransactionQueue(t *testing.T) {
func TestRemoveTx(t *testing.T) {
pool, key := setupTxPool()
- tx := transaction(0, big.NewInt(100), key)
- from, _ := deriveSender(tx)
+ addr := crypto.PubkeyToAddress(key.PublicKey)
currentState, _ := pool.currentState()
- currentState.AddBalance(from, big.NewInt(1))
+ currentState.AddBalance(addr, big.NewInt(1))
+
+ tx1 := transaction(0, big.NewInt(100), key)
+ tx2 := transaction(2, big.NewInt(100), key)
+
+ pool.promoteTx(addr, tx1.Hash(), tx1)
+ pool.enqueueTx(tx2.Hash(), tx2)
- pool.enqueueTx(tx.Hash(), tx)
- pool.promoteTx(from, tx.Hash(), tx)
if len(pool.queue) != 1 {
t.Error("expected queue to be 1, got", len(pool.queue))
}
if len(pool.pending) != 1 {
t.Error("expected pending to be 1, got", len(pool.pending))
}
- pool.Remove(tx.Hash())
+ pool.Remove(tx1.Hash())
+ pool.Remove(tx2.Hash())
+
if len(pool.queue) > 0 {
t.Error("expected queue to be 0, got", len(pool.queue))
}
@@ -404,10 +439,10 @@ func TestTransactionDropping(t *testing.T) {
)
pool.promoteTx(account, tx0.Hash(), tx0)
pool.promoteTx(account, tx1.Hash(), tx1)
- pool.promoteTx(account, tx1.Hash(), tx2)
+ pool.promoteTx(account, tx2.Hash(), tx2)
pool.enqueueTx(tx10.Hash(), tx10)
pool.enqueueTx(tx11.Hash(), tx11)
- pool.enqueueTx(tx11.Hash(), tx12)
+ pool.enqueueTx(tx12.Hash(), tx12)
// Check that pre and post validations leave the pool as is
if pool.pending[account].Len() != 3 {
@@ -416,8 +451,8 @@ func TestTransactionDropping(t *testing.T) {
if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
}
- if len(pool.all) != 4 {
- t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
+ if len(pool.all) != 6 {
+ t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
}
pool.resetState()
if pool.pending[account].Len() != 3 {
@@ -426,8 +461,8 @@ func TestTransactionDropping(t *testing.T) {
if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
}
- if len(pool.all) != 4 {
- t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
+ if len(pool.all) != 6 {
+ t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
}
// Reduce the balance of the account, and check that invalidated transactions are dropped
state.AddBalance(account, big.NewInt(-650))
@@ -730,6 +765,12 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
if len(pool1.all) != len(pool2.all) {
t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all))
}
+ if err := validateTxPoolInternals(pool1); err != nil {
+ t.Errorf("pool 1 internal state corrupted: %v", err)
+ }
+ if err := validateTxPoolInternals(pool2); err != nil {
+ t.Errorf("pool 2 internal state corrupted: %v", err)
+ }
}
// Tests that if the transaction count belonging to multiple accounts go above
@@ -776,6 +817,45 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
if pending > int(DefaultTxPoolConfig.GlobalSlots) {
t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, DefaultTxPoolConfig.GlobalSlots)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
+}
+
+// Tests that if transactions start being capped, transasctions are also removed from 'all'
+func TestTransactionCapClearsFromAll(t *testing.T) {
+ // Reduce the queue limits to shorten test time
+ defer func(old uint64) { DefaultTxPoolConfig.AccountSlots = old }(DefaultTxPoolConfig.AccountSlots)
+ defer func(old uint64) { DefaultTxPoolConfig.AccountQueue = old }(DefaultTxPoolConfig.AccountQueue)
+ defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots)
+
+ DefaultTxPoolConfig.AccountSlots = 2
+ DefaultTxPoolConfig.AccountQueue = 2
+ DefaultTxPoolConfig.GlobalSlots = 8
+
+ // Create the pool to test the limit enforcement with
+ db, _ := ethdb.NewMemDatabase()
+ statedb, _ := state.New(common.Hash{}, db)
+
+ pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+ pool.resetState()
+
+ // Create a number of test accounts and fund them
+ state, _ := pool.currentState()
+
+ key, _ := crypto.GenerateKey()
+ addr := crypto.PubkeyToAddress(key.PublicKey)
+ state.AddBalance(addr, big.NewInt(1000000))
+
+ txs := types.Transactions{}
+ for j := 0; j < int(DefaultTxPoolConfig.GlobalSlots)*2; j++ {
+ txs = append(txs, transaction(uint64(j), big.NewInt(100000), key))
+ }
+ // Import the batch and verify that limits have been enforced
+ pool.AddBatch(txs)
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
}
// Tests that if the transaction count belonging to multiple accounts go above
@@ -820,6 +900,9 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), DefaultTxPoolConfig.AccountSlots)
}
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
}
// Tests that setting the transaction pool gas price to a higher value correctly
@@ -867,6 +950,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
// Reprice the pool and check that underpriced transactions get dropped
pool.SetGasPrice(big.NewInt(2))
@@ -877,6 +963,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
// Check that we can't add the old transactions back
if err := pool.Add(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0])); err != ErrUnderpriced {
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
@@ -884,6 +973,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced {
t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
// However we can add local underpriced transactions
tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[2])
@@ -894,6 +986,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
if pending, _ = pool.stats(); pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
}
// Tests that when the pool reaches its global transaction limit, underpriced
@@ -945,6 +1040,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
// Ensure that adding an underpriced transaction on block limit fails
if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced {
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
@@ -966,6 +1064,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
// Ensure that adding local transactions can push out even higher priced ones
tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(0), keys[2])
@@ -980,6 +1081,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
}
// Tests that the pool rejects replacement transactions that don't meet the minimum
@@ -1041,6 +1145,9 @@ func TestTransactionReplacement(t *testing.T) {
if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
t.Fatalf("failed to replace original queued transaction: %v", err)
}
+ if err := validateTxPoolInternals(pool); err != nil {
+ t.Fatalf("pool internal state corrupted: %v", err)
+ }
}
// Benchmarks the speed of validating the contents of the pending queue of the
diff --git a/core/vm/gen_structlog.go b/core/vm/gen_structlog.go
index 1c86b2256..88df942dc 100644
--- a/core/vm/gen_structlog.go
+++ b/core/vm/gen_structlog.go
@@ -18,12 +18,12 @@ func (s StructLog) MarshalJSON() ([]byte, error) {
Gas math.HexOrDecimal64 `json:"gas"`
GasCost math.HexOrDecimal64 `json:"gasCost"`
Memory hexutil.Bytes `json:"memory"`
+ MemorySize int `json:"memSize"`
Stack []*math.HexOrDecimal256 `json:"stack"`
Storage map[common.Hash]common.Hash `json:"-"`
Depth int `json:"depth"`
Err error `json:"error"`
OpName string `json:"opName"`
- MemorySize int `json:"memSize"`
}
var enc StructLog
enc.Pc = s.Pc
@@ -31,6 +31,7 @@ func (s StructLog) MarshalJSON() ([]byte, error) {
enc.Gas = math.HexOrDecimal64(s.Gas)
enc.GasCost = math.HexOrDecimal64(s.GasCost)
enc.Memory = s.Memory
+ enc.MemorySize = s.MemorySize
if s.Stack != nil {
enc.Stack = make([]*math.HexOrDecimal256, len(s.Stack))
for k, v := range s.Stack {
@@ -41,21 +42,21 @@ func (s StructLog) MarshalJSON() ([]byte, error) {
enc.Depth = s.Depth
enc.Err = s.Err
enc.OpName = s.OpName()
- enc.MemorySize = s.MemorySize()
return json.Marshal(&enc)
}
func (s *StructLog) UnmarshalJSON(input []byte) error {
type StructLog struct {
- Pc *uint64 `json:"pc"`
- Op *OpCode `json:"op"`
- Gas *math.HexOrDecimal64 `json:"gas"`
- GasCost *math.HexOrDecimal64 `json:"gasCost"`
- Memory hexutil.Bytes `json:"memory"`
- Stack []*math.HexOrDecimal256 `json:"stack"`
- Storage map[common.Hash]common.Hash `json:"-"`
- Depth *int `json:"depth"`
- Err *error `json:"error"`
+ Pc *uint64 `json:"pc"`
+ Op *OpCode `json:"op"`
+ Gas *math.HexOrDecimal64 `json:"gas"`
+ GasCost *math.HexOrDecimal64 `json:"gasCost"`
+ Memory hexutil.Bytes `json:"memory"`
+ MemorySize *int `json:"memSize"`
+ Stack []*math.HexOrDecimal256 `json:"stack"`
+ Storage map[common.Hash]common.Hash `json:"-"`
+ Depth *int `json:"depth"`
+ Err *error `json:"error"`
}
var dec StructLog
if err := json.Unmarshal(input, &dec); err != nil {
@@ -76,6 +77,9 @@ func (s *StructLog) UnmarshalJSON(input []byte) error {
if dec.Memory != nil {
s.Memory = dec.Memory
}
+ if dec.MemorySize != nil {
+ s.MemorySize = *dec.MemorySize
+ }
if dec.Stack != nil {
s.Stack = make([]*big.Int, len(dec.Stack))
for k, v := range dec.Stack {
diff --git a/core/vm/logger.go b/core/vm/logger.go
index 405ab169c..17a9c9ec3 100644
--- a/core/vm/logger.go
+++ b/core/vm/logger.go
@@ -54,35 +54,31 @@ type LogConfig struct {
// StructLog is emitted to the EVM each cycle and lists information about the current internal state
// prior to the execution of the statement.
type StructLog struct {
- Pc uint64 `json:"pc"`
- Op OpCode `json:"op"`
- Gas uint64 `json:"gas"`
- GasCost uint64 `json:"gasCost"`
- Memory []byte `json:"memory"`
- Stack []*big.Int `json:"stack"`
- Storage map[common.Hash]common.Hash `json:"-"`
- Depth int `json:"depth"`
- Err error `json:"error"`
+ Pc uint64 `json:"pc"`
+ Op OpCode `json:"op"`
+ Gas uint64 `json:"gas"`
+ GasCost uint64 `json:"gasCost"`
+ Memory []byte `json:"memory"`
+ MemorySize int `json:"memSize"`
+ Stack []*big.Int `json:"stack"`
+ Storage map[common.Hash]common.Hash `json:"-"`
+ Depth int `json:"depth"`
+ Err error `json:"error"`
}
// overrides for gencodec
type structLogMarshaling struct {
- Stack []*math.HexOrDecimal256
- Gas math.HexOrDecimal64
- GasCost math.HexOrDecimal64
- Memory hexutil.Bytes
- OpName string `json:"opName"`
- MemorySize int `json:"memSize"`
+ Stack []*math.HexOrDecimal256
+ Gas math.HexOrDecimal64
+ GasCost math.HexOrDecimal64
+ Memory hexutil.Bytes
+ OpName string `json:"opName"`
}
func (s *StructLog) OpName() string {
return s.Op.String()
}
-func (s *StructLog) MemorySize() int {
- return len(s.Memory)
-}
-
// Tracer is used to collect execution traces from an EVM transaction
// execution. CaptureState is called for each step of the VM with the
// current VM state.
@@ -181,7 +177,7 @@ func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost ui
}
}
// create a new snaptshot of the EVM.
- log := StructLog{pc, op, gas, cost, mem, stck, storage, env.depth, err}
+ log := StructLog{pc, op, gas, cost, mem, memory.Len(), stck, storage, depth, err}
l.logs = append(l.logs, log)
return nil
diff --git a/eth/backend.go b/eth/backend.go
index be2d03283..75e0e737b 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -200,10 +200,13 @@ func makeExtraData(extra []byte) []byte {
// CreateDB creates the chain database.
func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) {
db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles)
+ if err != nil {
+ return nil, err
+ }
if db, ok := db.(*ethdb.LDBDatabase); ok {
db.Meter("eth/db/chaindata/")
}
- return db, err
+ return db, nil
}
// CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 839969f03..e4d1392d0 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
)
@@ -99,8 +98,9 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
- queue *queue // Scheduler for selecting the hashes to download
- peers *peerSet // Set of active peers from which download can proceed
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ stateDB ethdb.Database
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
@@ -109,9 +109,9 @@ type Downloader struct {
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
- syncStatsChainOrigin uint64 // Origin block number where syncing started at
- syncStatsChainHeight uint64 // Highest block number known when syncing started
- syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
@@ -136,16 +136,18 @@ type Downloader struct {
notified int32
// Channels
- newPeerCh chan *peer
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
+ // for stateFetcher
+ stateSyncStart chan *stateSync
+ trackStateReq chan *stateReq
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
@@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
dl := &Downloader{
mode: mode,
mux: mux,
- queue: newQueue(stateDb),
+ queue: newQueue(),
peers: newPeerSet(),
+ stateDB: stateDb,
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
hasHeader: hasHeader,
@@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
insertReceipts: insertReceipts,
rollback: rollback,
dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
- stateCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
- stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
+ // for stateFetcher
+ stateSyncStart: make(chan *stateSync),
+ trackStateReq: make(chan *stateReq),
+ stateCh: make(chan dataPack),
}
go dl.qosTuner()
+ go dl.stateFetcher()
return dl
}
@@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
- // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
- pendingStates := uint64(d.queue.PendingNodeData())
-
// Lock the current stats and return the progress
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
@@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
StartingBlock: d.syncStatsChainOrigin,
CurrentBlock: current,
HighestBlock: d.syncStatsChainHeight,
- PulledStates: d.syncStatsStateDone,
- KnownStates: d.syncStatsStateDone + pendingStates,
+ PulledStates: d.syncStatsState.processed,
+ KnownStates: d.syncStatsState.processed + d.syncStatsState.pending,
}
}
@@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case <-ch:
default:
}
}
- for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+ for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
for empty := false; !empty; {
select {
case <-ch:
@@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
- return d.spawnSync(origin+1,
- func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
- func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
- )
+
+ fetchers := []func() error{
+ func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.processHeaders(origin+1, td) },
+ }
+ if d.mode == FastSync {
+ fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+ } else if d.mode == FullSync {
+ fetchers = append(fetchers, d.processFullSyncContent)
+ }
+ err = d.spawnSync(fetchers)
+ if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
+ // If sync failed in the critical section, bump the fail counter.
+ atomic.AddUint32(&d.fsPivotFails, 1)
+ }
+ return err
}
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
+func (d *Downloader) spawnSync(fetchers []func() error) error {
var wg sync.WaitGroup
- errc := make(chan error, len(fetchers)+1)
- wg.Add(len(fetchers) + 1)
- go func() { defer wg.Done(); errc <- d.processContent() }()
+ errc := make(chan error, len(fetchers))
+ wg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
- for i := 0; i < len(fetchers)+1; i++ {
- if i == len(fetchers) {
+ for i := 0; i < len(fetchers); i++ {
+ if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
@@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
d.queue.Close()
d.Cancel()
wg.Wait()
-
- // If sync failed in the critical section, bump the fail counter
- if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
- atomic.AddUint32(&d.fsPivotFails, 1)
- }
return err
}
@@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
return nil, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return 0, errTimeout
case <-d.bodyCh:
- case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
@@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
-// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
-// available peers, reserving a chunk of nodes for each, waiting for delivery and
-// also periodically checking for timeouts.
-func (d *Downloader) fetchNodeData() error {
- log.Debug("Downloading node state data")
-
- var (
- deliver = func(packet dataPack) (int, error) {
- start := time.Now()
- return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
- // If the peer returned old-requested data, forgive
- if err == trie.ErrNotRequested {
- log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
- return
- }
- if err != nil {
- // If the node data processing failed, the root hash is very wrong, abort
- log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
- d.Cancel()
- return
- }
- // Processing succeeded, notify state fetcher of continuation
- pending := d.queue.PendingNodeData()
- if pending > 0 {
- select {
- case d.stateWakeCh <- true:
- default:
- }
- }
- d.syncStatsLock.Lock()
- d.syncStatsStateDone += uint64(delivered)
- syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
- d.syncStatsLock.Unlock()
-
- // If real database progress was made, reset any fast-sync pivot failure
- if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
- log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
- atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
- // Log a message to the user and return
- if delivered > 0 {
- log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
- }
- })
- }
- expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
- throttle = func() bool { return false }
- reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
- return d.queue.ReserveNodeData(p, count), false, nil
- }
- fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
- setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
- )
- err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
- d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
- d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
-
- log.Debug("Node state data download terminated", "err", err)
- return err
-}
-
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
@@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
origin += uint64(limit)
}
// Signal the content downloaders of the availablility of new tasks
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
@@ -1351,71 +1293,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
-// processContent takes fetch results from the queue and tries to import them
-// into the chain. The type of import operation will depend on the result contents.
-func (d *Downloader) processContent() error {
- pivot := d.queue.FastSyncPivot()
+// processFullSyncContent takes fetch results from the queue and imports them into the chain.
+func (d *Downloader) processFullSyncContent() error {
for {
results := d.queue.WaitResults()
if len(results) == 0 {
- return nil // queue empty
+ return nil
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
- // Actually import the blocks
- first, last := results[0].Header, results[len(results)-1].Header
+ if err := d.importBlockResults(results); err != nil {
+ return err
+ }
+ }
+}
+
+func (d *Downloader) importBlockResults(results []*fetchResult) error {
+ for len(results) != 0 {
+ // Check for any termination requests. This makes clean shutdown faster.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ default:
+ }
+ // Retrieve the a batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
"lastnum", last.Number, "lasthash", last.Hash(),
)
- for len(results) != 0 {
- // Check for any termination requests
- select {
- case <-d.quitCh:
- return errCancelContentProcessing
- default:
- }
- // Retrieve the a batch of results to import
- var (
- blocks = make([]*types.Block, 0, maxResultsProcess)
- receipts = make([]types.Receipts, 0, maxResultsProcess)
- )
- items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
- for _, result := range results[:items] {
- switch {
- case d.mode == FullSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- case d.mode == FastSync:
- blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- if result.Header.Number.Uint64() <= pivot {
- receipts = append(receipts, result.Receipts)
- }
- }
- }
- // Try to process the results, aborting if there's an error
- var (
- err error
- index int
- )
- switch {
- case len(receipts) > 0:
- index, err = d.insertReceipts(blocks, receipts)
- if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
- log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash())
- index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
- }
- default:
- index, err = d.insertBlocks(blocks)
+ blocks := make([]*types.Block, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ }
+ if index, err := d.insertBlocks(blocks); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
+ }
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+// processFastSyncContent takes fetch results from the queue and writes them to the
+// database. It also controls the synchronisation of state nodes of the pivot block.
+func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+ // Start syncing state of the reported head block.
+ // This should get us most of the state of the pivot block.
+ stateSync := d.syncState(latest.Root)
+ defer stateSync.Cancel()
+ go func() {
+ if err := stateSync.Wait(); err != nil {
+ d.queue.Close() // wake up WaitResults
+ }
+ }()
+
+ pivot := d.queue.FastSyncPivot()
+ for {
+ results := d.queue.WaitResults()
+ if len(results) == 0 {
+ return stateSync.Cancel()
+ }
+ if d.chainInsertHook != nil {
+ d.chainInsertHook(results)
+ }
+ P, beforeP, afterP := splitAroundPivot(pivot, results)
+ if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
+ return err
+ }
+ if P != nil {
+ stateSync.Cancel()
+ if err := d.commitPivotBlock(P); err != nil {
+ return err
}
- if err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ }
+ if err := d.importBlockResults(afterP); err != nil {
+ return err
+ }
+ }
+}
+
+func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+ for _, result := range results {
+ num := result.Header.Number.Uint64()
+ switch {
+ case num < pivot:
+ before = append(before, result)
+ case num == pivot:
+ p = result
+ default:
+ after = append(after, result)
+ }
+ }
+ return p, before, after
+}
+
+func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
+ for len(results) != 0 {
+ // Check for any termination requests.
+ select {
+ case <-d.quitCh:
+ return errCancelContentProcessing
+ case <-stateSync.done:
+ if err := stateSync.Wait(); err != nil {
+ return err
}
- // Shift the results to the next batch
- results = results[items:]
+ default:
+ }
+ // Retrieve the a batch of results to import
+ items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
+ first, last := results[0].Header, results[items-1].Header
+ log.Debug("Inserting fast-sync blocks", "items", len(results),
+ "firstnum", first.Number, "firsthash", first.Hash(),
+ "lastnumn", last.Number, "lasthash", last.Hash(),
+ )
+ blocks := make([]*types.Block, items)
+ receipts := make([]types.Receipts, items)
+ for i, result := range results[:items] {
+ blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ receipts[i] = result.Receipts
+ }
+ if index, err := d.insertReceipts(blocks, receipts); err != nil {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ return errInvalidChain
}
+ // Shift the results to the next batch
+ results = results[items:]
+ }
+ return nil
+}
+
+func (d *Downloader) commitPivotBlock(result *fetchResult) error {
+ b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
+ // Sync the pivot block state. This should complete reasonably quickly because
+ // we've already synced up to the reported head block state earlier.
+ if err := d.syncState(b.Root()).Wait(); err != nil {
+ return err
+ }
+ log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
+ if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
+ return err
}
+ return d.commitHeadBlock(b.Hash())
}
// DeliverHeaders injects a new batch of block headers received from a remote
diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go
index 0d76c7dfd..58764ccf0 100644
--- a/eth/downloader/metrics.go
+++ b/eth/downloader/metrics.go
@@ -38,8 +38,6 @@ var (
receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
- stateInMeter = metrics.NewMeter("eth/downloader/states/in")
- stateReqTimer = metrics.NewTimer("eth/downloader/states/req")
- stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
- stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout")
+ stateInMeter = metrics.NewMeter("eth/downloader/states/in")
+ stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
)
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 15a912f1f..dc8b09772 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -30,6 +30,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
@@ -195,7 +196,7 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
}
// FetchNodeData sends a node state data retrieval request to the remote peer.
-func (p *peer) FetchNodeData(request *fetchRequest) error {
+func (p *peer) FetchNodeData(hashes []common.Hash) error {
// Sanity check the protocol version
if p.version < 63 {
panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
@@ -205,14 +206,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return errAlreadyFetching
}
p.stateStarted = time.Now()
-
- // Convert the hash set to a retrievable slice
- hashes := make([]common.Hash, 0, len(request.Hashes))
- for hash := range request.Hashes {
- hashes = append(hashes, hash)
- }
go p.getNodeData(hashes)
-
return nil
}
@@ -343,8 +337,9 @@ func (p *peer) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
+ peers map[string]*peer
+ newPeerFeed event.Feed
+ lock sync.RWMutex
}
// newPeerSet creates a new peer set top track the active download sources.
@@ -354,6 +349,10 @@ func newPeerSet() *peerSet {
}
}
+func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription {
+ return ps.newPeerFeed.Subscribe(ch)
+}
+
// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
@@ -377,9 +376,8 @@ func (ps *peerSet) Register(p *peer) error {
// Register the new peer with some meaningful defaults
ps.lock.Lock()
- defer ps.lock.Unlock()
-
if _, ok := ps.peers[p.id]; ok {
+ ps.lock.Unlock()
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
@@ -399,6 +397,9 @@ func (ps *peerSet) Register(p *peer) error {
p.stateThroughput /= float64(len(ps.peers))
}
ps.peers[p.id] = p
+ ps.lock.Unlock()
+
+ ps.newPeerFeed.Send(p)
return nil
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 855097c45..8a7735d67 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -26,20 +26,13 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
-var (
- blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
- maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
-)
+var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
var (
errNoFetchesPending = errors.New("no fetches pending")
@@ -94,15 +87,6 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
- stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order
- stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority
- stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
- statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
-
- stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
- stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
- stateWriters int // [eth/63] Number of running state DB writer goroutines
-
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
@@ -112,7 +96,7 @@ type queue struct {
}
// newQueue creates a new download queue for scheduling block retrieval.
-func newQueue(stateDb ethdb.Database) *queue {
+func newQueue() *queue {
lock := new(sync.Mutex)
return &queue{
headerPendPool: make(map[string]*fetchRequest),
@@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
- stateTaskPool: make(map[common.Hash]int),
- stateTaskQueue: prque.New(),
- statePendPool: make(map[string]*fetchRequest),
- stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
active: sync.NewCond(lock),
lock: lock,
@@ -158,12 +138,6 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
- q.stateTaskIndex = 0
- q.stateTaskPool = make(map[common.Hash]int)
- q.stateTaskQueue.Reset()
- q.statePendPool = make(map[string]*fetchRequest)
- q.stateScheduler = nil
-
q.resultCache = make([]*fetchResult, blockCacheLimit)
q.resultOffset = 0
}
@@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
-// PendingNodeData retrieves the number of node data entries pending for retrieval.
-func (q *queue) PendingNodeData() int {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.pendingNodeDataLocked()
-}
-
-// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval.
-// The caller must hold q.lock.
-func (q *queue) pendingNodeDataLocked() int {
- var n int
- if q.stateScheduler != nil {
- n = q.stateScheduler.Pending()
- }
- // Ensure that PendingNodeData doesn't return 0 until all state is written.
- if q.stateWriters > 0 {
- n++
- }
- return n
-}
-
// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
@@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool {
return len(q.receiptPendPool) > 0
}
-// InFlightNodeData retrieves whether there are node data entry fetch requests
-// currently in flight.
-func (q *queue) InFlightNodeData() bool {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return len(q.statePendPool)+q.stateWriters > 0
-}
-
-// Idle returns if the queue is fully idle or has some data still inside. This
-// method is used by the tester to detect termination events.
+// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
- queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
- pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
+ queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+ pending := len(q.blockPendPool) + len(q.receiptPendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
- if q.stateScheduler != nil {
- queued += q.stateScheduler.Pending()
- }
return (queued + pending + cached) == 0
}
@@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
- if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
- // Pivoting point of the fast sync, switch the state retrieval to this
- log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash)
-
- q.stateTaskIndex = 0
- q.stateTaskPool = make(map[common.Hash]int)
- q.stateTaskQueue.Reset()
- for _, req := range q.statePendPool {
- req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
- }
-
- q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
- }
inserts = append(inserts, header)
q.headerHead = hash
from++
@@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int {
if result == nil || result.Pending > 0 {
return i
}
- // Special handling for the fast-sync pivot block:
- if q.mode == FastSync {
- bnum := result.Header.Number.Uint64()
- if bnum == q.fastSyncPivot {
- // If the state of the pivot block is not
- // available yet, we cannot proceed and return 0.
- //
- // Stop before processing the pivot block to ensure that
- // resultCache has space for fsHeaderForceVerify items. Not
- // doing this could leave us unable to download the required
- // amount of headers.
- if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 {
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
return i
}
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- return i
- }
- }
- }
- // If we're just the fast sync pivot, stop as well
- // because the following batch needs different insertion.
- // This simplifies handling the switchover in d.process.
- if bnum == q.fastSyncPivot+1 && i > 0 {
- return i
}
}
}
@@ -519,81 +429,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
return request
}
-// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
-// any previously failed download.
-func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
- // Create a task generator to fetch status-fetch tasks if all schedules ones are done
- generator := func(max int) {
- if q.stateScheduler != nil {
- for _, hash := range q.stateScheduler.Missing(max) {
- q.stateTaskPool[hash] = q.stateTaskIndex
- q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
- q.stateTaskIndex++
- }
- }
- }
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates)
-}
-
-// reserveHashes reserves a set of hashes for the given peer, skipping previously
-// failed ones.
-//
-// Note, this method expects the queue lock to be already held for writing. The
-// reason the lock is not obtained in here is because the parameters already need
-// to access the queue, so they already need a lock anyway.
-func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
- // Short circuit if the peer's already downloading something (sanity check to
- // not corrupt state)
- if _, ok := pendPool[p.id]; ok {
- return nil
- }
- // Calculate an upper limit on the hashes we might fetch (i.e. throttling)
- allowance := maxPending
- if allowance > 0 {
- for _, request := range pendPool {
- allowance -= len(request.Hashes)
- }
- }
- // If there's a task generator, ask it to fill our task queue
- if taskGen != nil && taskQueue.Size() < allowance {
- taskGen(allowance - taskQueue.Size())
- }
- if taskQueue.Empty() {
- return nil
- }
- // Retrieve a batch of hashes, skipping previously failed ones
- send := make(map[common.Hash]int)
- skip := make(map[common.Hash]int)
-
- for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
- hash, priority := taskQueue.Pop()
- if p.Lacks(hash.(common.Hash)) {
- skip[hash.(common.Hash)] = int(priority)
- } else {
- send[hash.(common.Hash)] = int(priority)
- }
- }
- // Merge all the skipped hashes back
- for hash, index := range skip {
- taskQueue.Push(hash, float32(index))
- }
- // Assemble and return the block download request
- if len(send) == 0 {
- return nil
- }
- request := &fetchRequest{
- Peer: p,
- Hashes: send,
- Time: time.Now(),
- }
- pendPool[p.id] = request
-
- return request
-}
-
// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
@@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) {
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
-// CancelNodeData aborts a node state data fetch request, returning all pending
-// hashes to the task queue.
-func (q *queue) CancelNodeData(request *fetchRequest) {
- q.cancel(request, q.stateTaskQueue, q.statePendPool)
-}
-
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
@@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) {
}
delete(q.receiptPendPool, peerId)
}
- if request, ok := q.statePendPool[peerId]; ok {
- for hash, index := range request.Hashes {
- q.stateTaskQueue.Push(hash, float32(index))
- }
- delete(q.statePendPool, peerId)
- }
}
// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
@@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}
-// ExpireNodeData checks for in flight node data requests that exceeded a timeout
-// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
-}
-
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
@@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
-// DeliverNodeData injects a node state data retrieval response into the queue.
-// The method returns the number of node state accepted from the delivery.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Short circuit if the data was never requested
- request := q.statePendPool[id]
- if request == nil {
- return 0, errNoFetchesPending
- }
- stateReqTimer.UpdateSince(request.Time)
- delete(q.statePendPool, id)
-
- // If no data was retrieved, mark their hashes as unavailable for the origin peer
- if len(data) == 0 {
- for hash := range request.Hashes {
- request.Peer.MarkLacking(hash)
- }
- }
- // Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
- process := []trie.SyncResult{}
- for _, blob := range data {
- // Skip any state trie entries that were not requested
- hash := common.BytesToHash(crypto.Keccak256(blob))
- if _, ok := request.Hashes[hash]; !ok {
- errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
- continue
- }
- // Inject the next state trie item into the processing queue
- process = append(process, trie.SyncResult{Hash: hash, Data: blob})
- delete(request.Hashes, hash)
- delete(q.stateTaskPool, hash)
- }
- // Return all failed or missing fetches to the queue
- for hash, index := range request.Hashes {
- q.stateTaskQueue.Push(hash, float32(index))
- }
- if q.stateScheduler == nil {
- return 0, errNoFetchesPending
- }
-
- // Run valid nodes through the trie download scheduler. It writes completed nodes to a
- // batch, which is committed asynchronously. This may lead to over-fetches because the
- // scheduler treats everything as written after Process has returned, but it's
- // unlikely to be an issue in practice.
- batch := q.stateDatabase.NewBatch()
- progressed, nproc, procerr := q.stateScheduler.Process(process, batch)
- q.stateWriters += 1
- go func() {
- if procerr == nil {
- nproc = len(process)
- procerr = batch.Write()
- }
- // Return processing errors through the callback so the sync gets canceled. The
- // number of writers is decremented prior to the call so PendingNodeData will
- // return zero when the callback runs.
- q.lock.Lock()
- q.stateWriters -= 1
- q.lock.Unlock()
- callback(nproc, progressed, procerr)
- // Wake up WaitResults after the state has been written because it might be
- // waiting for completion of the pivot block's state download.
- q.active.Signal()
- }()
-
- // If none of the data items were good, it's a stale delivery
- switch {
- case len(errs) == 0:
- return len(process), nil
- case len(errs) == len(request.Hashes):
- return len(process), errStaleDelivery
- default:
- return len(process), fmt.Errorf("multiple failures: %v", errs)
- }
-}
-
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
@@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.
}
q.fastSyncPivot = pivot
q.mode = mode
-
- // If long running fast sync, also start up a head stateretrieval immediately
- if mode == FastSync && pivot > 0 {
- q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
- }
}
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
new file mode 100644
index 000000000..4e6612039
--- /dev/null
+++ b/eth/downloader/statesync.go
@@ -0,0 +1,449 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "hash"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+// stateReq represents a batch of state fetch requests groupped together into
+// a single data retrieval network packet.
+type stateReq struct {
+ items []common.Hash // Hashes of the state items to download
+ tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
+ timeout time.Duration // Maximum round trip time for this to complete
+ timer *time.Timer // Timer to fire when the RTT timeout expires
+ peer *peer // Peer that we're requesting from
+ response [][]byte // Response data of the peer (nil for timeouts)
+}
+
+// timedOut returns if this request timed out.
+func (req *stateReq) timedOut() bool {
+ return req.response == nil
+}
+
+// stateSyncStats is a collection of progress stats to report during a state trie
+// sync to RPC requests as well as to display in user logs.
+type stateSyncStats struct {
+ processed uint64 // Number of state entries processed
+ duplicate uint64 // Number of state entries downloaded twice
+ unexpected uint64 // Number of non-requested state entries received
+ pending uint64 // Number of still pending state entries
+}
+
+// syncState starts downloading state with the given root hash.
+func (d *Downloader) syncState(root common.Hash) *stateSync {
+ s := newStateSync(d, root)
+ select {
+ case d.stateSyncStart <- s:
+ case <-d.quitCh:
+ s.err = errCancelStateFetch
+ close(s.done)
+ }
+ return s
+}
+
+// stateFetcher manages the active state sync and accepts requests
+// on its behalf.
+func (d *Downloader) stateFetcher() {
+ for {
+ select {
+ case s := <-d.stateSyncStart:
+ for next := s; next != nil; {
+ next = d.runStateSync(next)
+ }
+ case <-d.stateCh:
+ // Ignore state responses while no sync is running.
+ case <-d.quitCh:
+ return
+ }
+ }
+}
+
+// runStateSync runs a state synchronisation until it completes or another root
+// hash is requested to be switched over to.
+func (d *Downloader) runStateSync(s *stateSync) *stateSync {
+ var (
+ active = make(map[string]*stateReq) // Currently in-flight requests
+ finished []*stateReq // Completed or failed requests
+ timeout = make(chan *stateReq) // Timed out active requests
+ )
+ defer func() {
+ // Cancel active request timers on exit. Also set peers to idle so they're
+ // available for the next sync.
+ for _, req := range active {
+ req.timer.Stop()
+ req.peer.SetNodeDataIdle(len(req.items))
+ }
+ }()
+ // Run the state sync.
+ go s.run()
+ defer s.Cancel()
+
+ for {
+ // Enable sending of the first buffered element if there is one.
+ var (
+ deliverReq *stateReq
+ deliverReqCh chan *stateReq
+ )
+ if len(finished) > 0 {
+ deliverReq = finished[0]
+ deliverReqCh = s.deliver
+ }
+
+ select {
+ // The stateSync lifecycle:
+ case next := <-d.stateSyncStart:
+ return next
+
+ case <-s.done:
+ return nil
+
+ // Send the next finished request to the current sync:
+ case deliverReqCh <- deliverReq:
+ finished = append(finished[:0], finished[1:]...)
+
+ // Handle incoming state packs:
+ case pack := <-d.stateCh:
+ // Discard any data not requested (or previsouly timed out)
+ req := active[pack.PeerId()]
+ if req == nil {
+ log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
+ continue
+ }
+ // Finalize the request and queue up for processing
+ req.timer.Stop()
+ req.response = pack.(*statePack).states
+
+ finished = append(finished, req)
+ delete(active, pack.PeerId())
+
+ // Handle timed-out requests:
+ case req := <-timeout:
+ // If the peer is already requesting something else, ignore the stale timeout.
+ // This can happen when the timeout and the delivery happens simultaneously,
+ // causing both pathways to trigger.
+ if active[req.peer.id] != req {
+ continue
+ }
+ // Move the timed out data back into the download queue
+ finished = append(finished, req)
+ delete(active, req.peer.id)
+
+ // Track outgoing state requests:
+ case req := <-d.trackStateReq:
+ // If an active request already exists for this peer, we have a problem. In
+ // theory the trie node schedule must never assign two requests to the same
+ // peer. In practive however, a peer might receive a request, disconnect and
+ // immediately reconnect before the previous times out. In this case the first
+ // request is never honored, alas we must not silently overwrite it, as that
+ // causes valid requests to go missing and sync to get stuck.
+ if old := active[req.peer.id]; old != nil {
+ log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
+
+ // Make sure the previous one doesn't get siletly lost
+ finished = append(finished, old)
+ }
+ // Start a timer to notify the sync loop if the peer stalled.
+ req.timer = time.AfterFunc(req.timeout, func() {
+ select {
+ case timeout <- req:
+ case <-s.done:
+ // Prevent leaking of timer goroutines in the unlikely case where a
+ // timer is fired just before exiting runStateSync.
+ }
+ })
+ active[req.peer.id] = req
+ }
+ }
+}
+
+// stateSync schedules requests for downloading a particular state trie defined
+// by a given state root.
+type stateSync struct {
+ d *Downloader // Downloader instance to access and manage current peerset
+
+ sched *state.StateSync // State trie sync scheduler defining the tasks
+ keccak hash.Hash // Keccak256 hasher to verify deliveries with
+ tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
+
+ deliver chan *stateReq // Delivery channel multiplexing peer responses
+ cancel chan struct{} // Channel to signal a termination request
+ cancelOnce sync.Once // Ensures cancel only ever gets called once
+ done chan struct{} // Channel to signal termination completion
+ err error // Any error hit during sync (set before completion)
+}
+
+// stateTask represents a single trie node download taks, containing a set of
+// peers already attempted retrieval from to detect stalled syncs and abort.
+type stateTask struct {
+ attempts map[string]struct{}
+}
+
+// newStateSync creates a new state trie download scheduler. This method does not
+// yet start the sync. The user needs to call run to initiate.
+func newStateSync(d *Downloader, root common.Hash) *stateSync {
+ return &stateSync{
+ d: d,
+ sched: state.NewStateSync(root, d.stateDB),
+ keccak: sha3.NewKeccak256(),
+ tasks: make(map[common.Hash]*stateTask),
+ deliver: make(chan *stateReq),
+ cancel: make(chan struct{}),
+ done: make(chan struct{}),
+ }
+}
+
+// run starts the task assignment and response processing loop, blocking until
+// it finishes, and finally notifying any goroutines waiting for the loop to
+// finish.
+func (s *stateSync) run() {
+ s.err = s.loop()
+ close(s.done)
+}
+
+// Wait blocks until the sync is done or canceled.
+func (s *stateSync) Wait() error {
+ <-s.done
+ return s.err
+}
+
+// Cancel cancels the sync and waits until it has shut down.
+func (s *stateSync) Cancel() error {
+ s.cancelOnce.Do(func() { close(s.cancel) })
+ return s.Wait()
+}
+
+// loop is the main event loop of a state trie sync. It it responsible for the
+// assignment of new tasks to peers (including sending it to them) as well as
+// for the processing of inbound data. Note, that the loop does not directly
+// receive data from peers, rather those are buffered up in the downloader and
+// pushed here async. The reason is to decouple processing from data receipt
+// and timeouts.
+func (s *stateSync) loop() error {
+ // Listen for new peer events to assign tasks to them
+ newPeer := make(chan *peer, 1024)
+ peerSub := s.d.peers.SubscribeNewPeers(newPeer)
+ defer peerSub.Unsubscribe()
+
+ // Keep assigning new tasks until the sync completes or aborts
+ for s.sched.Pending() > 0 {
+ if err := s.assignTasks(); err != nil {
+ return err
+ }
+ // Tasks assigned, wait for something to happen
+ select {
+ case <-newPeer:
+ // New peer arrived, try to assign it download tasks
+
+ case <-s.cancel:
+ return errCancelStateFetch
+
+ case req := <-s.deliver:
+ // Response or timeout triggered, drop the peer if stalling
+ log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut())
+ if len(req.items) <= 2 && req.timedOut() {
+ // 2 items are the minimum requested, if even that times out, we've no use of
+ // this peer at the moment.
+ log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
+ s.d.dropPeer(req.peer.id)
+ }
+ // Process all the received blobs and check for stale delivery
+ stale, err := s.process(req)
+ if err != nil {
+ log.Warn("Node data write error", "err", err)
+ return err
+ }
+ // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery)
+ if !stale {
+ req.peer.SetNodeDataIdle(len(req.response))
+ }
+ }
+ }
+ return nil
+}
+
+// assignTasks attempts to assing new tasks to all idle peers, either from the
+// batch currently being retried, or fetching new data from the trie sync itself.
+func (s *stateSync) assignTasks() error {
+ // Iterate over all idle peers and try to assign them state fetches
+ peers, _ := s.d.peers.NodeDataIdlePeers()
+ for _, p := range peers {
+ // Assign a batch of fetches proportional to the estimated latency/bandwidth
+ cap := p.NodeDataCapacity(s.d.requestRTT())
+ req := &stateReq{peer: p, timeout: s.d.requestTTL()}
+ s.fillTasks(cap, req)
+
+ // If the peer was assigned tasks to fetch, send the network request
+ if len(req.items) > 0 {
+ req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+
+ select {
+ case s.d.trackStateReq <- req:
+ req.peer.FetchNodeData(req.items)
+ case <-s.cancel:
+ }
+ }
+ }
+ return nil
+}
+
+// fillTasks fills the given request object with a maximum of n state download
+// tasks to send to the remote peer.
+func (s *stateSync) fillTasks(n int, req *stateReq) {
+ // Refill available tasks from the scheduler.
+ if len(s.tasks) < n {
+ new := s.sched.Missing(n - len(s.tasks))
+ for _, hash := range new {
+ s.tasks[hash] = &stateTask{make(map[string]struct{})}
+ }
+ }
+ // Find tasks that haven't been tried with the request's peer.
+ req.items = make([]common.Hash, 0, n)
+ req.tasks = make(map[common.Hash]*stateTask, n)
+ for hash, t := range s.tasks {
+ // Stop when we've gathered enough requests
+ if len(req.items) == n {
+ break
+ }
+ // Skip any requests we've already tried from this peer
+ if _, ok := t.attempts[req.peer.id]; ok {
+ continue
+ }
+ // Assign the request to this peer
+ t.attempts[req.peer.id] = struct{}{}
+ req.items = append(req.items, hash)
+ req.tasks[hash] = t
+ delete(s.tasks, hash)
+ }
+}
+
+// process iterates over a batch of delivered state data, injecting each item
+// into a running state sync, re-queuing any items that were requested but not
+// delivered.
+func (s *stateSync) process(req *stateReq) (bool, error) {
+ // Collect processing stats and update progress if valid data was received
+ processed, written, duplicate, unexpected := 0, 0, 0, 0
+
+ defer func(start time.Time) {
+ if processed+written+duplicate+unexpected > 0 {
+ s.updateStats(processed, written, duplicate, unexpected, time.Since(start))
+ }
+ }(time.Now())
+
+ // Iterate over all the delivered data and inject one-by-one into the trie
+ progress, stale := false, len(req.response) > 0
+
+ for _, blob := range req.response {
+ prog, hash, err := s.processNodeData(blob)
+ switch err {
+ case nil:
+ processed++
+ case trie.ErrNotRequested:
+ unexpected++
+ case trie.ErrAlreadyProcessed:
+ duplicate++
+ default:
+ return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+ }
+ if prog {
+ progress = true
+ }
+ // If the node delivered a requested item, mark the delivery non-stale
+ if _, ok := req.tasks[hash]; ok {
+ delete(req.tasks, hash)
+ stale = false
+ }
+ }
+ // If some data managed to hit the database, flush and reset failure counters
+ if progress {
+ // Flush any accumulated data out to disk
+ batch := s.d.stateDB.NewBatch()
+
+ count, err := s.sched.Commit(batch)
+ if err != nil {
+ return stale, err
+ }
+ if err := batch.Write(); err != nil {
+ return stale, err
+ }
+ written = count
+
+ // If we're inside the critical section, reset fail counter since we progressed
+ if atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
+ log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
+ atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
+ }
+ }
+ // Put unfulfilled tasks back into the retry queue
+ npeers := s.d.peers.Len()
+
+ for hash, task := range req.tasks {
+ // If the node did deliver something, missing items may be due to a protocol
+ // limit or a previous timeout + delayed delivery. Both cases should permit
+ // the node to retry the missing items (to avoid single-peer stalls).
+ if len(req.response) > 0 || req.timedOut() {
+ delete(task.attempts, req.peer.id)
+ }
+ // If we've requested the node too many times already, it may be a malicious
+ // sync where nobody has the right data. Abort.
+ if len(task.attempts) >= npeers {
+ return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+ }
+ // Missing item, place into the retry queue.
+ s.tasks[hash] = task
+ }
+ return stale, nil
+}
+
+// processNodeData tries to inject a trie node data blob delivered from a remote
+// peer into the state trie, returning whether anything useful was written or any
+// error occurred.
+func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
+ res := trie.SyncResult{Data: blob}
+
+ s.keccak.Reset()
+ s.keccak.Write(blob)
+ s.keccak.Sum(res.Hash[:0])
+
+ committed, _, err := s.sched.Process([]trie.SyncResult{res})
+ return committed, res.Hash, err
+}
+
+// updateStats bumps the various state sync progress counters and displays a log
+// message for the user to see.
+func (s *stateSync) updateStats(processed, written, duplicate, unexpected int, duration time.Duration) {
+ s.d.syncStatsLock.Lock()
+ defer s.d.syncStatsLock.Unlock()
+
+ s.d.syncStatsState.pending = uint64(s.sched.Pending())
+ s.d.syncStatsState.processed += uint64(processed)
+ s.d.syncStatsState.duplicate += uint64(duplicate)
+ s.d.syncStatsState.unexpected += uint64(unexpected)
+
+ log.Info("Imported new state entries", "count", processed, "flushed", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+}
diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go
index 59f60d659..45bb87322 100644
--- a/ethclient/ethclient.go
+++ b/ethclient/ethclient.go
@@ -167,11 +167,11 @@ func (ec *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx *
} else if _, r, _ := tx.RawSignatureValues(); r == nil {
return nil, false, fmt.Errorf("server returned transaction without signature")
}
- var block struct{ BlockHash *common.Hash }
+ var block struct{ BlockNumber *string }
if err := json.Unmarshal(raw, &block); err != nil {
return nil, false, err
}
- return tx, block.BlockHash == nil, nil
+ return tx, block.BlockNumber == nil, nil
}
// TransactionCount returns the total number of transactions in the given block.
diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go
index 65c487934..a2ee2f2cc 100644
--- a/ethdb/memory_database.go
+++ b/ethdb/memory_database.go
@@ -45,13 +45,6 @@ func (db *MemDatabase) Put(key []byte, value []byte) error {
return nil
}
-func (db *MemDatabase) Set(key []byte, value []byte) {
- db.lock.Lock()
- defer db.lock.Unlock()
-
- db.Put(key, value)
-}
-
func (db *MemDatabase) Get(key []byte) ([]byte, error) {
db.lock.RLock()
defer db.lock.RUnlock()
diff --git a/internal/cmdtest/test_cmd.go b/internal/cmdtest/test_cmd.go
new file mode 100644
index 000000000..541e51c4c
--- /dev/null
+++ b/internal/cmdtest/test_cmd.go
@@ -0,0 +1,270 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
+package cmdtest
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "regexp"
+ "sync"
+ "testing"
+ "text/template"
+ "time"
+
+ "github.com/docker/docker/pkg/reexec"
+)
+
+func NewTestCmd(t *testing.T, data interface{}) *TestCmd {
+ return &TestCmd{T: t, Data: data}
+}
+
+type TestCmd struct {
+ // For total convenience, all testing methods are available.
+ *testing.T
+
+ Func template.FuncMap
+ Data interface{}
+ Cleanup func()
+
+ cmd *exec.Cmd
+ stdout *bufio.Reader
+ stdin io.WriteCloser
+ stderr *testlogger
+}
+
+// Run exec's the current binary using name as argv[0] which will trigger the
+// reexec init function for that name (e.g. "geth-test" in cmd/geth/run_test.go)
+func (tt *TestCmd) Run(name string, args ...string) {
+ tt.stderr = &testlogger{t: tt.T}
+ tt.cmd = &exec.Cmd{
+ Path: reexec.Self(),
+ Args: append([]string{name}, args...),
+ Stderr: tt.stderr,
+ }
+ stdout, err := tt.cmd.StdoutPipe()
+ if err != nil {
+ tt.Fatal(err)
+ }
+ tt.stdout = bufio.NewReader(stdout)
+ if tt.stdin, err = tt.cmd.StdinPipe(); err != nil {
+ tt.Fatal(err)
+ }
+ if err := tt.cmd.Start(); err != nil {
+ tt.Fatal(err)
+ }
+}
+
+// InputLine writes the given text to the childs stdin.
+// This method can also be called from an expect template, e.g.:
+//
+// geth.expect(`Passphrase: {{.InputLine "password"}}`)
+func (tt *TestCmd) InputLine(s string) string {
+ io.WriteString(tt.stdin, s+"\n")
+ return ""
+}
+
+func (tt *TestCmd) SetTemplateFunc(name string, fn interface{}) {
+ if tt.Func == nil {
+ tt.Func = make(map[string]interface{})
+ }
+ tt.Func[name] = fn
+}
+
+// Expect runs its argument as a template, then expects the
+// child process to output the result of the template within 5s.
+//
+// If the template starts with a newline, the newline is removed
+// before matching.
+func (tt *TestCmd) Expect(tplsource string) {
+ // Generate the expected output by running the template.
+ tpl := template.Must(template.New("").Funcs(tt.Func).Parse(tplsource))
+ wantbuf := new(bytes.Buffer)
+ if err := tpl.Execute(wantbuf, tt.Data); err != nil {
+ panic(err)
+ }
+ // Trim exactly one newline at the beginning. This makes tests look
+ // much nicer because all expect strings are at column 0.
+ want := bytes.TrimPrefix(wantbuf.Bytes(), []byte("\n"))
+ if err := tt.matchExactOutput(want); err != nil {
+ tt.Fatal(err)
+ }
+ tt.Logf("Matched stdout text:\n%s", want)
+}
+
+func (tt *TestCmd) matchExactOutput(want []byte) error {
+ buf := make([]byte, len(want))
+ n := 0
+ tt.withKillTimeout(func() { n, _ = io.ReadFull(tt.stdout, buf) })
+ buf = buf[:n]
+ if n < len(want) || !bytes.Equal(buf, want) {
+ // Grab any additional buffered output in case of mismatch
+ // because it might help with debugging.
+ buf = append(buf, make([]byte, tt.stdout.Buffered())...)
+ tt.stdout.Read(buf[n:])
+ // Find the mismatch position.
+ for i := 0; i < n; i++ {
+ if want[i] != buf[i] {
+ return fmt.Errorf("Output mismatch at â—Š:\n---------------- (stdout text)\n%sâ—Š%s\n---------------- (expected text)\n%s",
+ buf[:i], buf[i:n], want)
+ }
+ }
+ if n < len(want) {
+ return fmt.Errorf("Not enough output, got until â—Š:\n---------------- (stdout text)\n%s\n---------------- (expected text)\n%sâ—Š%s",
+ buf, want[:n], want[n:])
+ }
+ }
+ return nil
+}
+
+// ExpectRegexp expects the child process to output text matching the
+// given regular expression within 5s.
+//
+// Note that an arbitrary amount of output may be consumed by the
+// regular expression. This usually means that expect cannot be used
+// after ExpectRegexp.
+func (tt *TestCmd) ExpectRegexp(resource string) (*regexp.Regexp, []string) {
+ var (
+ re = regexp.MustCompile(resource)
+ rtee = &runeTee{in: tt.stdout}
+ matches []int
+ )
+ tt.withKillTimeout(func() { matches = re.FindReaderSubmatchIndex(rtee) })
+ output := rtee.buf.Bytes()
+ if matches == nil {
+ tt.Fatalf("Output did not match:\n---------------- (stdout text)\n%s\n---------------- (regular expression)\n%s",
+ output, resource)
+ return re, nil
+ }
+ tt.Logf("Matched stdout text:\n%s", output)
+ var submatches []string
+ for i := 0; i < len(matches); i += 2 {
+ submatch := string(output[matches[i]:matches[i+1]])
+ submatches = append(submatches, submatch)
+ }
+ return re, submatches
+}
+
+// ExpectExit expects the child process to exit within 5s without
+// printing any additional text on stdout.
+func (tt *TestCmd) ExpectExit() {
+ var output []byte
+ tt.withKillTimeout(func() {
+ output, _ = ioutil.ReadAll(tt.stdout)
+ })
+ tt.WaitExit()
+ if tt.Cleanup != nil {
+ tt.Cleanup()
+ }
+ if len(output) > 0 {
+ tt.Errorf("Unmatched stdout text:\n%s", output)
+ }
+}
+
+func (tt *TestCmd) WaitExit() {
+ tt.cmd.Wait()
+}
+
+func (tt *TestCmd) Interrupt() {
+ tt.cmd.Process.Signal(os.Interrupt)
+}
+
+// StderrText returns any stderr output written so far.
+// The returned text holds all log lines after ExpectExit has
+// returned.
+func (tt *TestCmd) StderrText() string {
+ tt.stderr.mu.Lock()
+ defer tt.stderr.mu.Unlock()
+ return tt.stderr.buf.String()
+}
+
+func (tt *TestCmd) CloseStdin() {
+ tt.stdin.Close()
+}
+
+func (tt *TestCmd) Kill() {
+ tt.cmd.Process.Kill()
+ if tt.Cleanup != nil {
+ tt.Cleanup()
+ }
+}
+
+func (tt *TestCmd) withKillTimeout(fn func()) {
+ timeout := time.AfterFunc(5*time.Second, func() {
+ tt.Log("killing the child process (timeout)")
+ tt.Kill()
+ })
+ defer timeout.Stop()
+ fn()
+}
+
+// testlogger logs all written lines via t.Log and also
+// collects them for later inspection.
+type testlogger struct {
+ t *testing.T
+ mu sync.Mutex
+ buf bytes.Buffer
+}
+
+func (tl *testlogger) Write(b []byte) (n int, err error) {
+ lines := bytes.Split(b, []byte("\n"))
+ for _, line := range lines {
+ if len(line) > 0 {
+ tl.t.Logf("(stderr) %s", line)
+ }
+ }
+ tl.mu.Lock()
+ tl.buf.Write(b)
+ tl.mu.Unlock()
+ return len(b), err
+}
+
+// runeTee collects text read through it into buf.
+type runeTee struct {
+ in interface {
+ io.Reader
+ io.ByteReader
+ io.RuneReader
+ }
+ buf bytes.Buffer
+}
+
+func (rtee *runeTee) Read(b []byte) (n int, err error) {
+ n, err = rtee.in.Read(b)
+ rtee.buf.Write(b[:n])
+ return n, err
+}
+
+func (rtee *runeTee) ReadRune() (r rune, size int, err error) {
+ r, size, err = rtee.in.ReadRune()
+ if err == nil {
+ rtee.buf.WriteRune(r)
+ }
+ return r, size, err
+}
+
+func (rtee *runeTee) ReadByte() (b byte, err error) {
+ b, err = rtee.in.ReadByte()
+ if err == nil {
+ rtee.buf.WriteByte(b)
+ }
+ return b, err
+}
diff --git a/les/backend.go b/les/backend.go
index 646c81a7b..658c73c6e 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -19,6 +19,7 @@ package les
import (
"fmt"
+ "sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
@@ -38,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/params"
rpc "github.com/ethereum/go-ethereum/rpc"
)
@@ -49,9 +51,13 @@ type LightEthereum struct {
// Channel for shutting down the service
shutdownChan chan bool
// Handlers
+ peers *peerSet
txPool *light.TxPool
blockchain *light.LightChain
protocolManager *ProtocolManager
+ serverPool *serverPool
+ reqDist *requestDistributor
+ retriever *retrieveManager
// DB interfaces
chainDb ethdb.Database // Block chain database
@@ -63,6 +69,9 @@ type LightEthereum struct {
networkId uint64
netRPCService *ethapi.PublicNetAPI
+
+ quitSync chan struct{}
+ wg sync.WaitGroup
}
func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
@@ -76,20 +85,26 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}
log.Info("Initialised chain configuration", "config", chainConfig)
- odr := NewLesOdr(chainDb)
- relay := NewLesTxRelay()
+ peers := newPeerSet()
+ quitSync := make(chan struct{})
+
eth := &LightEthereum{
- odr: odr,
- relay: relay,
- chainDb: chainDb,
chainConfig: chainConfig,
+ chainDb: chainDb,
eventMux: ctx.EventMux,
+ peers: peers,
+ reqDist: newRequestDistributor(peers, quitSync),
accountManager: ctx.AccountManager,
engine: eth.CreateConsensusEngine(ctx, config, chainConfig, chainDb),
shutdownChan: make(chan bool),
networkId: config.NetworkId,
}
- if eth.blockchain, err = light.NewLightChain(odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil {
+
+ eth.relay = NewLesTxRelay(peers, eth.reqDist)
+ eth.serverPool = newServerPool(chainDb, quitSync, &eth.wg)
+ eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool)
+ eth.odr = NewLesOdr(chainDb, eth.retriever)
+ if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
@@ -100,13 +115,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}
eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay)
- lightSync := config.SyncMode == downloader.LightSync
- if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, lightSync, config.NetworkId, eth.eventMux, eth.engine, eth.blockchain, nil, chainDb, odr, relay); err != nil {
+ if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, &eth.wg); err != nil {
return nil, err
}
- relay.ps = eth.protocolManager.peers
- relay.reqDist = eth.protocolManager.reqDist
-
eth.ApiBackend = &LesApiBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
@@ -116,6 +127,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
return eth, nil
}
+func lesTopic(genesisHash common.Hash) discv5.Topic {
+ return discv5.Topic("LES@" + common.Bytes2Hex(genesisHash.Bytes()[0:8]))
+}
+
type LightDummyAPI struct{}
// Etherbase is the address that mining rewards will be send to
@@ -188,7 +203,8 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
func (s *LightEthereum) Start(srvr *p2p.Server) error {
log.Warn("Light client mode is an experimental feature")
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId)
- s.protocolManager.Start(srvr)
+ s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash()))
+ s.protocolManager.Start()
return nil
}
diff --git a/les/distributor.go b/les/distributor.go
index 71afe2b73..e8ef5b02e 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available")
type requestDistributor struct {
reqQueue *list.List
lastReqOrder uint64
+ peers map[distPeer]struct{}
+ peerLock sync.RWMutex
stopChn, loopChn chan struct{}
loopNextSent bool
lock sync.Mutex
-
- getAllPeers func() map[distPeer]struct{}
}
// distPeer is an LES server peer interface for the request distributor.
@@ -71,15 +71,39 @@ type distReq struct {
}
// newRequestDistributor creates a new request distributor
-func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
- r := &requestDistributor{
- reqQueue: list.New(),
- loopChn: make(chan struct{}, 2),
- stopChn: stopChn,
- getAllPeers: getAllPeers,
+func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor {
+ d := &requestDistributor{
+ reqQueue: list.New(),
+ loopChn: make(chan struct{}, 2),
+ stopChn: stopChn,
+ peers: make(map[distPeer]struct{}),
+ }
+ if peers != nil {
+ peers.notify(d)
}
- go r.loop()
- return r
+ go d.loop()
+ return d
+}
+
+// registerPeer implements peerSetNotify
+func (d *requestDistributor) registerPeer(p *peer) {
+ d.peerLock.Lock()
+ d.peers[p] = struct{}{}
+ d.peerLock.Unlock()
+}
+
+// unregisterPeer implements peerSetNotify
+func (d *requestDistributor) unregisterPeer(p *peer) {
+ d.peerLock.Lock()
+ delete(d.peers, p)
+ d.peerLock.Unlock()
+}
+
+// registerTestPeer adds a new test peer
+func (d *requestDistributor) registerTestPeer(p distPeer) {
+ d.peerLock.Lock()
+ d.peers[p] = struct{}{}
+ d.peerLock.Unlock()
}
// distMaxWait is the maximum waiting time after which further necessary waiting
@@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 {
// nextRequest returns the next possible request from any peer, along with the
// associated peer and necessary waiting time
func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
- peers := d.getAllPeers()
-
+ checkedPeers := make(map[distPeer]struct{})
elem := d.reqQueue.Front()
var (
bestPeer distPeer
@@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
sel *weightedRandomSelect
)
- for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+ d.peerLock.RLock()
+ defer d.peerLock.RUnlock()
+
+ for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
- for peer, _ := range peers {
- if peer.canQueue() && req.canSend(peer) {
+ for peer, _ := range d.peers {
+ if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
canSend = true
cost := req.getCost(peer)
wait, bufRemain := peer.waitBefore(cost)
@@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
bestWait = wait
}
}
- delete(peers, peer)
+ checkedPeers[peer] = struct{}{}
}
}
next := elem.Next()
diff --git a/les/distributor_test.go b/les/distributor_test.go
index ae184b21b..4e7f8bd29 100644
--- a/les/distributor_test.go
+++ b/les/distributor_test.go
@@ -122,20 +122,14 @@ func testRequestDistributor(t *testing.T, resend bool) {
stop := make(chan struct{})
defer close(stop)
+ dist := newRequestDistributor(nil, stop)
var peers [testDistPeerCount]*testDistPeer
for i, _ := range peers {
peers[i] = &testDistPeer{}
go peers[i].worker(t, !resend, stop)
+ dist.registerTestPeer(peers[i])
}
- dist := newRequestDistributor(func() map[distPeer]struct{} {
- m := make(map[distPeer]struct{})
- for _, peer := range peers {
- m[peer] = struct{}{}
- }
- return m
- }, stop)
-
var wg sync.WaitGroup
for i := 1; i <= testDistReqCount; i++ {
diff --git a/les/fetcher.go b/les/fetcher.go
index a294d00d5..4fc142f0f 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -116,6 +116,7 @@ func newLightFetcher(pm *ProtocolManager) *lightFetcher {
syncDone: make(chan *peer),
maxConfirmedTd: big.NewInt(0),
}
+ pm.peers.notify(f)
go f.syncLoop()
return f
}
@@ -209,8 +210,8 @@ func (f *lightFetcher) syncLoop() {
}
}
-// addPeer adds a new peer to the fetcher's peer set
-func (f *lightFetcher) addPeer(p *peer) {
+// registerPeer adds a new peer to the fetcher's peer set
+func (f *lightFetcher) registerPeer(p *peer) {
p.lock.Lock()
p.hasBlock = func(hash common.Hash, number uint64) bool {
return f.peerHasBlock(p, hash, number)
@@ -223,8 +224,8 @@ func (f *lightFetcher) addPeer(p *peer) {
f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
}
-// removePeer removes a new peer from the fetcher's peer set
-func (f *lightFetcher) removePeer(p *peer) {
+// unregisterPeer removes a new peer from the fetcher's peer set
+func (f *lightFetcher) unregisterPeer(p *peer) {
p.lock.Lock()
p.hasBlock = nil
p.lock.Unlock()
@@ -416,7 +417,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) {
f.syncing = bestSyncing
var rq *distReq
- reqID := getNextReqID()
+ reqID := genReqID()
if f.syncing {
rq = &distReq{
getCost: func(dp distPeer) uint64 {
diff --git a/les/handler.go b/les/handler.go
index 64023af0f..77bc077a2 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -102,7 +102,9 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ lesTopic discv5.Topic
reqDist *requestDistributor
+ retriever *retrieveManager
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -123,12 +125,12 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
- wg sync.WaitGroup
+ wg *sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
@@ -136,15 +138,20 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
blockchain: blockchain,
chainConfig: chainConfig,
chainDb: chainDb,
+ odr: odr,
networkId: networkId,
txpool: txpool,
txrelay: txrelay,
- odr: odr,
- peers: newPeerSet(),
+ peers: peers,
newPeerCh: make(chan *peer),
- quitSync: make(chan struct{}),
+ quitSync: quitSync,
+ wg: wg,
noMorePeers: make(chan struct{}),
}
+ if odr != nil {
+ manager.retriever = odr.retriever
+ manager.reqDist = odr.retriever.dist
+ }
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
@@ -202,84 +209,22 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
+ manager.peers.notify((*downloaderPeerNotify)(manager))
+ manager.fetcher = newLightFetcher(manager)
}
- manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
- m := make(map[distPeer]struct{})
- peers := manager.peers.AllPeers()
- for _, peer := range peers {
- m[peer] = struct{}{}
- }
- return m
- }, manager.quitSync)
- if odr != nil {
- odr.removePeer = removePeer
- odr.reqDist = manager.reqDist
- }
-
- /*validator := func(block *types.Block, parent *types.Block) error {
- return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
- }
- heighter := func() uint64 {
- return chainman.LastBlockNumberU64()
- }
- manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
- */
return manager, nil
}
+// removePeer initiates disconnection from a peer by removing it from the peer set
func (pm *ProtocolManager) removePeer(id string) {
- // Short circuit if the peer was already removed
- peer := pm.peers.Peer(id)
- if peer == nil {
- return
- }
- log.Debug("Removing light Ethereum peer", "peer", id)
- if err := pm.peers.Unregister(id); err != nil {
- if err == errNotRegistered {
- return
- }
- }
- // Unregister the peer from the downloader and Ethereum peer set
- if pm.lightSync {
- pm.downloader.UnregisterPeer(id)
- if pm.txrelay != nil {
- pm.txrelay.removePeer(id)
- }
- if pm.fetcher != nil {
- pm.fetcher.removePeer(peer)
- }
- }
- // Hard disconnect at the networking layer
- if peer != nil {
- peer.Peer.Disconnect(p2p.DiscUselessPeer)
- }
+ pm.peers.Unregister(id)
}
-func (pm *ProtocolManager) Start(srvr *p2p.Server) {
- var topicDisc *discv5.Network
- if srvr != nil {
- topicDisc = srvr.DiscV5
- }
- lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
+func (pm *ProtocolManager) Start() {
if pm.lightSync {
- // start sync handler
- if srvr != nil { // srvr is nil during testing
- pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
- pm.odr.serverPool = pm.serverPool
- pm.fetcher = newLightFetcher(pm)
- }
go pm.syncer()
} else {
- if topicDisc != nil {
- go func() {
- logger := log.New("topic", lesTopic)
- logger.Info("Starting topic registration")
- defer logger.Info("Terminated topic registration")
-
- topicDisc.RegisterTopic(lesTopic, pm.quitSync)
- }()
- }
go func() {
for range pm.newPeerCh {
}
@@ -342,65 +287,10 @@ func (pm *ProtocolManager) handle(p *peer) error {
}()
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if pm.lightSync {
- requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
- reqID := getNextReqID()
- rq := &distReq{
- getCost: func(dp distPeer) uint64 {
- peer := dp.(*peer)
- return peer.GetRequestCost(GetBlockHeadersMsg, amount)
- },
- canSend: func(dp distPeer) bool {
- return dp.(*peer) == p
- },
- request: func(dp distPeer) func() {
- peer := dp.(*peer)
- cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
- peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
- },
- }
- _, ok := <-pm.reqDist.queue(rq)
- if !ok {
- return ErrNoPeers
- }
- return nil
- }
- requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
- reqID := getNextReqID()
- rq := &distReq{
- getCost: func(dp distPeer) uint64 {
- peer := dp.(*peer)
- return peer.GetRequestCost(GetBlockHeadersMsg, amount)
- },
- canSend: func(dp distPeer) bool {
- return dp.(*peer) == p
- },
- request: func(dp distPeer) func() {
- peer := dp.(*peer)
- cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
- peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
- },
- }
- _, ok := <-pm.reqDist.queue(rq)
- if !ok {
- return ErrNoPeers
- }
- return nil
- }
- if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
- requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
- return err
- }
- if pm.txrelay != nil {
- pm.txrelay.addPeer(p)
- }
-
p.lock.Lock()
head := p.headInfo
p.lock.Unlock()
if pm.fetcher != nil {
- pm.fetcher.addPeer(p)
pm.fetcher.announce(p, head)
}
@@ -926,7 +816,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
if deliverMsg != nil {
- err := pm.odr.Deliver(p, deliverMsg)
+ err := pm.retriever.deliver(p, deliverMsg)
if err != nil {
p.responseErrors++
if p.responseErrors > maxResponseErrors {
@@ -946,3 +836,64 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
Head: self.blockchain.LastBlockHash(),
}
}
+
+// downloaderPeerNotify implements peerSetNotify
+type downloaderPeerNotify ProtocolManager
+
+func (d *downloaderPeerNotify) registerPeer(p *peer) {
+ pm := (*ProtocolManager)(d)
+
+ requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
+ reqID := genReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
+ }
+ requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
+ reqID := genReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
+ }
+
+ pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil)
+}
+
+func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
+ pm := (*ProtocolManager)(d)
+ pm.downloader.UnregisterPeer(p.id)
+}
diff --git a/les/handler_test.go b/les/handler_test.go
index 0b94d0d30..5df1d3463 100644
--- a/les/handler_test.go
+++ b/les/handler_test.go
@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
@@ -42,7 +43,8 @@ func expectResponse(r p2p.MsgReader, msgcode, reqID, bv uint64, data interface{}
func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) }
func testGetBlockHeaders(t *testing.T, protocol int) {
- pm, _, _ := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil)
+ db, _ := ethdb.NewMemDatabase()
+ pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil, nil, db)
bc := pm.blockchain.(*core.BlockChain)
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
defer peer.close()
@@ -170,7 +172,8 @@ func testGetBlockHeaders(t *testing.T, protocol int) {
func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) }
func testGetBlockBodies(t *testing.T, protocol int) {
- pm, _, _ := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil)
+ db, _ := ethdb.NewMemDatabase()
+ pm := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil, nil, nil, db)
bc := pm.blockchain.(*core.BlockChain)
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
defer peer.close()
@@ -246,7 +249,8 @@ func TestGetCodeLes1(t *testing.T) { testGetCode(t, 1) }
func testGetCode(t *testing.T, protocol int) {
// Assemble the test environment
- pm, _, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
+ db, _ := ethdb.NewMemDatabase()
+ pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
bc := pm.blockchain.(*core.BlockChain)
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
defer peer.close()
@@ -278,7 +282,8 @@ func TestGetReceiptLes1(t *testing.T) { testGetReceipt(t, 1) }
func testGetReceipt(t *testing.T, protocol int) {
// Assemble the test environment
- pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
+ db, _ := ethdb.NewMemDatabase()
+ pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
bc := pm.blockchain.(*core.BlockChain)
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
defer peer.close()
@@ -304,7 +309,8 @@ func TestGetProofsLes1(t *testing.T) { testGetReceipt(t, 1) }
func testGetProofs(t *testing.T, protocol int) {
// Assemble the test environment
- pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
+ db, _ := ethdb.NewMemDatabase()
+ pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
bc := pm.blockchain.(*core.BlockChain)
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
defer peer.close()
diff --git a/les/helper_test.go b/les/helper_test.go
index 7e442c131..52fddd117 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -25,7 +25,6 @@ import (
"math/big"
"sync"
"testing"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
@@ -132,22 +131,22 @@ func testRCL() RequestCostList {
// newTestProtocolManager creates a new protocol manager for testing purposes,
// with the given number of blocks already known, and potential notification
// channels for different events.
-func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen)) (*ProtocolManager, ethdb.Database, *LesOdr, error) {
+func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) (*ProtocolManager, error) {
var (
evmux = new(event.TypeMux)
engine = ethash.NewFaker()
- db, _ = ethdb.NewMemDatabase()
gspec = core.Genesis{
Config: params.TestChainConfig,
Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
}
genesis = gspec.MustCommit(db)
- odr *LesOdr
- chain BlockChain
+ chain BlockChain
)
+ if peers == nil {
+ peers = newPeerSet()
+ }
if lightSync {
- odr = NewLesOdr(db)
chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux)
} else {
blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
@@ -158,9 +157,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
chain = blockchain
}
- pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, chain, nil, db, odr, nil)
+ pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, make(chan struct{}), new(sync.WaitGroup))
if err != nil {
- return nil, nil, nil, err
+ return nil, err
}
if !lightSync {
srv := &LesServer{protocolManager: pm}
@@ -174,20 +173,20 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
srv.fcManager = flowcontrol.NewClientManager(50, 10, 1000000000)
srv.fcCostStats = newCostStats(nil)
}
- pm.Start(nil)
- return pm, db, odr, nil
+ pm.Start()
+ return pm, nil
}
// newTestProtocolManagerMust creates a new protocol manager for testing purposes,
// with the given number of blocks already known, and potential notification
// channels for different events. In case of an error, the constructor force-
// fails the test.
-func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen)) (*ProtocolManager, ethdb.Database, *LesOdr) {
- pm, db, odr, err := newTestProtocolManager(lightSync, blocks, generator)
+func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) *ProtocolManager {
+ pm, err := newTestProtocolManager(lightSync, blocks, generator, peers, odr, db)
if err != nil {
t.Fatalf("Failed to create protocol manager: %v", err)
}
- return pm, db, odr
+ return pm
}
// testTxPool is a fake, helper transaction pool for testing purposes
@@ -342,30 +341,3 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
func (p *testPeer) close() {
p.app.Close()
}
-
-type testServerPool struct {
- peer *peer
- lock sync.RWMutex
-}
-
-func (p *testServerPool) setPeer(peer *peer) {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- p.peer = peer
-}
-
-func (p *testServerPool) getAllPeers() map[distPeer]struct{} {
- p.lock.RLock()
- defer p.lock.RUnlock()
-
- m := make(map[distPeer]struct{})
- if p.peer != nil {
- m[p.peer] = struct{}{}
- }
- return m
-}
-
-func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
-
-}
diff --git a/les/odr.go b/les/odr.go
index 684f36c76..3f7584b48 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -18,45 +18,24 @@ package les
import (
"context"
- "crypto/rand"
- "encoding/binary"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
)
-var (
- softRequestTimeout = time.Millisecond * 500
- hardRequestTimeout = time.Second * 10
-)
-
-// peerDropFn is a callback type for dropping a peer detected as malicious.
-type peerDropFn func(id string)
-
-type odrPeerSelector interface {
- adjustResponseTime(*poolEntry, time.Duration, bool)
-}
-
+// LesOdr implements light.OdrBackend
type LesOdr struct {
- light.OdrBackend
- db ethdb.Database
- stop chan struct{}
- removePeer peerDropFn
- mlock, clock sync.Mutex
- sentReqs map[uint64]*sentReq
- serverPool odrPeerSelector
- reqDist *requestDistributor
+ db ethdb.Database
+ stop chan struct{}
+ retriever *retrieveManager
}
-func NewLesOdr(db ethdb.Database) *LesOdr {
+func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr {
return &LesOdr{
- db: db,
- stop: make(chan struct{}),
- sentReqs: make(map[uint64]*sentReq),
+ db: db,
+ retriever: retriever,
+ stop: make(chan struct{}),
}
}
@@ -68,17 +47,6 @@ func (odr *LesOdr) Database() ethdb.Database {
return odr.db
}
-// validatorFunc is a function that processes a message.
-type validatorFunc func(ethdb.Database, *Msg) error
-
-// sentReq is a request waiting for an answer that satisfies its valFunc
-type sentReq struct {
- valFunc validatorFunc
- sentTo map[*peer]chan struct{}
- lock sync.RWMutex // protects acces to sentTo
- answered chan struct{} // closed and set to nil when any peer answers it
-}
-
const (
MsgBlockBodies = iota
MsgCode
@@ -94,156 +62,29 @@ type Msg struct {
Obj interface{}
}
-// Deliver is called by the LES protocol manager to deliver ODR reply messages to waiting requests
-func (self *LesOdr) Deliver(peer *peer, msg *Msg) error {
- var delivered chan struct{}
- self.mlock.Lock()
- req, ok := self.sentReqs[msg.ReqID]
- self.mlock.Unlock()
- if ok {
- req.lock.Lock()
- delivered, ok = req.sentTo[peer]
- req.lock.Unlock()
- }
-
- if !ok {
- return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
- }
-
- if err := req.valFunc(self.db, msg); err != nil {
- peer.Log().Warn("Invalid odr response", "err", err)
- return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
- }
- close(delivered)
- req.lock.Lock()
- delete(req.sentTo, peer)
- if req.answered != nil {
- close(req.answered)
- req.answered = nil
- }
- req.lock.Unlock()
- return nil
-}
-
-func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout chan struct{}, reqWg *sync.WaitGroup) {
- stime := mclock.Now()
- defer func() {
- req.lock.Lock()
- delete(req.sentTo, peer)
- req.lock.Unlock()
- reqWg.Done()
- }()
-
- select {
- case <-delivered:
- if self.serverPool != nil {
- self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
- }
- return
- case <-time.After(softRequestTimeout):
- close(timeout)
- case <-self.stop:
- return
- }
-
- select {
- case <-delivered:
- case <-time.After(hardRequestTimeout):
- peer.Log().Debug("Request timed out hard")
- go self.removePeer(peer.id)
- case <-self.stop:
- return
- }
- if self.serverPool != nil {
- self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
- }
-}
-
-// networkRequest sends a request to known peers until an answer is received
-// or the context is cancelled
-func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
- answered := make(chan struct{})
- req := &sentReq{
- valFunc: lreq.Validate,
- sentTo: make(map[*peer]chan struct{}),
- answered: answered, // reply delivered by any peer
- }
-
- exclude := make(map[*peer]struct{})
-
- reqWg := new(sync.WaitGroup)
- reqWg.Add(1)
- defer reqWg.Done()
+// Retrieve tries to fetch an object from the LES network.
+// If the network retrieval was successful, it stores the object in local db.
+func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
+ lreq := LesRequest(req)
- var timeout chan struct{}
- reqID := getNextReqID()
+ reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
return lreq.GetCost(dp.(*peer))
},
canSend: func(dp distPeer) bool {
p := dp.(*peer)
- _, ok := exclude[p]
- return !ok && lreq.CanSend(p)
+ return lreq.CanSend(p)
},
request: func(dp distPeer) func() {
p := dp.(*peer)
- exclude[p] = struct{}{}
- delivered := make(chan struct{})
- timeout = make(chan struct{})
- req.lock.Lock()
- req.sentTo[p] = delivered
- req.lock.Unlock()
- reqWg.Add(1)
cost := lreq.GetCost(p)
p.fcServer.QueueRequest(reqID, cost)
- go self.requestPeer(req, p, delivered, timeout, reqWg)
return func() { lreq.Request(reqID, p) }
},
}
- self.mlock.Lock()
- self.sentReqs[reqID] = req
- self.mlock.Unlock()
-
- go func() {
- reqWg.Wait()
- self.mlock.Lock()
- delete(self.sentReqs, reqID)
- self.mlock.Unlock()
- }()
-
- for {
- peerChn := self.reqDist.queue(rq)
- select {
- case <-ctx.Done():
- self.reqDist.cancel(rq)
- return ctx.Err()
- case <-answered:
- self.reqDist.cancel(rq)
- return nil
- case _, ok := <-peerChn:
- if !ok {
- return ErrNoPeers
- }
- }
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-answered:
- return nil
- case <-timeout:
- }
- }
-}
-
-// Retrieve tries to fetch an object from the LES network.
-// If the network retrieval was successful, it stores the object in local db.
-func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
- lreq := LesRequest(req)
- err = self.networkRequest(ctx, lreq)
- if err == nil {
+ if err = self.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(self.db, msg) }); err == nil {
// retrieved from network, store in db
req.StoreResult(self.db)
} else {
@@ -251,9 +92,3 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err
}
return
}
-
-func getNextReqID() uint64 {
- var rnd [8]byte
- rand.Read(rnd[:])
- return binary.BigEndian.Uint64(rnd[:])
-}
diff --git a/les/odr_test.go b/les/odr_test.go
index 532de4d80..7b34996ce 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -158,15 +158,15 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai
func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
// Assemble the test environment
- pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen)
- lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
+ peers := newPeerSet()
+ dist := newRequestDistributor(peers, make(chan struct{}))
+ rm := newRetrieveManager(peers, dist, nil)
+ db, _ := ethdb.NewMemDatabase()
+ ldb, _ := ethdb.NewMemDatabase()
+ odr := NewLesOdr(ldb, rm)
+ pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
+ lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
- pool := &testServerPool{}
- lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
- odr.reqDist = lpm.reqDist
- pool.setPeer(lpeer)
- odr.serverPool = pool
- lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
@@ -198,13 +198,19 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
}
// temporarily remove peer to test odr fails
- pool.setPeer(nil)
// expect retrievals to fail (except genesis block) without a les peer
+ peers.Unregister(lpeer.id)
+ time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
test(expFail)
- pool.setPeer(lpeer)
// expect all retrievals to pass
+ peers.Register(lpeer)
+ time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
+ lpeer.lock.Lock()
+ lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
+ lpeer.lock.Unlock()
test(5)
- pool.setPeer(nil)
// still expect all retrievals to pass, now data should be cached locally
+ peers.Unregister(lpeer.id)
+ time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
test(5)
}
diff --git a/les/peer.go b/les/peer.go
index ab55bafe3..791d0da24 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -166,9 +166,9 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
// HasBlock checks if the peer has a given block
func (p *peer) HasBlock(hash common.Hash, number uint64) bool {
p.lock.RLock()
- hashBlock := p.hasBlock
+ hasBlock := p.hasBlock
p.lock.RUnlock()
- return hashBlock != nil && hashBlock(hash, number)
+ return hasBlock != nil && hasBlock(hash, number)
}
// SendAnnounce announces the availability of a number of blocks through
@@ -433,12 +433,20 @@ func (p *peer) String() string {
)
}
+// peerSetNotify is a callback interface to notify services about added or
+// removed peers
+type peerSetNotify interface {
+ registerPeer(*peer)
+ unregisterPeer(*peer)
+}
+
// peerSet represents the collection of active peers currently participating in
// the Light Ethereum sub-protocol.
type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
- closed bool
+ peers map[string]*peer
+ lock sync.RWMutex
+ notifyList []peerSetNotify
+ closed bool
}
// newPeerSet creates a new peer set to track the active participants.
@@ -448,6 +456,17 @@ func newPeerSet() *peerSet {
}
}
+// notify adds a service to be notified about added or removed peers
+func (ps *peerSet) notify(n peerSetNotify) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ ps.notifyList = append(ps.notifyList, n)
+ for _, p := range ps.peers {
+ go n.registerPeer(p)
+ }
+}
+
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
func (ps *peerSet) Register(p *peer) error {
@@ -462,11 +481,14 @@ func (ps *peerSet) Register(p *peer) error {
}
ps.peers[p.id] = p
p.sendQueue = newExecQueue(100)
+ for _, n := range ps.notifyList {
+ go n.registerPeer(p)
+ }
return nil
}
// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
+// actions to/from that particular entity. It also initiates disconnection at the networking layer.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -474,7 +496,11 @@ func (ps *peerSet) Unregister(id string) error {
if p, ok := ps.peers[id]; !ok {
return errNotRegistered
} else {
+ for _, n := range ps.notifyList {
+ go n.unregisterPeer(p)
+ }
p.sendQueue.quit()
+ p.Peer.Disconnect(p2p.DiscUselessPeer)
}
delete(ps.peers, id)
return nil
diff --git a/les/request_test.go b/les/request_test.go
index ba1fc15bd..3add5f20d 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -68,15 +68,16 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, number uint64) light.Odr
func testAccess(t *testing.T, protocol int, fn accessTestFn) {
// Assemble the test environment
- pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
- lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
+ peers := newPeerSet()
+ dist := newRequestDistributor(peers, make(chan struct{}))
+ rm := newRetrieveManager(peers, dist, nil)
+ db, _ := ethdb.NewMemDatabase()
+ ldb, _ := ethdb.NewMemDatabase()
+ odr := NewLesOdr(ldb, rm)
+
+ pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
+ lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
- pool := &testServerPool{}
- lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
- odr.reqDist = lpm.reqDist
- pool.setPeer(lpeer)
- odr.serverPool = pool
- lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
@@ -108,10 +109,16 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
}
// temporarily remove peer to test odr fails
- pool.setPeer(nil)
+ peers.Unregister(lpeer.id)
+ time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
// expect retrievals to fail (except genesis block) without a les peer
test(0)
- pool.setPeer(lpeer)
+
+ peers.Register(lpeer)
+ time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
+ lpeer.lock.Lock()
+ lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
+ lpeer.lock.Unlock()
// expect all retrievals to pass
test(5)
}
diff --git a/les/retrieve.go b/les/retrieve.go
new file mode 100644
index 000000000..b060e0b0d
--- /dev/null
+++ b/les/retrieve.go
@@ -0,0 +1,395 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/binary"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+)
+
+var (
+ retryQueue = time.Millisecond * 100
+ softRequestTimeout = time.Millisecond * 500
+ hardRequestTimeout = time.Second * 10
+)
+
+// retrieveManager is a layer on top of requestDistributor which takes care of
+// matching replies by request ID and handles timeouts and resends if necessary.
+type retrieveManager struct {
+ dist *requestDistributor
+ peers *peerSet
+ serverPool peerSelector
+
+ lock sync.RWMutex
+ sentReqs map[uint64]*sentReq
+}
+
+// validatorFunc is a function that processes a reply message
+type validatorFunc func(distPeer, *Msg) error
+
+// peerSelector receives feedback info about response times and timeouts
+type peerSelector interface {
+ adjustResponseTime(*poolEntry, time.Duration, bool)
+}
+
+// sentReq represents a request sent and tracked by retrieveManager
+type sentReq struct {
+ rm *retrieveManager
+ req *distReq
+ id uint64
+ validate validatorFunc
+
+ eventsCh chan reqPeerEvent
+ stopCh chan struct{}
+ stopped bool
+ err error
+
+ lock sync.RWMutex // protect access to sentTo map
+ sentTo map[distPeer]sentReqToPeer
+
+ reqQueued bool // a request has been queued but not sent
+ reqSent bool // a request has been sent but not timed out
+ reqSrtoCount int // number of requests that reached soft (but not hard) timeout
+}
+
+// sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response
+// delivered by the given peer. Only one delivery is allowed per request per peer,
+// after which delivered is set to true, the validity of the response is sent on the
+// valid channel and no more responses are accepted.
+type sentReqToPeer struct {
+ delivered bool
+ valid chan bool
+}
+
+// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
+// request state machine (retrieveLoop) through the eventsCh channel.
+type reqPeerEvent struct {
+ event int
+ peer distPeer
+}
+
+const (
+ rpSent = iota // if peer == nil, not sent (no suitable peers)
+ rpSoftTimeout
+ rpHardTimeout
+ rpDeliveredValid
+ rpDeliveredInvalid
+)
+
+// newRetrieveManager creates the retrieve manager
+func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager {
+ return &retrieveManager{
+ peers: peers,
+ dist: dist,
+ serverPool: serverPool,
+ sentReqs: make(map[uint64]*sentReq),
+ }
+}
+
+// retrieve sends a request (to multiple peers if necessary) and waits for an answer
+// that is delivered through the deliver function and successfully validated by the
+// validator callback. It returns when a valid answer is delivered or the context is
+// cancelled.
+func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc) error {
+ sentReq := rm.sendReq(reqID, req, val)
+ select {
+ case <-sentReq.stopCh:
+ case <-ctx.Done():
+ sentReq.stop(ctx.Err())
+ }
+ return sentReq.getError()
+}
+
+// sendReq starts a process that keeps trying to retrieve a valid answer for a
+// request from any suitable peers until stopped or succeeded.
+func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
+ r := &sentReq{
+ rm: rm,
+ req: req,
+ id: reqID,
+ sentTo: make(map[distPeer]sentReqToPeer),
+ stopCh: make(chan struct{}),
+ eventsCh: make(chan reqPeerEvent, 10),
+ validate: val,
+ }
+
+ canSend := req.canSend
+ req.canSend = func(p distPeer) bool {
+ // add an extra check to canSend: the request has not been sent to the same peer before
+ r.lock.RLock()
+ _, sent := r.sentTo[p]
+ r.lock.RUnlock()
+ return !sent && canSend(p)
+ }
+
+ request := req.request
+ req.request = func(p distPeer) func() {
+ // before actually sending the request, put an entry into the sentTo map
+ r.lock.Lock()
+ r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
+ r.lock.Unlock()
+ return request(p)
+ }
+ rm.lock.Lock()
+ rm.sentReqs[reqID] = r
+ rm.lock.Unlock()
+
+ go r.retrieveLoop()
+ return r
+}
+
+// deliver is called by the LES protocol manager to deliver reply messages to waiting requests
+func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
+ rm.lock.RLock()
+ req, ok := rm.sentReqs[msg.ReqID]
+ rm.lock.RUnlock()
+
+ if ok {
+ return req.deliver(peer, msg)
+ }
+ return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
+}
+
+// reqStateFn represents a state of the retrieve loop state machine
+type reqStateFn func() reqStateFn
+
+// retrieveLoop is the retrieval state machine event loop
+func (r *sentReq) retrieveLoop() {
+ go r.tryRequest()
+ r.reqQueued = true
+ state := r.stateRequesting
+
+ for state != nil {
+ state = state()
+ }
+
+ r.rm.lock.Lock()
+ delete(r.rm.sentReqs, r.id)
+ r.rm.lock.Unlock()
+}
+
+// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
+// a new request is sent to a new peer
+func (r *sentReq) stateRequesting() reqStateFn {
+ select {
+ case ev := <-r.eventsCh:
+ r.update(ev)
+ switch ev.event {
+ case rpSent:
+ if ev.peer == nil {
+ // request send failed, no more suitable peers
+ if r.waiting() {
+ // we are already waiting for sent requests which may succeed so keep waiting
+ return r.stateNoMorePeers
+ }
+ // nothing to wait for, no more peers to ask, return with error
+ r.stop(ErrNoPeers)
+ // no need to go to stopped state because waiting() already returned false
+ return nil
+ }
+ case rpSoftTimeout:
+ // last request timed out, try asking a new peer
+ go r.tryRequest()
+ r.reqQueued = true
+ return r.stateRequesting
+ case rpDeliveredValid:
+ r.stop(nil)
+ return r.stateStopped
+ }
+ return r.stateRequesting
+ case <-r.stopCh:
+ return r.stateStopped
+ }
+}
+
+// stateNoMorePeers: could not send more requests because no suitable peers are available.
+// Peers may become suitable for a certain request later or new peers may appear so we
+// keep trying.
+func (r *sentReq) stateNoMorePeers() reqStateFn {
+ select {
+ case <-time.After(retryQueue):
+ go r.tryRequest()
+ r.reqQueued = true
+ return r.stateRequesting
+ case ev := <-r.eventsCh:
+ r.update(ev)
+ if ev.event == rpDeliveredValid {
+ r.stop(nil)
+ return r.stateStopped
+ }
+ return r.stateNoMorePeers
+ case <-r.stopCh:
+ return r.stateStopped
+ }
+}
+
+// stateStopped: request succeeded or cancelled, just waiting for some peers
+// to either answer or time out hard
+func (r *sentReq) stateStopped() reqStateFn {
+ for r.waiting() {
+ r.update(<-r.eventsCh)
+ }
+ return nil
+}
+
+// update updates the queued/sent flags and timed out peers counter according to the event
+func (r *sentReq) update(ev reqPeerEvent) {
+ switch ev.event {
+ case rpSent:
+ r.reqQueued = false
+ if ev.peer != nil {
+ r.reqSent = true
+ }
+ case rpSoftTimeout:
+ r.reqSent = false
+ r.reqSrtoCount++
+ case rpHardTimeout, rpDeliveredValid, rpDeliveredInvalid:
+ r.reqSrtoCount--
+ }
+}
+
+// waiting returns true if the retrieval mechanism is waiting for an answer from
+// any peer
+func (r *sentReq) waiting() bool {
+ return r.reqQueued || r.reqSent || r.reqSrtoCount > 0
+}
+
+// tryRequest tries to send the request to a new peer and waits for it to either
+// succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
+// messages to the request's event channel.
+func (r *sentReq) tryRequest() {
+ sent := r.rm.dist.queue(r.req)
+ var p distPeer
+ select {
+ case p = <-sent:
+ case <-r.stopCh:
+ if r.rm.dist.cancel(r.req) {
+ p = nil
+ } else {
+ p = <-sent
+ }
+ }
+
+ r.eventsCh <- reqPeerEvent{rpSent, p}
+ if p == nil {
+ return
+ }
+
+ reqSent := mclock.Now()
+ srto, hrto := false, false
+
+ r.lock.RLock()
+ s, ok := r.sentTo[p]
+ r.lock.RUnlock()
+ if !ok {
+ panic(nil)
+ }
+
+ defer func() {
+ // send feedback to server pool and remove peer if hard timeout happened
+ pp, ok := p.(*peer)
+ if ok && r.rm.serverPool != nil {
+ respTime := time.Duration(mclock.Now() - reqSent)
+ r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto)
+ }
+ if hrto {
+ pp.Log().Debug("Request timed out hard")
+ if r.rm.peers != nil {
+ r.rm.peers.Unregister(pp.id)
+ }
+ }
+
+ r.lock.Lock()
+ delete(r.sentTo, p)
+ r.lock.Unlock()
+ }()
+
+ select {
+ case ok := <-s.valid:
+ if ok {
+ r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
+ } else {
+ r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ }
+ return
+ case <-time.After(softRequestTimeout):
+ srto = true
+ r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
+ }
+
+ select {
+ case ok := <-s.valid:
+ if ok {
+ r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
+ } else {
+ r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
+ }
+ case <-time.After(hardRequestTimeout):
+ hrto = true
+ r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
+ }
+}
+
+// deliver a reply belonging to this request
+func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+
+ s, ok := r.sentTo[peer]
+ if !ok || s.delivered {
+ return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
+ }
+ valid := r.validate(peer, msg) == nil
+ r.sentTo[peer] = sentReqToPeer{true, s.valid}
+ s.valid <- valid
+ if !valid {
+ return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
+ }
+ return nil
+}
+
+// stop stops the retrieval process and sets an error code that will be returned
+// by getError
+func (r *sentReq) stop(err error) {
+ r.lock.Lock()
+ if !r.stopped {
+ r.stopped = true
+ r.err = err
+ close(r.stopCh)
+ }
+ r.lock.Unlock()
+}
+
+// getError returns any retrieval error (either internally generated or set by the
+// stop function) after stopCh has been closed
+func (r *sentReq) getError() error {
+ return r.err
+}
+
+// genReqID generates a new random request ID
+func genReqID() uint64 {
+ var rnd [8]byte
+ rand.Read(rnd[:])
+ return binary.BigEndian.Uint64(rnd[:])
+}
diff --git a/les/server.go b/les/server.go
index 22fe59b7a..2ff715ea8 100644
--- a/les/server.go
+++ b/les/server.go
@@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
@@ -41,17 +42,24 @@ type LesServer struct {
fcManager *flowcontrol.ClientManager // nil if our node is client only
fcCostStats *requestCostStats
defParams *flowcontrol.ServerParams
+ lesTopic discv5.Topic
+ quitSync chan struct{}
stopped bool
}
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
- pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil)
+ quitSync := make(chan struct{})
+ pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup))
if err != nil {
return nil, err
}
pm.blockLoop()
- srv := &LesServer{protocolManager: pm}
+ srv := &LesServer{
+ protocolManager: pm,
+ quitSync: quitSync,
+ lesTopic: lesTopic(eth.BlockChain().Genesis().Hash()),
+ }
pm.server = srv
srv.defParams = &flowcontrol.ServerParams{
@@ -69,7 +77,14 @@ func (s *LesServer) Protocols() []p2p.Protocol {
// Start starts the LES server
func (s *LesServer) Start(srvr *p2p.Server) {
- s.protocolManager.Start(srvr)
+ s.protocolManager.Start()
+ go func() {
+ logger := log.New("topic", s.lesTopic)
+ logger.Info("Starting topic registration")
+ defer logger.Info("Terminated topic registration")
+
+ srvr.DiscV5.RegisterTopic(s.lesTopic, s.quitSync)
+ }()
}
// Stop stops the LES service
diff --git a/les/serverpool.go b/les/serverpool.go
index 64fe991c6..f4e4df2fb 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -102,6 +102,8 @@ type serverPool struct {
wg *sync.WaitGroup
connWg sync.WaitGroup
+ topic discv5.Topic
+
discSetPeriod chan time.Duration
discNodes chan *discv5.Node
discLookups chan bool
@@ -118,11 +120,9 @@ type serverPool struct {
}
// newServerPool creates a new serverPool instance
-func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
+func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
pool := &serverPool{
db: db,
- dbKey: append(dbPrefix, []byte(topic)...),
- server: server,
quit: quit,
wg: wg,
entries: make(map[discover.NodeID]*poolEntry),
@@ -135,19 +135,25 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
}
pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
- wg.Add(1)
+ return pool
+}
+
+func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
+ pool.server = server
+ pool.topic = topic
+ pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
+ pool.wg.Add(1)
pool.loadNodes()
- pool.checkDial()
+ go pool.eventLoop()
+
+ pool.checkDial()
if pool.server.DiscV5 != nil {
pool.discSetPeriod = make(chan time.Duration, 1)
pool.discNodes = make(chan *discv5.Node, 100)
pool.discLookups = make(chan bool, 100)
- go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
+ go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
}
-
- go pool.eventLoop()
- return pool
}
// connect should be called upon any incoming connection. If the connection has been
@@ -485,7 +491,7 @@ func (pool *serverPool) checkDial() {
// dial initiates a new connection
func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
- if entry.state != psNotConnected {
+ if pool.server == nil || entry.state != psNotConnected {
return
}
entry.state = psDialed
diff --git a/les/txrelay.go b/les/txrelay.go
index 1ca3467e4..7a02cc837 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -39,26 +39,28 @@ type LesTxRelay struct {
reqDist *requestDistributor
}
-func NewLesTxRelay() *LesTxRelay {
- return &LesTxRelay{
+func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay {
+ r := &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}),
+ ps: ps,
+ reqDist: reqDist,
}
+ ps.notify(r)
+ return r
}
-func (self *LesTxRelay) addPeer(p *peer) {
+func (self *LesTxRelay) registerPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
- self.ps.Register(p)
self.peerList = self.ps.AllPeers()
}
-func (self *LesTxRelay) removePeer(id string) {
+func (self *LesTxRelay) unregisterPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
- self.ps.Unregister(id)
self.peerList = self.ps.AllPeers()
}
@@ -112,7 +114,7 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
pp := p
ll := list
- reqID := getNextReqID()
+ reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
diff --git a/light/txpool.go b/light/txpool.go
index 446195806..7276874b8 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -360,7 +360,7 @@ func (pool *TxPool) validateTx(ctx context.Context, tx *types.Transaction) error
currentState := pool.currentState()
if n, err := currentState.GetNonce(ctx, from); err == nil {
if n > tx.Nonce() {
- return core.ErrNonce
+ return core.ErrNonceTooLow
}
} else {
return err
diff --git a/node/service.go b/node/service.go
index 5e1eb0e64..55062a500 100644
--- a/node/service.go
+++ b/node/service.go
@@ -43,7 +43,11 @@ func (ctx *ServiceContext) OpenDatabase(name string, cache int, handles int) (et
if ctx.config.DataDir == "" {
return ethdb.NewMemDatabase()
}
- return ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles)
+ db, err := ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles)
+ if err != nil {
+ return nil, err
+ }
+ return db, nil
}
// ResolvePath resolves a user path into the data directory if that was relative
diff --git a/params/version.go b/params/version.go
index fbf49ead9..d54c69b71 100644
--- a/params/version.go
+++ b/params/version.go
@@ -23,7 +23,7 @@ import (
const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 6 // Minor version component of the current release
- VersionPatch = 6 // Patch version component of the current release
+ VersionPatch = 7 // Patch version component of the current release
VersionMeta = "unstable" // Version metadata to append to the version string
)
diff --git a/rpc/http.go b/rpc/http.go
index 6bab02ab6..4143e2a8d 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -103,8 +103,8 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
- for _, respmsg := range respmsgs {
- op.resp <- &respmsg
+ for i := 0; i < len(respmsgs); i++ {
+ op.resp <- &respmsgs[i]
}
return nil
}
diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go
index f307b38ea..69f3cc615 100644
--- a/swarm/fuse/swarmfs_test.go
+++ b/swarm/fuse/swarmfs_test.go
@@ -21,13 +21,14 @@ package fuse
import (
"bytes"
"crypto/rand"
- "github.com/ethereum/go-ethereum/swarm/api"
- "github.com/ethereum/go-ethereum/swarm/storage"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
+
+ "github.com/ethereum/go-ethereum/swarm/api"
+ "github.com/ethereum/go-ethereum/swarm/storage"
)
type fileInfo struct {
@@ -37,26 +38,7 @@ type fileInfo struct {
contents []byte
}
-func testFuseFileSystem(t *testing.T, f func(*api.Api)) {
-
- datadir, err := ioutil.TempDir("", "fuse")
- if err != nil {
- t.Fatalf("unable to create temp dir: %v", err)
- }
- os.RemoveAll(datadir)
-
- dpa, err := storage.NewLocalDPA(datadir)
- if err != nil {
- return
- }
- api := api.NewApi(dpa, nil)
- dpa.Start()
- f(api)
- dpa.Stop()
-}
-
func createTestFilesAndUploadToSwarm(t *testing.T, api *api.Api, files map[string]fileInfo, uploadDir string) string {
-
os.RemoveAll(uploadDir)
for fname, finfo := range files {
@@ -89,8 +71,6 @@ func createTestFilesAndUploadToSwarm(t *testing.T, api *api.Api, files map[strin
}
func mountDir(t *testing.T, api *api.Api, files map[string]fileInfo, bzzHash string, mountDir string) *SwarmFS {
-
- // Test Mount
os.RemoveAll(mountDir)
os.MkdirAll(mountDir, 0777)
swarmfs := NewSwarmFS(api)
@@ -123,11 +103,9 @@ func mountDir(t *testing.T, api *api.Api, files map[string]fileInfo, bzzHash str
compareGeneratedFileWithFileInMount(t, files, mountDir)
return swarmfs
-
}
func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo, mountDir string) {
-
err := filepath.Walk(mountDir, func(path string, f os.FileInfo, err error) error {
if f.IsDir() {
return nil
@@ -143,7 +121,6 @@ func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo
}
for fname, finfo := range files {
-
destinationFile := filepath.Join(mountDir, fname)
dfinfo, err := os.Stat(destinationFile)
@@ -163,18 +140,15 @@ func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo
if err != nil {
t.Fatalf("Could not readfile %v : %v", fname, err)
}
-
if bytes.Compare(fileContents, finfo.contents) != 0 {
t.Fatalf("File %v contents mismatch: %v , %v", fname, fileContents, finfo.contents)
}
-
// TODO: check uid and gid
}
}
func checkFile(t *testing.T, testMountDir, fname string, contents []byte) {
-
destinationFile := filepath.Join(testMountDir, fname)
dfinfo, err1 := os.Stat(destinationFile)
if err1 != nil {
@@ -201,10 +175,9 @@ func getRandomBtes(size int) []byte {
contents := make([]byte, size)
rand.Read(contents)
return contents
-
}
-func IsDirEmpty(name string) bool {
+func isDirEmpty(name string) bool {
f, err := os.Open(name)
if err != nil {
return false
@@ -218,8 +191,11 @@ func IsDirEmpty(name string) bool {
return false
}
-func testMountListAndUnmount(api *api.Api, t *testing.T) {
+type testAPI struct {
+ api *api.Api
+}
+func (ta *testAPI) mountListAndUnmount(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "fuse-source")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "fuse-dest")
@@ -240,9 +216,9 @@ func testMountListAndUnmount(api *api.Api, t *testing.T) {
files["twice/2.txt"] = fileInfo{0777, 444, 333, getRandomBtes(200)}
files["one/two/three/four/five/six/seven/eight/nine/10.txt"] = fileInfo{0777, 333, 444, getRandomBtes(10240)}
files["one/two/three/four/five/six/six"] = fileInfo{0777, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs.Stop()
// Check unmount
@@ -250,53 +226,52 @@ func testMountListAndUnmount(api *api.Api, t *testing.T) {
if err != nil {
t.Fatalf("could not unmount %v", bzzHash)
}
- if !IsDirEmpty(testMountDir) {
+ if !isDirEmpty(testMountDir) {
t.Fatalf("unmount didnt work for %v", testMountDir)
}
}
-func testMaxMounts(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) maxMounts(t *testing.T) {
files := make(map[string]fileInfo)
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir1, _ := ioutil.TempDir(os.TempDir(), "max-upload1")
- bzzHash1 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir1)
+ bzzHash1 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir1)
mount1, _ := ioutil.TempDir(os.TempDir(), "max-mount1")
- swarmfs1 := mountDir(t, api, files, bzzHash1, mount1)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash1, mount1)
defer swarmfs1.Stop()
files["2.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir2, _ := ioutil.TempDir(os.TempDir(), "max-upload2")
- bzzHash2 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir2)
+ bzzHash2 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir2)
mount2, _ := ioutil.TempDir(os.TempDir(), "max-mount2")
- swarmfs2 := mountDir(t, api, files, bzzHash2, mount2)
+ swarmfs2 := mountDir(t, ta.api, files, bzzHash2, mount2)
defer swarmfs2.Stop()
files["3.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir3, _ := ioutil.TempDir(os.TempDir(), "max-upload3")
- bzzHash3 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir3)
+ bzzHash3 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir3)
mount3, _ := ioutil.TempDir(os.TempDir(), "max-mount3")
- swarmfs3 := mountDir(t, api, files, bzzHash3, mount3)
+ swarmfs3 := mountDir(t, ta.api, files, bzzHash3, mount3)
defer swarmfs3.Stop()
files["4.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir4, _ := ioutil.TempDir(os.TempDir(), "max-upload4")
- bzzHash4 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir4)
+ bzzHash4 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir4)
mount4, _ := ioutil.TempDir(os.TempDir(), "max-mount4")
- swarmfs4 := mountDir(t, api, files, bzzHash4, mount4)
+ swarmfs4 := mountDir(t, ta.api, files, bzzHash4, mount4)
defer swarmfs4.Stop()
files["5.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir5, _ := ioutil.TempDir(os.TempDir(), "max-upload5")
- bzzHash5 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir5)
+ bzzHash5 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir5)
mount5, _ := ioutil.TempDir(os.TempDir(), "max-mount5")
- swarmfs5 := mountDir(t, api, files, bzzHash5, mount5)
+ swarmfs5 := mountDir(t, ta.api, files, bzzHash5, mount5)
defer swarmfs5.Stop()
files["6.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir6, _ := ioutil.TempDir(os.TempDir(), "max-upload6")
- bzzHash6 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir6)
+ bzzHash6 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir6)
mount6, _ := ioutil.TempDir(os.TempDir(), "max-mount6")
os.RemoveAll(mount6)
@@ -308,18 +283,17 @@ func testMaxMounts(api *api.Api, t *testing.T) {
}
-func testReMounts(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) remount(t *testing.T) {
files := make(map[string]fileInfo)
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
uploadDir1, _ := ioutil.TempDir(os.TempDir(), "re-upload1")
- bzzHash1 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir1)
+ bzzHash1 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir1)
testMountDir1, _ := ioutil.TempDir(os.TempDir(), "re-mount1")
- swarmfs := mountDir(t, api, files, bzzHash1, testMountDir1)
+ swarmfs := mountDir(t, ta.api, files, bzzHash1, testMountDir1)
defer swarmfs.Stop()
uploadDir2, _ := ioutil.TempDir(os.TempDir(), "re-upload2")
- bzzHash2 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir2)
+ bzzHash2 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir2)
testMountDir2, _ := ioutil.TempDir(os.TempDir(), "re-mount2")
// try mounting the same hash second time
@@ -341,19 +315,17 @@ func testReMounts(api *api.Api, t *testing.T) {
if err == nil {
t.Fatalf("Error mounting hash %v", bzzHash2)
}
-
}
-func testUnmount(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) unmount(t *testing.T) {
files := make(map[string]fileInfo)
uploadDir, _ := ioutil.TempDir(os.TempDir(), "ex-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "ex-mount")
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, uploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir)
- swarmfs := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs.Stop()
swarmfs.Unmount(testMountDir)
@@ -364,19 +336,17 @@ func testUnmount(api *api.Api, t *testing.T) {
t.Fatalf("mount state not cleaned up in unmount case %v", testMountDir)
}
}
-
}
-func testUnmountWhenResourceBusy(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) unmountWhenResourceBusy(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "ex-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "ex-mount")
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs.Stop()
actualPath := filepath.Join(testMountDir, "2.txt")
@@ -395,18 +365,17 @@ func testUnmountWhenResourceBusy(api *api.Api, t *testing.T) {
t.Fatalf("mount state not cleaned up in unmount case %v", testMountDir)
}
}
-
}
-func testSeekInMultiChunkFile(api *api.Api, t *testing.T) {
+func (ta *testAPI) seekInMultiChunkFile(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "seek-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "seek-mount")
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10240)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs.Stop()
// Create a new file seek the second chunk
@@ -423,11 +392,9 @@ func testSeekInMultiChunkFile(api *api.Api, t *testing.T) {
t.Fatalf("File seek contents mismatch")
}
d.Close()
-
}
-func testCreateNewFile(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) createNewFile(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "create-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "create-mount")
@@ -435,9 +402,9 @@ func testCreateNewFile(api *api.Api, t *testing.T) {
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Create a new file in the root dir and check
@@ -458,23 +425,21 @@ func testCreateNewFile(api *api.Api, t *testing.T) {
// mount again and see if things are okay
files["2.txt"] = fileInfo{0700, 333, 444, contents}
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
checkFile(t, testMountDir, "2.txt", contents)
-
}
-func testCreateNewFileInsideDirectory(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) createNewFileInsideDirectory(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "createinsidedir-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "createinsidedir-mount")
files["one/1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Create a new file inside a existing dir and check
@@ -496,23 +461,21 @@ func testCreateNewFileInsideDirectory(api *api.Api, t *testing.T) {
// mount again and see if things are okay
files["one/2.txt"] = fileInfo{0700, 333, 444, contents}
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
checkFile(t, testMountDir, "one/2.txt", contents)
-
}
-func testCreateNewFileInsideNewDirectory(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) createNewFileInsideNewDirectory(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "createinsidenewdir-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "createinsidenewdir-mount")
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Create a new file inside a existing dir and check
@@ -535,15 +498,13 @@ func testCreateNewFileInsideNewDirectory(api *api.Api, t *testing.T) {
// mount again and see if things are okay
files["one/2.txt"] = fileInfo{0700, 333, 444, contents}
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
checkFile(t, testMountDir, "one/2.txt", contents)
-
}
-func testRemoveExistingFile(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) removeExistingFile(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "remove-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "remove-mount")
@@ -551,9 +512,9 @@ func testRemoveExistingFile(api *api.Api, t *testing.T) {
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Remove a file in the root dir and check
@@ -567,13 +528,11 @@ func testRemoveExistingFile(api *api.Api, t *testing.T) {
// mount again and see if things are okay
delete(files, "five.txt")
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
-
}
-func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) removeExistingFileInsideDir(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "remove-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "remove-mount")
@@ -581,9 +540,9 @@ func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) {
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["one/five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["one/six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Remove a file in the root dir and check
@@ -597,12 +556,11 @@ func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) {
// mount again and see if things are okay
delete(files, "one/five.txt")
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
-
}
-func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) {
+func (ta *testAPI) removeNewlyAddedFile(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "removenew-upload")
@@ -611,9 +569,9 @@ func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) {
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Adda a new file and remove it
@@ -639,17 +597,15 @@ func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) {
}
// mount again and see if things are okay
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
if bzzHash != mi.LatestManifest {
t.Fatalf("same contents different hash orig(%v): new(%v)", bzzHash, mi.LatestManifest)
}
-
}
-func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) addNewFileAndModifyContents(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "modifyfile-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "modifyfile-mount")
@@ -657,9 +613,9 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) {
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
// Create a new file in the root dir and check
@@ -680,7 +636,7 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) {
// mount again and see if things are okay
files["2.txt"] = fileInfo{0700, 333, 444, line1}
- swarmfs2 := mountDir(t, api, files, mi1.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi1.LatestManifest, testMountDir)
defer swarmfs2.Stop()
checkFile(t, testMountDir, "2.txt", line1)
@@ -691,7 +647,7 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) {
}
// mount again and modify
- swarmfs3 := mountDir(t, api, files, mi2.LatestManifest, testMountDir)
+ swarmfs3 := mountDir(t, ta.api, files, mi2.LatestManifest, testMountDir)
defer swarmfs3.Stop()
fd, err4 := os.OpenFile(actualPath, os.O_RDWR|os.O_APPEND, os.FileMode(0665))
@@ -713,14 +669,13 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) {
b := [][]byte{line1, line2}
line1and2 := bytes.Join(b, []byte(""))
files["2.txt"] = fileInfo{0700, 333, 444, line1and2}
- swarmfs4 := mountDir(t, api, files, mi3.LatestManifest, testMountDir)
+ swarmfs4 := mountDir(t, ta.api, files, mi3.LatestManifest, testMountDir)
defer swarmfs4.Stop()
checkFile(t, testMountDir, "2.txt", line1and2)
-
}
-func testRemoveEmptyDir(api *api.Api, t *testing.T) {
+func (ta *testAPI) removeEmptyDir(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-mount")
@@ -728,9 +683,9 @@ func testRemoveEmptyDir(api *api.Api, t *testing.T) {
files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
os.MkdirAll(filepath.Join(testMountDir, "newdir"), 0777)
@@ -739,15 +694,12 @@ func testRemoveEmptyDir(api *api.Api, t *testing.T) {
if err3 != nil {
t.Fatalf("Could not unmount %v", err3)
}
-
if bzzHash != mi.LatestManifest {
t.Fatalf("same contents different hash orig(%v): new(%v)", bzzHash, mi.LatestManifest)
}
-
}
-func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) removeDirWhichHasFiles(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-mount")
@@ -755,9 +707,9 @@ func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) {
files["one/1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["two/five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["two/six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
dirPath := filepath.Join(testMountDir, "two")
@@ -772,13 +724,11 @@ func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) {
delete(files, "two/five.txt")
delete(files, "two/six.txt")
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
-
}
-func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) removeDirWhichHasSubDirs(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmsubdir-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmsubdir-mount")
@@ -790,9 +740,9 @@ func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) {
files["two/four/6.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
files["two/four/six/7.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
dirPath := filepath.Join(testMountDir, "two")
@@ -810,13 +760,11 @@ func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) {
delete(files, "two/four/6.txt")
delete(files, "two/four/six/7.txt")
- swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir)
defer swarmfs2.Stop()
-
}
-func testAppendFileContentsToEnd(api *api.Api, t *testing.T) {
-
+func (ta *testAPI) appendFileContentsToEnd(t *testing.T) {
files := make(map[string]fileInfo)
testUploadDir, _ := ioutil.TempDir(os.TempDir(), "appendlargefile-upload")
testMountDir, _ := ioutil.TempDir(os.TempDir(), "appendlargefile-mount")
@@ -824,9 +772,9 @@ func testAppendFileContentsToEnd(api *api.Api, t *testing.T) {
line1 := make([]byte, 10)
rand.Read(line1)
files["1.txt"] = fileInfo{0700, 333, 444, line1}
- bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir)
+ bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir)
- swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir)
+ swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir)
defer swarmfs1.Stop()
actualPath := filepath.Join(testMountDir, "1.txt")
@@ -849,49 +797,42 @@ func testAppendFileContentsToEnd(api *api.Api, t *testing.T) {
b := [][]byte{line1, line2}
line1and2 := bytes.Join(b, []byte(""))
files["1.txt"] = fileInfo{0700, 333, 444, line1and2}
- swarmfs2 := mountDir(t, api, files, mi1.LatestManifest, testMountDir)
+ swarmfs2 := mountDir(t, ta.api, files, mi1.LatestManifest, testMountDir)
defer swarmfs2.Stop()
checkFile(t, testMountDir, "1.txt", line1and2)
-
}
-func TestSwarmFileSystem(t *testing.T) {
- testFuseFileSystem(t, func(api *api.Api) {
-
- testMountListAndUnmount(api, t)
-
- testMaxMounts(api, t)
-
- testReMounts(api, t)
-
- testUnmount(api, t)
-
- testUnmountWhenResourceBusy(api, t)
-
- testSeekInMultiChunkFile(api, t)
-
- testCreateNewFile(api, t)
-
- testCreateNewFileInsideDirectory(api, t)
-
- testCreateNewFileInsideNewDirectory(api, t)
-
- testRemoveExistingFile(api, t)
-
- testRemoveExistingFileInsideADir(api, t)
-
- testRemoveNewlyAddedFile(api, t)
-
- testAddNewFileAndModifyContents(api, t)
-
- testRemoveEmptyDir(api, t)
-
- testRemoveDirWhichHasFiles(api, t)
-
- testRemoveDirWhichHasSubDirs(api, t)
-
- testAppendFileContentsToEnd(api, t)
+func TestFUSE(t *testing.T) {
+ datadir, err := ioutil.TempDir("", "fuse")
+ if err != nil {
+ t.Fatalf("unable to create temp dir: %v", err)
+ }
+ os.RemoveAll(datadir)
- })
+ dpa, err := storage.NewLocalDPA(datadir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ta := &testAPI{api: api.NewApi(dpa, nil)}
+ dpa.Start()
+ defer dpa.Stop()
+
+ t.Run("mountListAndUmount", ta.mountListAndUnmount)
+ t.Run("maxMounts", ta.maxMounts)
+ t.Run("remount", ta.remount)
+ t.Run("unmount", ta.unmount)
+ t.Run("unmountWhenResourceBusy", ta.unmountWhenResourceBusy)
+ t.Run("seekInMultiChunkFile", ta.seekInMultiChunkFile)
+ t.Run("createNewFile", ta.createNewFile)
+ t.Run("createNewFileInsideDirectory", ta.createNewFileInsideDirectory)
+ t.Run("createNewFileInsideNewDirectory", ta.createNewFileInsideNewDirectory)
+ t.Run("removeExistingFile", ta.removeExistingFile)
+ t.Run("removeExistingFileInsideDir", ta.removeExistingFileInsideDir)
+ t.Run("removeNewlyAddedFile", ta.removeNewlyAddedFile)
+ t.Run("addNewFileAndModifyContents", ta.addNewFileAndModifyContents)
+ t.Run("removeEmptyDir", ta.removeEmptyDir)
+ t.Run("removeDirWhichHasFiles", ta.removeDirWhichHasFiles)
+ t.Run("removeDirWhichHasSubDirs", ta.removeDirWhichHasSubDirs)
+ t.Run("appendFileContentsToEnd", ta.appendFileContentsToEnd)
}
diff --git a/swarm/fuse/swarmfs_unix.go b/swarm/fuse/swarmfs_unix.go
index f4eecef24..1a8390a4b 100644
--- a/swarm/fuse/swarmfs_unix.go
+++ b/swarm/fuse/swarmfs_unix.go
@@ -19,18 +19,19 @@
package fuse
import (
- "bazil.org/fuse"
- "bazil.org/fuse/fs"
"errors"
"fmt"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/swarm/api"
"os"
"path/filepath"
"strings"
"sync"
"time"
+
+ "bazil.org/fuse"
+ "bazil.org/fuse/fs"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/api"
)
var (
@@ -203,7 +204,7 @@ func (self *SwarmFS) Unmount(mountpoint string) (*MountInfo, error) {
}
err = fuse.Unmount(cleanedMountPoint)
if err != nil {
- err1 := externalUnMount(cleanedMountPoint)
+ err1 := externalUnmount(cleanedMountPoint)
if err1 != nil {
errStr := fmt.Sprintf("UnMount error: %v", err)
log.Warn(errStr)
diff --git a/swarm/fuse/swarmfs_util.go b/swarm/fuse/swarmfs_util.go
index d20ab258e..d39966c0e 100644
--- a/swarm/fuse/swarmfs_util.go
+++ b/swarm/fuse/swarmfs_util.go
@@ -19,47 +19,31 @@
package fuse
import (
+ "context"
"fmt"
- "github.com/ethereum/go-ethereum/log"
"os/exec"
"runtime"
- "time"
-)
-func externalUnMount(mountPoint string) error {
+ "github.com/ethereum/go-ethereum/log"
+)
- var cmd *exec.Cmd
+func externalUnmount(mountPoint string) error {
+ ctx, cancel := context.WithTimeout(context.Background(), unmountTimeout)
+ defer cancel()
+ // Try generic umount.
+ if err := exec.CommandContext(ctx, "umount", mountPoint).Run(); err == nil {
+ return nil
+ }
+ // Try FUSE-specific commands if umount didn't work.
switch runtime.GOOS {
-
case "darwin":
- cmd = exec.Command("/usr/bin/diskutil", "umount", "force", mountPoint)
-
+ return exec.CommandContext(ctx, "diskutil", "umount", "force", mountPoint).Run()
case "linux":
- cmd = exec.Command("fusermount", "-u", mountPoint)
-
+ return exec.CommandContext(ctx, "fusermount", "-u", mountPoint).Run()
default:
return fmt.Errorf("unmount: unimplemented")
}
-
- errc := make(chan error, 1)
- go func() {
- defer close(errc)
-
- if err := exec.Command("umount", mountPoint).Run(); err == nil {
- return
- }
- errc <- cmd.Run()
- }()
-
- select {
-
- case <-time.After(unmountTimeout):
- return fmt.Errorf("umount timeout")
-
- case err := <-errc:
- return err
- }
}
func addFileToSwarm(sf *SwarmFile, content []byte, size int) error {
diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
index 30925a919..31ff5b64e 100644
--- a/swarm/storage/dbstore.go
+++ b/swarm/storage/dbstore.go
@@ -399,7 +399,7 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
hash := hasher.Sum(nil)
if !bytes.Equal(hash, key) {
s.delete(index.Idx, getIndexKey(key))
- panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
+ log.Warn("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
}
chunk = &Chunk{
diff --git a/trie/errors.go b/trie/errors.go
index e23f9d563..567b80078 100644
--- a/trie/errors.go
+++ b/trie/errors.go
@@ -23,24 +23,13 @@ import (
)
// MissingNodeError is returned by the trie functions (TryGet, TryUpdate, TryDelete)
-// in the case where a trie node is not present in the local database. Contains
-// information necessary for retrieving the missing node through an ODR service.
-//
-// NodeHash is the hash of the missing node
-//
-// RootHash is the original root of the trie that contains the node
-//
-// PrefixLen is the nibble length of the key prefix that leads from the root to
-// the missing node
-//
-// SuffixLen is the nibble length of the remaining part of the key that hints on
-// which further nodes should also be retrieved (can be zero when there are no
-// such hints in the error message)
+// in the case where a trie node is not present in the local database. It contains
+// information necessary for retrieving the missing node.
type MissingNodeError struct {
- RootHash, NodeHash common.Hash
- PrefixLen, SuffixLen int
+ NodeHash common.Hash // hash of the missing node
+ Path []byte // hex-encoded path to the missing node
}
func (err *MissingNodeError) Error() string {
- return fmt.Sprintf("Missing trie node %064x", err.NodeHash)
+ return fmt.Sprintf("missing trie node %x (path %x)", err.NodeHash, err.Path)
}
diff --git a/trie/iterator.go b/trie/iterator.go
index 26ae1d5ad..76146c0d6 100644
--- a/trie/iterator.go
+++ b/trie/iterator.go
@@ -24,14 +24,13 @@ import (
"github.com/ethereum/go-ethereum/common"
)
-var iteratorEnd = errors.New("end of iteration")
-
// Iterator is a key-value trie iterator that traverses a Trie.
type Iterator struct {
nodeIt NodeIterator
Key []byte // Current data key on which the iterator is positioned on
Value []byte // Current data value on which the iterator is positioned on
+ Err error
}
// NewIterator creates a new key-value iterator from a node iterator
@@ -45,35 +44,42 @@ func NewIterator(it NodeIterator) *Iterator {
func (it *Iterator) Next() bool {
for it.nodeIt.Next(true) {
if it.nodeIt.Leaf() {
- it.Key = hexToKeybytes(it.nodeIt.Path())
+ it.Key = it.nodeIt.LeafKey()
it.Value = it.nodeIt.LeafBlob()
return true
}
}
it.Key = nil
it.Value = nil
+ it.Err = it.nodeIt.Error()
return false
}
// NodeIterator is an iterator to traverse the trie pre-order.
type NodeIterator interface {
- // Hash returns the hash of the current node
- Hash() common.Hash
- // Parent returns the hash of the parent of the current node
- Parent() common.Hash
- // Leaf returns true iff the current node is a leaf node.
- Leaf() bool
- // LeafBlob returns the contents of the node, if it is a leaf.
- // Callers must not retain references to the return value after calling Next()
- LeafBlob() []byte
- // Path returns the hex-encoded path to the current node.
- // Callers must not retain references to the return value after calling Next()
- Path() []byte
// Next moves the iterator to the next node. If the parameter is false, any child
// nodes will be skipped.
Next(bool) bool
// Error returns the error status of the iterator.
Error() error
+
+ // Hash returns the hash of the current node.
+ Hash() common.Hash
+ // Parent returns the hash of the parent of the current node. The hash may be the one
+ // grandparent if the immediate parent is an internal node with no hash.
+ Parent() common.Hash
+ // Path returns the hex-encoded path to the current node.
+ // Callers must not retain references to the return value after calling Next.
+ // For leaf nodes, the last element of the path is the 'terminator symbol' 0x10.
+ Path() []byte
+
+ // Leaf returns true iff the current node is a leaf node.
+ // LeafBlob, LeafKey return the contents and key of the leaf node. These
+ // method panic if the iterator is not positioned at a leaf.
+ // Callers must not retain references to their return value after calling Next
+ Leaf() bool
+ LeafBlob() []byte
+ LeafKey() []byte
}
// nodeIteratorState represents the iteration state at one particular node of the
@@ -89,8 +95,21 @@ type nodeIteratorState struct {
type nodeIterator struct {
trie *Trie // Trie being iterated
stack []*nodeIteratorState // Hierarchy of trie nodes persisting the iteration state
- err error // Failure set in case of an internal error in the iterator
path []byte // Path to the current node
+ err error // Failure set in case of an internal error in the iterator
+}
+
+// iteratorEnd is stored in nodeIterator.err when iteration is done.
+var iteratorEnd = errors.New("end of iteration")
+
+// seekError is stored in nodeIterator.err if the initial seek has failed.
+type seekError struct {
+ key []byte
+ err error
+}
+
+func (e seekError) Error() string {
+ return "seek error: " + e.err.Error()
}
func newNodeIterator(trie *Trie, start []byte) NodeIterator {
@@ -98,60 +117,57 @@ func newNodeIterator(trie *Trie, start []byte) NodeIterator {
return new(nodeIterator)
}
it := &nodeIterator{trie: trie}
- it.seek(start)
+ it.err = it.seek(start)
return it
}
-// Hash returns the hash of the current node
func (it *nodeIterator) Hash() common.Hash {
if len(it.stack) == 0 {
return common.Hash{}
}
-
return it.stack[len(it.stack)-1].hash
}
-// Parent returns the hash of the parent node
func (it *nodeIterator) Parent() common.Hash {
if len(it.stack) == 0 {
return common.Hash{}
}
-
return it.stack[len(it.stack)-1].parent
}
-// Leaf returns true if the current node is a leaf
func (it *nodeIterator) Leaf() bool {
- if len(it.stack) == 0 {
- return false
- }
-
- _, ok := it.stack[len(it.stack)-1].node.(valueNode)
- return ok
+ return hasTerm(it.path)
}
-// LeafBlob returns the data for the current node, if it is a leaf
func (it *nodeIterator) LeafBlob() []byte {
- if len(it.stack) == 0 {
- return nil
+ if len(it.stack) > 0 {
+ if node, ok := it.stack[len(it.stack)-1].node.(valueNode); ok {
+ return []byte(node)
+ }
}
+ panic("not at leaf")
+}
- if node, ok := it.stack[len(it.stack)-1].node.(valueNode); ok {
- return []byte(node)
+func (it *nodeIterator) LeafKey() []byte {
+ if len(it.stack) > 0 {
+ if _, ok := it.stack[len(it.stack)-1].node.(valueNode); ok {
+ return hexToKeybytes(it.path)
+ }
}
- return nil
+ panic("not at leaf")
}
-// Path returns the hex-encoded path to the current node
func (it *nodeIterator) Path() []byte {
return it.path
}
-// Error returns the error set in case of an internal error in the iterator
func (it *nodeIterator) Error() error {
if it.err == iteratorEnd {
return nil
}
+ if seek, ok := it.err.(seekError); ok {
+ return seek.err
+ }
return it.err
}
@@ -160,29 +176,37 @@ func (it *nodeIterator) Error() error {
// sets the Error field to the encountered failure. If `descend` is false,
// skips iterating over any subnodes of the current node.
func (it *nodeIterator) Next(descend bool) bool {
- if it.err != nil {
+ if it.err == iteratorEnd {
return false
}
- // Otherwise step forward with the iterator and report any errors
+ if seek, ok := it.err.(seekError); ok {
+ if it.err = it.seek(seek.key); it.err != nil {
+ return false
+ }
+ }
+ // Otherwise step forward with the iterator and report any errors.
state, parentIndex, path, err := it.peek(descend)
- if err != nil {
- it.err = err
+ it.err = err
+ if it.err != nil {
return false
}
it.push(state, parentIndex, path)
return true
}
-func (it *nodeIterator) seek(prefix []byte) {
+func (it *nodeIterator) seek(prefix []byte) error {
// The path we're looking for is the hex encoded key without terminator.
key := keybytesToHex(prefix)
key = key[:len(key)-1]
// Move forward until we're just before the closest match to key.
for {
state, parentIndex, path, err := it.peek(bytes.HasPrefix(key, it.path))
- if err != nil || bytes.Compare(path, key) >= 0 {
- it.err = err
- return
+ if err == iteratorEnd {
+ return iteratorEnd
+ } else if err != nil {
+ return seekError{prefix, err}
+ } else if bytes.Compare(path, key) >= 0 {
+ return nil
}
it.push(state, parentIndex, path)
}
@@ -197,7 +221,8 @@ func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, er
if root != emptyRoot {
state.hash = root
}
- return state, nil, nil, nil
+ err := state.resolve(it.trie, nil)
+ return state, nil, nil, err
}
if !descend {
// If we're skipping children, pop the current node first
@@ -205,72 +230,73 @@ func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, er
}
// Continue iteration to the next child
- for {
- if len(it.stack) == 0 {
- return nil, nil, nil, iteratorEnd
- }
+ for len(it.stack) > 0 {
parent := it.stack[len(it.stack)-1]
ancestor := parent.hash
if (ancestor == common.Hash{}) {
ancestor = parent.parent
}
- if node, ok := parent.node.(*fullNode); ok {
- // Full node, move to the first non-nil child.
- for i := parent.index + 1; i < len(node.Children); i++ {
- child := node.Children[i]
- if child != nil {
- hash, _ := child.cache()
- state := &nodeIteratorState{
- hash: common.BytesToHash(hash),
- node: child,
- parent: ancestor,
- index: -1,
- pathlen: len(it.path),
- }
- path := append(it.path, byte(i))
- parent.index = i - 1
- return state, &parent.index, path, nil
- }
+ state, path, ok := it.nextChild(parent, ancestor)
+ if ok {
+ if err := state.resolve(it.trie, path); err != nil {
+ return parent, &parent.index, path, err
}
- } else if node, ok := parent.node.(*shortNode); ok {
- // Short node, return the pointer singleton child
- if parent.index < 0 {
- hash, _ := node.Val.cache()
+ return state, &parent.index, path, nil
+ }
+ // No more child nodes, move back up.
+ it.pop()
+ }
+ return nil, nil, nil, iteratorEnd
+}
+
+func (st *nodeIteratorState) resolve(tr *Trie, path []byte) error {
+ if hash, ok := st.node.(hashNode); ok {
+ resolved, err := tr.resolveHash(hash, path)
+ if err != nil {
+ return err
+ }
+ st.node = resolved
+ st.hash = common.BytesToHash(hash)
+ }
+ return nil
+}
+
+func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool) {
+ switch node := parent.node.(type) {
+ case *fullNode:
+ // Full node, move to the first non-nil child.
+ for i := parent.index + 1; i < len(node.Children); i++ {
+ child := node.Children[i]
+ if child != nil {
+ hash, _ := child.cache()
state := &nodeIteratorState{
hash: common.BytesToHash(hash),
- node: node.Val,
+ node: child,
parent: ancestor,
index: -1,
pathlen: len(it.path),
}
- var path []byte
- if hasTerm(node.Key) {
- path = append(it.path, node.Key[:len(node.Key)-1]...)
- } else {
- path = append(it.path, node.Key...)
- }
- return state, &parent.index, path, nil
+ path := append(it.path, byte(i))
+ parent.index = i - 1
+ return state, path, true
}
- } else if hash, ok := parent.node.(hashNode); ok {
- // Hash node, resolve the hash child from the database
- if parent.index < 0 {
- node, err := it.trie.resolveHash(hash, nil, nil)
- if err != nil {
- return it.stack[len(it.stack)-1], &parent.index, it.path, err
- }
- state := &nodeIteratorState{
- hash: common.BytesToHash(hash),
- node: node,
- parent: ancestor,
- index: -1,
- pathlen: len(it.path),
- }
- return state, &parent.index, it.path, nil
+ }
+ case *shortNode:
+ // Short node, return the pointer singleton child
+ if parent.index < 0 {
+ hash, _ := node.Val.cache()
+ state := &nodeIteratorState{
+ hash: common.BytesToHash(hash),
+ node: node.Val,
+ parent: ancestor,
+ index: -1,
+ pathlen: len(it.path),
}
+ path := append(it.path, node.Key...)
+ return state, path, true
}
- // No more child nodes, move back up.
- it.pop()
}
+ return parent, it.path, false
}
func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte) {
@@ -288,23 +314,21 @@ func (it *nodeIterator) pop() {
}
func compareNodes(a, b NodeIterator) int {
- cmp := bytes.Compare(a.Path(), b.Path())
- if cmp != 0 {
+ if cmp := bytes.Compare(a.Path(), b.Path()); cmp != 0 {
return cmp
}
-
if a.Leaf() && !b.Leaf() {
return -1
} else if b.Leaf() && !a.Leaf() {
return 1
}
-
- cmp = bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes())
- if cmp != 0 {
+ if cmp := bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()); cmp != 0 {
return cmp
}
-
- return bytes.Compare(a.LeafBlob(), b.LeafBlob())
+ if a.Leaf() && b.Leaf() {
+ return bytes.Compare(a.LeafBlob(), b.LeafBlob())
+ }
+ return 0
}
type differenceIterator struct {
@@ -341,6 +365,10 @@ func (it *differenceIterator) LeafBlob() []byte {
return it.b.LeafBlob()
}
+func (it *differenceIterator) LeafKey() []byte {
+ return it.b.LeafKey()
+}
+
func (it *differenceIterator) Path() []byte {
return it.b.Path()
}
@@ -410,7 +438,6 @@ func (h *nodeIteratorHeap) Pop() interface{} {
type unionIterator struct {
items *nodeIteratorHeap // Nodes returned are the union of the ones in these iterators
count int // Number of nodes scanned across all tries
- err error // The error, if one has been encountered
}
// NewUnionIterator constructs a NodeIterator that iterates over elements in the union
@@ -421,9 +448,7 @@ func NewUnionIterator(iters []NodeIterator) (NodeIterator, *int) {
copy(h, iters)
heap.Init(&h)
- ui := &unionIterator{
- items: &h,
- }
+ ui := &unionIterator{items: &h}
return ui, &ui.count
}
@@ -443,6 +468,10 @@ func (it *unionIterator) LeafBlob() []byte {
return (*it.items)[0].LeafBlob()
}
+func (it *unionIterator) LeafKey() []byte {
+ return (*it.items)[0].LeafKey()
+}
+
func (it *unionIterator) Path() []byte {
return (*it.items)[0].Path()
}
diff --git a/trie/iterator_test.go b/trie/iterator_test.go
index f161fd99d..4808d8b0c 100644
--- a/trie/iterator_test.go
+++ b/trie/iterator_test.go
@@ -19,6 +19,7 @@ package trie
import (
"bytes"
"fmt"
+ "math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
@@ -239,8 +240,8 @@ func TestUnionIterator(t *testing.T) {
all := []struct{ k, v string }{
{"aardvark", "c"},
- {"barb", "bd"},
{"barb", "ba"},
+ {"barb", "bd"},
{"bard", "bc"},
{"bars", "bb"},
{"bars", "be"},
@@ -267,3 +268,107 @@ func TestUnionIterator(t *testing.T) {
t.Errorf("Iterator returned extra values.")
}
}
+
+func TestIteratorNoDups(t *testing.T) {
+ var tr Trie
+ for _, val := range testdata1 {
+ tr.Update([]byte(val.k), []byte(val.v))
+ }
+ checkIteratorNoDups(t, tr.NodeIterator(nil), nil)
+}
+
+// This test checks that nodeIterator.Next can be retried after inserting missing trie nodes.
+func TestIteratorContinueAfterError(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+ tr, _ := New(common.Hash{}, db)
+ for _, val := range testdata1 {
+ tr.Update([]byte(val.k), []byte(val.v))
+ }
+ tr.Commit()
+ wantNodeCount := checkIteratorNoDups(t, tr.NodeIterator(nil), nil)
+ keys := db.Keys()
+ t.Log("node count", wantNodeCount)
+
+ for i := 0; i < 20; i++ {
+ // Create trie that will load all nodes from DB.
+ tr, _ := New(tr.Hash(), db)
+
+ // Remove a random node from the database. It can't be the root node
+ // because that one is already loaded.
+ var rkey []byte
+ for {
+ if rkey = keys[rand.Intn(len(keys))]; !bytes.Equal(rkey, tr.Hash().Bytes()) {
+ break
+ }
+ }
+ rval, _ := db.Get(rkey)
+ db.Delete(rkey)
+
+ // Iterate until the error is hit.
+ seen := make(map[string]bool)
+ it := tr.NodeIterator(nil)
+ checkIteratorNoDups(t, it, seen)
+ missing, ok := it.Error().(*MissingNodeError)
+ if !ok || !bytes.Equal(missing.NodeHash[:], rkey) {
+ t.Fatal("didn't hit missing node, got", it.Error())
+ }
+
+ // Add the node back and continue iteration.
+ db.Put(rkey, rval)
+ checkIteratorNoDups(t, it, seen)
+ if it.Error() != nil {
+ t.Fatal("unexpected error", it.Error())
+ }
+ if len(seen) != wantNodeCount {
+ t.Fatal("wrong node iteration count, got", len(seen), "want", wantNodeCount)
+ }
+ }
+}
+
+// Similar to the test above, this one checks that failure to create nodeIterator at a
+// certain key prefix behaves correctly when Next is called. The expectation is that Next
+// should retry seeking before returning true for the first time.
+func TestIteratorContinueAfterSeekError(t *testing.T) {
+ // Commit test trie to db, then remove the node containing "bars".
+ db, _ := ethdb.NewMemDatabase()
+ ctr, _ := New(common.Hash{}, db)
+ for _, val := range testdata1 {
+ ctr.Update([]byte(val.k), []byte(val.v))
+ }
+ root, _ := ctr.Commit()
+ barNodeHash := common.HexToHash("05041990364eb72fcb1127652ce40d8bab765f2bfe53225b1170d276cc101c2e")
+ barNode, _ := db.Get(barNodeHash[:])
+ db.Delete(barNodeHash[:])
+
+ // Create a new iterator that seeks to "bars". Seeking can't proceed because
+ // the node is missing.
+ tr, _ := New(root, db)
+ it := tr.NodeIterator([]byte("bars"))
+ missing, ok := it.Error().(*MissingNodeError)
+ if !ok {
+ t.Fatal("want MissingNodeError, got", it.Error())
+ } else if missing.NodeHash != barNodeHash {
+ t.Fatal("wrong node missing")
+ }
+
+ // Reinsert the missing node.
+ db.Put(barNodeHash[:], barNode[:])
+
+ // Check that iteration produces the right set of values.
+ if err := checkIteratorOrder(testdata1[2:], NewIterator(it)); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func checkIteratorNoDups(t *testing.T, it NodeIterator, seen map[string]bool) int {
+ if seen == nil {
+ seen = make(map[string]bool)
+ }
+ for it.Next(true) {
+ if seen[string(it.Path())] {
+ t.Fatalf("iterator visited node path %x twice", it.Path())
+ }
+ seen[string(it.Path())] = true
+ }
+ return len(seen)
+}
diff --git a/trie/proof.go b/trie/proof.go
index fb7734b86..1f8f76b1b 100644
--- a/trie/proof.go
+++ b/trie/proof.go
@@ -58,7 +58,7 @@ func (t *Trie) Prove(key []byte) []rlp.RawValue {
nodes = append(nodes, n)
case hashNode:
var err error
- tn, err = t.resolveHash(n, nil, nil)
+ tn, err = t.resolveHash(n, nil)
if err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
return nil
diff --git a/trie/sync.go b/trie/sync.go
index 168501392..9e8449431 100644
--- a/trie/sync.go
+++ b/trie/sync.go
@@ -28,6 +28,10 @@ import (
// node it did not request.
var ErrNotRequested = errors.New("not requested")
+// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
+// node it already processed previously.
+var ErrAlreadyProcessed = errors.New("already processed")
+
// request represents a scheduled or already in-flight state retrieval request.
type request struct {
hash common.Hash // Hash of the node data content to retrieve
@@ -48,6 +52,21 @@ type SyncResult struct {
Data []byte // Data content of the retrieved node
}
+// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
+// persisted data items.
+type syncMemBatch struct {
+ batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items
+ order []common.Hash // Order of completion to prevent out-of-order data loss
+}
+
+// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
+func newSyncMemBatch() *syncMemBatch {
+ return &syncMemBatch{
+ batch: make(map[common.Hash][]byte),
+ order: make([]common.Hash, 0, 256),
+ }
+}
+
// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a
// leaf node. It's used by state syncing to check if the leaf node requires some
// further data syncing.
@@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type TrieSync struct {
- database DatabaseReader
+ database DatabaseReader // Persistent database to check for existing entries
+ membatch *syncMemBatch // Memory buffer to avoid frequest database writes
requests map[common.Hash]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests
}
@@ -66,6 +86,7 @@ type TrieSync struct {
func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync {
ts := &TrieSync{
database: database,
+ membatch: newSyncMemBatch(),
requests: make(map[common.Hash]*request),
queue: prque.New(),
}
@@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c
if root == emptyRoot {
return
}
+ if _, ok := s.membatch.batch[root]; ok {
+ return
+ }
key := root.Bytes()
blob, _ := s.database.Get(key)
if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
@@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash)
if hash == emptyState {
return
}
+ if _, ok := s.membatch.batch[hash]; ok {
+ return
+ }
if blob, _ := s.database.Get(hash.Bytes()); blob != nil {
return
}
@@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash {
// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// it failed.
-func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) {
+func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
committed := false
for i, item := range results {
@@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
if request == nil {
return committed, i, ErrNotRequested
}
+ if request.data != nil {
+ return committed, i, ErrAlreadyProcessed
+ }
// If the item is a raw entry request, commit directly
if request.raw {
request.data = item.Data
- s.commit(request, dbw)
+ s.commit(request)
committed = true
continue
}
@@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
return committed, i, err
}
if len(requests) == 0 && request.deps == 0 {
- s.commit(request, dbw)
+ s.commit(request)
committed = true
continue
}
@@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int,
return committed, 0, nil
}
+// Commit flushes the data stored in the internal membatch out to persistent
+// storage, returning th enumber of items written and any occurred error.
+func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) {
+ // Dump the membatch into a database dbw
+ for i, key := range s.membatch.order {
+ if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
+ return i, err
+ }
+ }
+ written := len(s.membatch.order)
+
+ // Drop the membatch data and return
+ s.membatch = newSyncMemBatch()
+ return written, nil
+}
+
// Pending returns the number of state entries currently pending for download.
func (s *TrieSync) Pending() int {
return len(s.requests)
@@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
// If the child references another node, resolve or schedule
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
+ hash := common.BytesToHash(node)
+ if _, ok := s.membatch.batch[hash]; ok {
+ continue
+ }
blob, _ := s.database.Get(node)
if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil {
continue
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &request{
- hash: common.BytesToHash(node),
+ hash: hash,
parents: []*request{req},
depth: child.depth,
callback: req.callback,
@@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) {
return requests, nil
}
-// commit finalizes a retrieval request and stores it into the database. If any
+// commit finalizes a retrieval request and stores it into the membatch. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
-func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) {
- // Write the node content to disk
- if err := dbw.Put(req.hash[:], req.data); err != nil {
- return err
- }
+func (s *TrieSync) commit(req *request) (err error) {
+ // Write the node content to the membatch
+ s.membatch.batch[req.hash] = req.data
+ s.membatch.order = append(s.membatch.order, req.hash)
+
delete(s.requests, req.hash)
// Check all parents for completion
for _, parent := range req.parents {
parent.deps--
if parent.deps == 0 {
- if err := s.commit(parent, dbw); err != nil {
+ if err := s.commit(parent); err != nil {
return err
}
}
diff --git a/trie/sync_test.go b/trie/sync_test.go
index d778555b9..ec16a25bd 100644
--- a/trie/sync_test.go
+++ b/trie/sync_test.go
@@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) {
}
results[i] = SyncResult{hash, data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two tries are in sync
@@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
}
results[i] = SyncResult{hash, data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[len(results):], sched.Missing(10000)...)
}
// Cross check that the two tries are in sync
@@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
results = append(results, SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
@@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
}
}
// Feed the retrieved results back and queue new tasks
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
for _, result := range results {
delete(queue, result.Hash)
}
@@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
queue = append(queue[:0], sched.Missing(0)...)
}
// Cross check that the two tries are in sync
@@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) {
results[i] = SyncResult{hash, data}
}
// Process each of the trie nodes
- if _, index, err := sched.Process(results, dstDb); err != nil {
+ if _, index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
+ if index, err := sched.Commit(dstDb); err != nil {
+ t.Fatalf("failed to commit data #%d: %v", index, err)
+ }
for _, result := range results {
added = append(added, result.Hash)
}
diff --git a/trie/trie.go b/trie/trie.go
index cbe496574..a3151b1ce 100644
--- a/trie/trie.go
+++ b/trie/trie.go
@@ -116,7 +116,7 @@ func New(root common.Hash, db Database) (*Trie, error) {
if db == nil {
panic("trie.New: cannot use existing root without a database")
}
- rootnode, err := trie.resolveHash(root[:], nil, nil)
+ rootnode, err := trie.resolveHash(root[:], nil)
if err != nil {
return nil, err
}
@@ -180,7 +180,7 @@ func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode
}
return value, n, didResolve, err
case hashNode:
- child, err := t.resolveHash(n, key[:pos], key[pos:])
+ child, err := t.resolveHash(n, key[:pos])
if err != nil {
return nil, n, true, err
}
@@ -283,7 +283,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error
// We've hit a part of the trie that isn't loaded yet. Load
// the node and insert into it. This leaves all child nodes on
// the path to the value in the trie.
- rn, err := t.resolveHash(n, prefix, key)
+ rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
@@ -388,7 +388,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) {
// shortNode{..., shortNode{...}}. Since the entry
// might not be loaded yet, resolve it just for this
// check.
- cnode, err := t.resolve(n.Children[pos], prefix, []byte{byte(pos)})
+ cnode, err := t.resolve(n.Children[pos], prefix)
if err != nil {
return false, nil, err
}
@@ -414,7 +414,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) {
// We've hit a part of the trie that isn't loaded yet. Load
// the node and delete from it. This leaves all child nodes on
// the path to the value in the trie.
- rn, err := t.resolveHash(n, prefix, key)
+ rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
@@ -436,24 +436,19 @@ func concat(s1 []byte, s2 ...byte) []byte {
return r
}
-func (t *Trie) resolve(n node, prefix, suffix []byte) (node, error) {
+func (t *Trie) resolve(n node, prefix []byte) (node, error) {
if n, ok := n.(hashNode); ok {
- return t.resolveHash(n, prefix, suffix)
+ return t.resolveHash(n, prefix)
}
return n, nil
}
-func (t *Trie) resolveHash(n hashNode, prefix, suffix []byte) (node, error) {
+func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) {
cacheMissCounter.Inc(1)
enc, err := t.db.Get(n)
if err != nil || enc == nil {
- return nil, &MissingNodeError{
- RootHash: t.originalRoot,
- NodeHash: common.BytesToHash(n),
- PrefixLen: len(prefix),
- SuffixLen: len(suffix),
- }
+ return nil, &MissingNodeError{NodeHash: common.BytesToHash(n), Path: prefix}
}
dec := mustDecodeNode(n, enc, t.cachegen)
return dec, nil
diff --git a/trie/trie_test.go b/trie/trie_test.go
index 61adbba0c..1c9095070 100644
--- a/trie/trie_test.go
+++ b/trie/trie_test.go
@@ -19,6 +19,7 @@ package trie
import (
"bytes"
"encoding/binary"
+ "errors"
"fmt"
"io/ioutil"
"math/rand"
@@ -34,7 +35,7 @@ import (
func init() {
spew.Config.Indent = " "
- spew.Config.DisableMethods = true
+ spew.Config.DisableMethods = false
}
// Used for testing
@@ -357,6 +358,7 @@ type randTestStep struct {
op int
key []byte // for opUpdate, opDelete, opGet
value []byte // for opUpdate
+ err error // for debugging
}
const (
@@ -406,7 +408,7 @@ func runRandTest(rt randTest) bool {
tr, _ := New(common.Hash{}, db)
values := make(map[string]string) // tracks content of the trie
- for _, step := range rt {
+ for i, step := range rt {
switch step.op {
case opUpdate:
tr.Update(step.key, step.value)
@@ -418,23 +420,22 @@ func runRandTest(rt randTest) bool {
v := tr.Get(step.key)
want := values[string(step.key)]
if string(v) != want {
- fmt.Printf("mismatch for key 0x%x, got 0x%x want 0x%x", step.key, v, want)
- return false
+ rt[i].err = fmt.Errorf("mismatch for key 0x%x, got 0x%x want 0x%x", step.key, v, want)
}
case opCommit:
- if _, err := tr.Commit(); err != nil {
- panic(err)
- }
+ _, rt[i].err = tr.Commit()
case opHash:
tr.Hash()
case opReset:
hash, err := tr.Commit()
if err != nil {
- panic(err)
+ rt[i].err = err
+ return false
}
newtr, err := New(hash, db)
if err != nil {
- panic(err)
+ rt[i].err = err
+ return false
}
tr = newtr
case opItercheckhash:
@@ -444,17 +445,20 @@ func runRandTest(rt randTest) bool {
checktr.Update(it.Key, it.Value)
}
if tr.Hash() != checktr.Hash() {
- fmt.Println("hashes not equal")
- return false
+ rt[i].err = fmt.Errorf("hash mismatch in opItercheckhash")
}
case opCheckCacheInvariant:
- return checkCacheInvariant(tr.root, nil, tr.cachegen, false, 0)
+ rt[i].err = checkCacheInvariant(tr.root, nil, tr.cachegen, false, 0)
+ }
+ // Abort the test on error.
+ if rt[i].err != nil {
+ return false
}
}
return true
}
-func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool, depth int) bool {
+func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool, depth int) error {
var children []node
var flag nodeFlag
switch n := n.(type) {
@@ -465,33 +469,34 @@ func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool
flag = n.flags
children = n.Children[:]
default:
- return true
+ return nil
}
- showerror := func() {
- fmt.Printf("at depth %d node %s", depth, spew.Sdump(n))
- fmt.Printf("parent: %s", spew.Sdump(parent))
+ errorf := func(format string, args ...interface{}) error {
+ msg := fmt.Sprintf(format, args...)
+ msg += fmt.Sprintf("\nat depth %d node %s", depth, spew.Sdump(n))
+ msg += fmt.Sprintf("parent: %s", spew.Sdump(parent))
+ return errors.New(msg)
}
if flag.gen > parentCachegen {
- fmt.Printf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen)
- showerror()
- return false
+ return errorf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen)
}
if depth > 0 && !parentDirty && flag.dirty {
- fmt.Printf("cache invariant violation: child is dirty but parent isn't\n")
- showerror()
- return false
+ return errorf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen)
}
for _, child := range children {
- if !checkCacheInvariant(child, n, flag.gen, flag.dirty, depth+1) {
- return false
+ if err := checkCacheInvariant(child, n, flag.gen, flag.dirty, depth+1); err != nil {
+ return err
}
}
- return true
+ return nil
}
func TestRandom(t *testing.T) {
if err := quick.Check(runRandTest, nil); err != nil {
+ if cerr, ok := err.(*quick.CheckError); ok {
+ t.Fatalf("random test iteration %d failed: %s", cerr.Count, spew.Sdump(cerr.In))
+ }
t.Fatal(err)
}
}
diff --git a/vendor/github.com/docker/docker/LICENSE b/vendor/github.com/docker/docker/LICENSE
new file mode 100644
index 000000000..9c8e20ab8
--- /dev/null
+++ b/vendor/github.com/docker/docker/LICENSE
@@ -0,0 +1,191 @@
+
+ Apache License
+ Version 2.0, January 2004
+ https://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ Copyright 2013-2017 Docker, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/vendor/github.com/docker/docker/NOTICE b/vendor/github.com/docker/docker/NOTICE
new file mode 100644
index 000000000..0c74e15b0
--- /dev/null
+++ b/vendor/github.com/docker/docker/NOTICE
@@ -0,0 +1,19 @@
+Docker
+Copyright 2012-2017 Docker, Inc.
+
+This product includes software developed at Docker, Inc. (https://www.docker.com).
+
+This product contains software (https://github.com/kr/pty) developed
+by Keith Rarick, licensed under the MIT License.
+
+The following is courtesy of our legal counsel:
+
+
+Use and transfer of Docker may be subject to certain restrictions by the
+United States and other governments.
+It is your responsibility to ensure that your use and/or transfer does not
+violate applicable laws.
+
+For more information, please see https://www.bis.doc.gov
+
+See also https://www.apache.org/dev/crypto.html and/or seek legal counsel.
diff --git a/vendor/github.com/docker/docker/pkg/reexec/README.md b/vendor/github.com/docker/docker/pkg/reexec/README.md
new file mode 100644
index 000000000..6658f69b6
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/reexec/README.md
@@ -0,0 +1,5 @@
+# reexec
+
+The `reexec` package facilitates the busybox style reexec of the docker binary that we require because
+of the forking limitations of using Go. Handlers can be registered with a name and the argv 0 of
+the exec of the binary will be used to find and execute custom init paths.
diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_linux.go b/vendor/github.com/docker/docker/pkg/reexec/command_linux.go
new file mode 100644
index 000000000..34ae2a9dc
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/reexec/command_linux.go
@@ -0,0 +1,28 @@
+// +build linux
+
+package reexec
+
+import (
+ "os/exec"
+ "syscall"
+)
+
+// Self returns the path to the current process's binary.
+// Returns "/proc/self/exe".
+func Self() string {
+ return "/proc/self/exe"
+}
+
+// Command returns *exec.Cmd which has Path as current binary. Also it setting
+// SysProcAttr.Pdeathsig to SIGTERM.
+// This will use the in-memory version (/proc/self/exe) of the current binary,
+// it is thus safe to delete or replace the on-disk binary (os.Args[0]).
+func Command(args ...string) *exec.Cmd {
+ return &exec.Cmd{
+ Path: Self(),
+ Args: args,
+ SysProcAttr: &syscall.SysProcAttr{
+ Pdeathsig: syscall.SIGTERM,
+ },
+ }
+}
diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_unix.go b/vendor/github.com/docker/docker/pkg/reexec/command_unix.go
new file mode 100644
index 000000000..778a720e3
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/reexec/command_unix.go
@@ -0,0 +1,23 @@
+// +build freebsd solaris darwin
+
+package reexec
+
+import (
+ "os/exec"
+)
+
+// Self returns the path to the current process's binary.
+// Uses os.Args[0].
+func Self() string {
+ return naiveSelf()
+}
+
+// Command returns *exec.Cmd which has Path as current binary.
+// For example if current binary is "docker" at "/usr/bin/", then cmd.Path will
+// be set to "/usr/bin/docker".
+func Command(args ...string) *exec.Cmd {
+ return &exec.Cmd{
+ Path: Self(),
+ Args: args,
+ }
+}
diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go b/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go
new file mode 100644
index 000000000..76edd8242
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go
@@ -0,0 +1,12 @@
+// +build !linux,!windows,!freebsd,!solaris,!darwin
+
+package reexec
+
+import (
+ "os/exec"
+)
+
+// Command is unsupported on operating systems apart from Linux, Windows, Solaris and Darwin.
+func Command(args ...string) *exec.Cmd {
+ return nil
+}
diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_windows.go b/vendor/github.com/docker/docker/pkg/reexec/command_windows.go
new file mode 100644
index 000000000..ca871c422
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/reexec/command_windows.go
@@ -0,0 +1,23 @@
+// +build windows
+
+package reexec
+
+import (
+ "os/exec"
+)
+
+// Self returns the path to the current process's binary.
+// Uses os.Args[0].
+func Self() string {
+ return naiveSelf()
+}
+
+// Command returns *exec.Cmd which has Path as current binary.
+// For example if current binary is "docker.exe" at "C:\", then cmd.Path will
+// be set to "C:\docker.exe".
+func Command(args ...string) *exec.Cmd {
+ return &exec.Cmd{
+ Path: Self(),
+ Args: args,
+ }
+}
diff --git a/vendor/github.com/docker/docker/pkg/reexec/reexec.go b/vendor/github.com/docker/docker/pkg/reexec/reexec.go
new file mode 100644
index 000000000..c56671d91
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/reexec/reexec.go
@@ -0,0 +1,47 @@
+package reexec
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+)
+
+var registeredInitializers = make(map[string]func())
+
+// Register adds an initialization func under the specified name
+func Register(name string, initializer func()) {
+ if _, exists := registeredInitializers[name]; exists {
+ panic(fmt.Sprintf("reexec func already registered under name %q", name))
+ }
+
+ registeredInitializers[name] = initializer
+}
+
+// Init is called as the first part of the exec process and returns true if an
+// initialization function was called.
+func Init() bool {
+ initializer, exists := registeredInitializers[os.Args[0]]
+ if exists {
+ initializer()
+
+ return true
+ }
+ return false
+}
+
+func naiveSelf() string {
+ name := os.Args[0]
+ if filepath.Base(name) == name {
+ if lp, err := exec.LookPath(name); err == nil {
+ return lp
+ }
+ }
+ // handle conversion of relative paths to absolute
+ if absName, err := filepath.Abs(name); err == nil {
+ return absName
+ }
+ // if we couldn't get absolute name, return original
+ // (NOTE: Go only errors on Abs() if os.Getwd fails)
+ return name
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 607f193a3..1c2c58500 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -75,6 +75,12 @@
"revisionTime": "2017-02-01T22:58:49Z"
},
{
+ "checksumSHA1": "lutCa+IVM60R1OYBm9RtDAW50Ys=",
+ "path": "github.com/docker/docker/pkg/reexec",
+ "revision": "83ee902ecc3790c33c1e2d87334074436056bb49",
+ "revisionTime": "2017-04-22T21:51:12Z"
+ },
+ {
"checksumSHA1": "zYnPsNAVm1/ViwCkN++dX2JQhBo=",
"path": "github.com/edsrzf/mmap-go",
"revision": "935e0e8a636ca4ba70b713f3e38a19e1b77739e8",