diff options
72 files changed, 3280 insertions, 1709 deletions
diff --git a/.travis.yml b/.travis.yml index 96e429959..703ed0cb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -53,6 +53,7 @@ matrix: - debhelper - dput - gcc-multilib + - fakeroot script: # Build for the primary platforms that Trusty can manage - go run build/ci.go debsrc -signer "Go Ethereum Linux Builder <geth-ci@ethereum.org>" -upload ppa:ethereum/ethereum @@ -2,7 +2,7 @@ # with Go source code. If you know what GOPATH is then you probably # don't need to bother with make. -.PHONY: geth android ios geth-cross evm all test clean +.PHONY: geth android ios geth-cross swarm evm all test clean .PHONY: geth-linux geth-linux-386 geth-linux-amd64 geth-linux-mips64 geth-linux-mips64le .PHONY: geth-linux-arm geth-linux-arm-5 geth-linux-arm-6 geth-linux-arm-7 geth-linux-arm64 .PHONY: geth-darwin geth-darwin-386 geth-darwin-amd64 @@ -16,6 +16,11 @@ geth: @echo "Done building." @echo "Run \"$(GOBIN)/geth\" to launch geth." +swarm: + build/env.sh go run build/ci.go install ./cmd/swarm + @echo "Done building." + @echo "Run \"$(GOBIN)/swarm\" to launch swarm." + evm: build/env.sh go run build/ci.go install ./cmd/evm @echo "Done building." @@ -1,4 +1,4 @@ -## Ethereum Go +## Go Ethereum Official golang implementation of the Ethereum protocol. @@ -102,6 +102,22 @@ over between the main network and test network, you should make sure to always u for play-money and real-money. Unless you manually move accounts, Geth will by default correctly separate the two networks and will not make any accounts available between them.* +### Configuration + +As an alternative to passing the numerous flags to the `geth` binary, you can also pass a configuration file via: + +``` +$ geth --config /path/to/your_config.toml +``` + +To get an idea how the file should look like you can use the `dumpconfig` subcommand to export your existing configuration: + +``` +$ geth --your-favourite-flags dumpconfig +``` + +*Note: This works only with geth v1.6.0 and above* + #### Docker quick start One of the quickest ways to get Ethereum up and running on your machine is by using Docker: @@ -1 +1 @@ -1.6.6 +1.6.7 diff --git a/cmd/evm/json_logger.go b/cmd/evm/json_logger.go index a84d5daeb..d61981062 100644 --- a/cmd/evm/json_logger.go +++ b/cmd/evm/json_logger.go @@ -28,25 +28,32 @@ import ( type JSONLogger struct { encoder *json.Encoder + cfg *vm.LogConfig } -func NewJSONLogger(writer io.Writer) *JSONLogger { - return &JSONLogger{json.NewEncoder(writer)} +func NewJSONLogger(cfg *vm.LogConfig, writer io.Writer) *JSONLogger { + return &JSONLogger{json.NewEncoder(writer), cfg} } // CaptureState outputs state information on the logger. func (l *JSONLogger) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, contract *vm.Contract, depth int, err error) error { - return l.encoder.Encode(vm.StructLog{ - Pc: pc, - Op: op, - Gas: gas + cost, - GasCost: cost, - Memory: memory.Data(), - Stack: stack.Data(), - Storage: nil, - Depth: depth, - Err: err, - }) + log := vm.StructLog{ + Pc: pc, + Op: op, + Gas: gas + cost, + GasCost: cost, + MemorySize: memory.Len(), + Storage: nil, + Depth: depth, + Err: err, + } + if !l.cfg.DisableMemory { + log.Memory = memory.Data() + } + if !l.cfg.DisableStack { + log.Stack = stack.Data() + } + return l.encoder.Encode(log) } // CaptureEnd is triggered at end of execution. diff --git a/cmd/evm/main.go b/cmd/evm/main.go index 48a1b92cb..1892ae3d3 100644 --- a/cmd/evm/main.go +++ b/cmd/evm/main.go @@ -102,6 +102,14 @@ var ( Name: "sender", Usage: "The transaction origin", } + DisableMemoryFlag = cli.BoolFlag{ + Name: "nomemory", + Usage: "disable memory output", + } + DisableStackFlag = cli.BoolFlag{ + Name: "nostack", + Usage: "disable stack output", + } ) func init() { @@ -123,6 +131,8 @@ func init() { GenesisFlag, MachineFlag, SenderFlag, + DisableMemoryFlag, + DisableStackFlag, } app.Commands = []cli.Command{ compileCommand, diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go index b1fb8998f..2ce0920f6 100644 --- a/cmd/evm/runner.go +++ b/cmd/evm/runner.go @@ -73,6 +73,10 @@ func runCmd(ctx *cli.Context) error { glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) glogger.Verbosity(log.Lvl(ctx.GlobalInt(VerbosityFlag.Name))) log.Root().SetHandler(glogger) + logconfig := &vm.LogConfig{ + DisableMemory: ctx.GlobalBool(DisableMemoryFlag.Name), + DisableStack: ctx.GlobalBool(DisableStackFlag.Name), + } var ( tracer vm.Tracer @@ -82,12 +86,12 @@ func runCmd(ctx *cli.Context) error { sender = common.StringToAddress("sender") ) if ctx.GlobalBool(MachineFlag.Name) { - tracer = NewJSONLogger(os.Stdout) + tracer = NewJSONLogger(logconfig, os.Stdout) } else if ctx.GlobalBool(DebugFlag.Name) { - debugLogger = vm.NewStructLogger(nil) + debugLogger = vm.NewStructLogger(logconfig) tracer = debugLogger } else { - debugLogger = vm.NewStructLogger(nil) + debugLogger = vm.NewStructLogger(logconfig) } if ctx.GlobalString(GenesisFlag.Name) != "" { gen := readGenesis(ctx.GlobalString(GenesisFlag.Name)) diff --git a/cmd/geth/accountcmd_test.go b/cmd/geth/accountcmd_test.go index 5f9f67677..e146323ee 100644 --- a/cmd/geth/accountcmd_test.go +++ b/cmd/geth/accountcmd_test.go @@ -44,21 +44,21 @@ func tmpDatadirWithKeystore(t *testing.T) string { func TestAccountListEmpty(t *testing.T) { geth := runGeth(t, "account", "list") - geth.expectExit() + geth.ExpectExit() } func TestAccountList(t *testing.T) { datadir := tmpDatadirWithKeystore(t) geth := runGeth(t, "account", "list", "--datadir", datadir) - defer geth.expectExit() + defer geth.ExpectExit() if runtime.GOOS == "windows" { - geth.expect(` + geth.Expect(` Account #0: {7ef5a6135f1fd6a02593eedc869c6d41d934aef8} keystore://{{.Datadir}}\keystore\UTC--2016-03-22T12-57-55.920751759Z--7ef5a6135f1fd6a02593eedc869c6d41d934aef8 Account #1: {f466859ead1932d743d622cb74fc058882e8648a} keystore://{{.Datadir}}\keystore\aaa Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}\keystore\zzz `) } else { - geth.expect(` + geth.Expect(` Account #0: {7ef5a6135f1fd6a02593eedc869c6d41d934aef8} keystore://{{.Datadir}}/keystore/UTC--2016-03-22T12-57-55.920751759Z--7ef5a6135f1fd6a02593eedc869c6d41d934aef8 Account #1: {f466859ead1932d743d622cb74fc058882e8648a} keystore://{{.Datadir}}/keystore/aaa Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}/keystore/zzz @@ -68,20 +68,20 @@ Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}/k func TestAccountNew(t *testing.T) { geth := runGeth(t, "account", "new", "--lightkdf") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Your new account is locked with a password. Please give a password. Do not forget this password. !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} Repeat passphrase: {{.InputLine "foobar"}} `) - geth.expectRegexp(`Address: \{[0-9a-f]{40}\}\n`) + geth.ExpectRegexp(`Address: \{[0-9a-f]{40}\}\n`) } func TestAccountNewBadRepeat(t *testing.T) { geth := runGeth(t, "account", "new", "--lightkdf") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Your new account is locked with a password. Please give a password. Do not forget this password. !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "something"}} @@ -95,8 +95,8 @@ func TestAccountUpdate(t *testing.T) { geth := runGeth(t, "account", "update", "--datadir", datadir, "--lightkdf", "f466859ead1932d743d622cb74fc058882e8648a") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} @@ -108,8 +108,8 @@ Repeat passphrase: {{.InputLine "foobar2"}} func TestWalletImport(t *testing.T) { geth := runGeth(t, "wallet", "import", "--lightkdf", "testdata/guswallet.json") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foo"}} Address: {d4584b5f6229b7be90727b0fc8c6b91bb427821f} @@ -123,8 +123,8 @@ Address: {d4584b5f6229b7be90727b0fc8c6b91bb427821f} func TestWalletImportBadPassword(t *testing.T) { geth := runGeth(t, "wallet", "import", "--lightkdf", "testdata/guswallet.json") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "wrong"}} Fatal: could not decrypt key with given passphrase @@ -137,19 +137,19 @@ func TestUnlockFlag(t *testing.T) { "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a", "js", "testdata/empty.js") - geth.expect(` + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} `) - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", "=0xf466859ead1932d743d622cb74fc058882e8648a", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -160,8 +160,8 @@ func TestUnlockFlagWrongPassword(t *testing.T) { geth := runGeth(t, "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "wrong1"}} @@ -180,14 +180,14 @@ func TestUnlockFlagMultiIndex(t *testing.T) { "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--unlock", "0,2", "js", "testdata/empty.js") - geth.expect(` + geth.Expect(` Unlocking account 0 | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} Unlocking account 2 | Attempt 1/3 Passphrase: {{.InputLine "foobar"}} `) - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", @@ -195,7 +195,7 @@ Passphrase: {{.InputLine "foobar"}} "=0x289d485d9771714cce91d3393d764e1311907acc", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -207,7 +207,7 @@ func TestUnlockFlagPasswordFile(t *testing.T) { "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--password", "testdata/passwords.txt", "--unlock", "0,2", "js", "testdata/empty.js") - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", @@ -215,7 +215,7 @@ func TestUnlockFlagPasswordFile(t *testing.T) { "=0x289d485d9771714cce91d3393d764e1311907acc", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -226,8 +226,8 @@ func TestUnlockFlagPasswordFileWrongPassword(t *testing.T) { geth := runGeth(t, "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--password", "testdata/wrong-passwords.txt", "--unlock", "0,2") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Fatal: Failed to unlock account 0 (could not decrypt key with given passphrase) `) } @@ -238,14 +238,14 @@ func TestUnlockFlagAmbiguous(t *testing.T) { "--keystore", store, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a", "js", "testdata/empty.js") - defer geth.expectExit() + defer geth.ExpectExit() // Helper for the expect template, returns absolute keystore path. - geth.setTemplateFunc("keypath", func(file string) string { + geth.SetTemplateFunc("keypath", func(file string) string { abs, _ := filepath.Abs(filepath.Join(store, file)) return abs }) - geth.expect(` + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} @@ -257,14 +257,14 @@ Your passphrase unlocked keystore://{{keypath "1"}} In order to avoid this warning, you need to remove the following duplicate key files: keystore://{{keypath "2"}} `) - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", "=0xf466859ead1932d743d622cb74fc058882e8648a", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -275,14 +275,14 @@ func TestUnlockFlagAmbiguousWrongPassword(t *testing.T) { geth := runGeth(t, "--keystore", store, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a") - defer geth.expectExit() + defer geth.ExpectExit() // Helper for the expect template, returns absolute keystore path. - geth.setTemplateFunc("keypath", func(file string) string { + geth.SetTemplateFunc("keypath", func(file string) string { abs, _ := filepath.Abs(filepath.Join(store, file)) return abs }) - geth.expect(` + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "wrong"}} @@ -292,5 +292,5 @@ Multiple key files exist for address f466859ead1932d743d622cb74fc058882e8648a: Testing your passphrase against all of them... Fatal: None of the listed files could be unlocked. `) - geth.expectExit() + geth.ExpectExit() } diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index e5472836c..258b9e6dd 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -47,15 +47,15 @@ func TestConsoleWelcome(t *testing.T) { "console") // Gather all the infos the welcome message needs to contain - geth.setTemplateFunc("goos", func() string { return runtime.GOOS }) - geth.setTemplateFunc("goarch", func() string { return runtime.GOARCH }) - geth.setTemplateFunc("gover", runtime.Version) - geth.setTemplateFunc("gethver", func() string { return params.Version }) - geth.setTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) - geth.setTemplateFunc("apis", func() string { return ipcAPIs }) + geth.SetTemplateFunc("goos", func() string { return runtime.GOOS }) + geth.SetTemplateFunc("goarch", func() string { return runtime.GOARCH }) + geth.SetTemplateFunc("gover", runtime.Version) + geth.SetTemplateFunc("gethver", func() string { return params.Version }) + geth.SetTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) + geth.SetTemplateFunc("apis", func() string { return ipcAPIs }) // Verify the actual welcome message to the required template - geth.expect(` + geth.Expect(` Welcome to the Geth JavaScript console! instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}} @@ -66,7 +66,7 @@ at block: 0 ({{niltime}}) > {{.InputLine "exit"}} `) - geth.expectExit() + geth.ExpectExit() } // Tests that a console can be attached to a running node via various means. @@ -90,8 +90,8 @@ func TestIPCAttachWelcome(t *testing.T) { time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "ipc:"+ipc, ipcAPIs) - geth.interrupt() - geth.expectExit() + geth.Interrupt() + geth.ExpectExit() } func TestHTTPAttachWelcome(t *testing.T) { @@ -104,8 +104,8 @@ func TestHTTPAttachWelcome(t *testing.T) { time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "http://localhost:"+port, httpAPIs) - geth.interrupt() - geth.expectExit() + geth.Interrupt() + geth.ExpectExit() } func TestWSAttachWelcome(t *testing.T) { @@ -119,29 +119,29 @@ func TestWSAttachWelcome(t *testing.T) { time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "ws://localhost:"+port, httpAPIs) - geth.interrupt() - geth.expectExit() + geth.Interrupt() + geth.ExpectExit() } func testAttachWelcome(t *testing.T, geth *testgeth, endpoint, apis string) { // Attach to a running geth note and terminate immediately attach := runGeth(t, "attach", endpoint) - defer attach.expectExit() - attach.stdin.Close() + defer attach.ExpectExit() + attach.CloseStdin() // Gather all the infos the welcome message needs to contain - attach.setTemplateFunc("goos", func() string { return runtime.GOOS }) - attach.setTemplateFunc("goarch", func() string { return runtime.GOARCH }) - attach.setTemplateFunc("gover", runtime.Version) - attach.setTemplateFunc("gethver", func() string { return params.Version }) - attach.setTemplateFunc("etherbase", func() string { return geth.Etherbase }) - attach.setTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) - attach.setTemplateFunc("ipc", func() bool { return strings.HasPrefix(endpoint, "ipc") }) - attach.setTemplateFunc("datadir", func() string { return geth.Datadir }) - attach.setTemplateFunc("apis", func() string { return apis }) + attach.SetTemplateFunc("goos", func() string { return runtime.GOOS }) + attach.SetTemplateFunc("goarch", func() string { return runtime.GOARCH }) + attach.SetTemplateFunc("gover", runtime.Version) + attach.SetTemplateFunc("gethver", func() string { return params.Version }) + attach.SetTemplateFunc("etherbase", func() string { return geth.Etherbase }) + attach.SetTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) + attach.SetTemplateFunc("ipc", func() bool { return strings.HasPrefix(endpoint, "ipc") }) + attach.SetTemplateFunc("datadir", func() string { return geth.Datadir }) + attach.SetTemplateFunc("apis", func() string { return apis }) // Verify the actual welcome message to the required template - attach.expect(` + attach.Expect(` Welcome to the Geth JavaScript console! instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}} @@ -152,7 +152,7 @@ at block: 0 ({{niltime}}){{if ipc}} > {{.InputLine "exit" }} `) - attach.expectExit() + attach.ExpectExit() } // trulyRandInt generates a crypto random integer used by the console tests to diff --git a/cmd/geth/dao_test.go b/cmd/geth/dao_test.go index ec7802ada..8cc66aabf 100644 --- a/cmd/geth/dao_test.go +++ b/cmd/geth/dao_test.go @@ -112,12 +112,12 @@ func testDAOForkBlockNewChain(t *testing.T, test int, genesis string, expectBloc if err := ioutil.WriteFile(json, []byte(genesis), 0600); err != nil { t.Fatalf("test %d: failed to write genesis file: %v", test, err) } - runGeth(t, "--datadir", datadir, "init", json).cmd.Wait() + runGeth(t, "--datadir", datadir, "init", json).WaitExit() } else { // Force chain initialization args := []string{"--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", "--ipcdisable", "--datadir", datadir} geth := runGeth(t, append(args, []string{"--exec", "2+2", "console"}...)...) - geth.cmd.Wait() + geth.WaitExit() } // Retrieve the DAO config flag from the database path := filepath.Join(datadir, "geth", "chaindata") diff --git a/cmd/geth/genesis_test.go b/cmd/geth/genesis_test.go index 6c3ca7298..a00ae00c1 100644 --- a/cmd/geth/genesis_test.go +++ b/cmd/geth/genesis_test.go @@ -97,14 +97,14 @@ func TestCustomGenesis(t *testing.T) { if err := ioutil.WriteFile(json, []byte(tt.genesis), 0600); err != nil { t.Fatalf("test %d: failed to write genesis file: %v", i, err) } - runGeth(t, "--datadir", datadir, "init", json).cmd.Wait() + runGeth(t, "--datadir", datadir, "init", json).WaitExit() // Query the custom genesis block geth := runGeth(t, "--datadir", datadir, "--maxpeers", "0", "--port", "0", "--nodiscover", "--nat", "none", "--ipcdisable", "--exec", tt.query, "console") - geth.expectRegexp(tt.result) - geth.expectExit() + geth.ExpectRegexp(tt.result) + geth.ExpectExit() } } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index cc481796f..b5cdd712d 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -252,10 +252,12 @@ func startNode(ctx *cli.Context, stack *node.Node) { }() // Start auxiliary services if enabled if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { + // Mining only makes sense if a full Ethereum node is running var ethereum *eth.Ethereum if err := stack.Service(ðereum); err != nil { utils.Fatalf("ethereum service not running: %v", err) } + // Use a reduced number of threads if requested if threads := ctx.GlobalInt(utils.MinerThreadsFlag.Name); threads > 0 { type threaded interface { SetThreads(threads int) @@ -264,6 +266,8 @@ func startNode(ctx *cli.Context, stack *node.Node) { th.SetThreads(threads) } } + // Set the gas price to the limits from the CLI and start mining + ethereum.TxPool().SetGasPrice(utils.GlobalBig(ctx, utils.GasPriceFlag.Name)) if err := ethereum.StartMining(true); err != nil { utils.Fatalf("Failed to start mining: %v", err) } diff --git a/cmd/geth/run_test.go b/cmd/geth/run_test.go index e26b4509a..da82facac 100644 --- a/cmd/geth/run_test.go +++ b/cmd/geth/run_test.go @@ -17,18 +17,13 @@ package main import ( - "bufio" - "bytes" "fmt" - "io" "io/ioutil" "os" - "os/exec" - "regexp" - "sync" "testing" - "text/template" - "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/internal/cmdtest" ) func tmpdir(t *testing.T) string { @@ -40,36 +35,37 @@ func tmpdir(t *testing.T) string { } type testgeth struct { - // For total convenience, all testing methods are available. - *testing.T - // template variables for expect - Datadir string - Executable string - Etherbase string - Func template.FuncMap + *cmdtest.TestCmd - removeDatadir bool - cmd *exec.Cmd - stdout *bufio.Reader - stdin io.WriteCloser - stderr *testlogger + // template variables for expect + Datadir string + Etherbase string } func init() { - // Run the app if we're the child process for runGeth. - if os.Getenv("GETH_TEST_CHILD") != "" { + // Run the app if we've been exec'd as "geth-test" in runGeth. + reexec.Register("geth-test", func() { if err := app.Run(os.Args); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } os.Exit(0) + }) +} + +func TestMain(m *testing.M) { + // check if we have been reexec'd + if reexec.Init() { + return } + os.Exit(m.Run()) } // spawns geth with the given command line args. If the args don't set --datadir, the // child g gets a temporary data directory. func runGeth(t *testing.T, args ...string) *testgeth { - tt := &testgeth{T: t, Executable: os.Args[0]} + tt := &testgeth{} + tt.TestCmd = cmdtest.NewTestCmd(t, tt) for i, arg := range args { switch { case arg == "-datadir" || arg == "--datadir": @@ -84,215 +80,19 @@ func runGeth(t *testing.T, args ...string) *testgeth { } if tt.Datadir == "" { tt.Datadir = tmpdir(t) - tt.removeDatadir = true + tt.Cleanup = func() { os.RemoveAll(tt.Datadir) } args = append([]string{"-datadir", tt.Datadir}, args...) // Remove the temporary datadir if something fails below. defer func() { if t.Failed() { - os.RemoveAll(tt.Datadir) + tt.Cleanup() } }() } - // Boot "geth". This actually runs the test binary but the init function - // will prevent any tests from running. - tt.stderr = &testlogger{t: t} - tt.cmd = exec.Command(os.Args[0], args...) - tt.cmd.Env = append(os.Environ(), "GETH_TEST_CHILD=1") - tt.cmd.Stderr = tt.stderr - stdout, err := tt.cmd.StdoutPipe() - if err != nil { - t.Fatal(err) - } - tt.stdout = bufio.NewReader(stdout) - if tt.stdin, err = tt.cmd.StdinPipe(); err != nil { - t.Fatal(err) - } - if err := tt.cmd.Start(); err != nil { - t.Fatal(err) - } - return tt -} - -// InputLine writes the given text to the childs stdin. -// This method can also be called from an expect template, e.g.: -// -// geth.expect(`Passphrase: {{.InputLine "password"}}`) -func (tt *testgeth) InputLine(s string) string { - io.WriteString(tt.stdin, s+"\n") - return "" -} - -func (tt *testgeth) setTemplateFunc(name string, fn interface{}) { - if tt.Func == nil { - tt.Func = make(map[string]interface{}) - } - tt.Func[name] = fn -} - -// expect runs its argument as a template, then expects the -// child process to output the result of the template within 5s. -// -// If the template starts with a newline, the newline is removed -// before matching. -func (tt *testgeth) expect(tplsource string) { - // Generate the expected output by running the template. - tpl := template.Must(template.New("").Funcs(tt.Func).Parse(tplsource)) - wantbuf := new(bytes.Buffer) - if err := tpl.Execute(wantbuf, tt); err != nil { - panic(err) - } - // Trim exactly one newline at the beginning. This makes tests look - // much nicer because all expect strings are at column 0. - want := bytes.TrimPrefix(wantbuf.Bytes(), []byte("\n")) - if err := tt.matchExactOutput(want); err != nil { - tt.Fatal(err) - } - tt.Logf("Matched stdout text:\n%s", want) -} - -func (tt *testgeth) matchExactOutput(want []byte) error { - buf := make([]byte, len(want)) - n := 0 - tt.withKillTimeout(func() { n, _ = io.ReadFull(tt.stdout, buf) }) - buf = buf[:n] - if n < len(want) || !bytes.Equal(buf, want) { - // Grab any additional buffered output in case of mismatch - // because it might help with debugging. - buf = append(buf, make([]byte, tt.stdout.Buffered())...) - tt.stdout.Read(buf[n:]) - // Find the mismatch position. - for i := 0; i < n; i++ { - if want[i] != buf[i] { - return fmt.Errorf("Output mismatch at â—Š:\n---------------- (stdout text)\n%sâ—Š%s\n---------------- (expected text)\n%s", - buf[:i], buf[i:n], want) - } - } - if n < len(want) { - return fmt.Errorf("Not enough output, got until â—Š:\n---------------- (stdout text)\n%s\n---------------- (expected text)\n%sâ—Š%s", - buf, want[:n], want[n:]) - } - } - return nil -} - -// expectRegexp expects the child process to output text matching the -// given regular expression within 5s. -// -// Note that an arbitrary amount of output may be consumed by the -// regular expression. This usually means that expect cannot be used -// after expectRegexp. -func (tt *testgeth) expectRegexp(resource string) (*regexp.Regexp, []string) { - var ( - re = regexp.MustCompile(resource) - rtee = &runeTee{in: tt.stdout} - matches []int - ) - tt.withKillTimeout(func() { matches = re.FindReaderSubmatchIndex(rtee) }) - output := rtee.buf.Bytes() - if matches == nil { - tt.Fatalf("Output did not match:\n---------------- (stdout text)\n%s\n---------------- (regular expression)\n%s", - output, resource) - return re, nil - } - tt.Logf("Matched stdout text:\n%s", output) - var submatch []string - for i := 0; i < len(matches); i += 2 { - submatch = append(submatch, string(output[i:i+1])) - } - return re, submatch -} - -// expectExit expects the child process to exit within 5s without -// printing any additional text on stdout. -func (tt *testgeth) expectExit() { - var output []byte - tt.withKillTimeout(func() { - output, _ = ioutil.ReadAll(tt.stdout) - }) - tt.cmd.Wait() - if tt.removeDatadir { - os.RemoveAll(tt.Datadir) - } - if len(output) > 0 { - tt.Errorf("Unmatched stdout text:\n%s", output) - } -} - -func (tt *testgeth) interrupt() { - tt.cmd.Process.Signal(os.Interrupt) -} - -// stderrText returns any stderr output written so far. -// The returned text holds all log lines after expectExit has -// returned. -func (tt *testgeth) stderrText() string { - tt.stderr.mu.Lock() - defer tt.stderr.mu.Unlock() - return tt.stderr.buf.String() -} - -func (tt *testgeth) withKillTimeout(fn func()) { - timeout := time.AfterFunc(5*time.Second, func() { - tt.Log("killing the child process (timeout)") - tt.cmd.Process.Kill() - if tt.removeDatadir { - os.RemoveAll(tt.Datadir) - } - }) - defer timeout.Stop() - fn() -} + // Boot "geth". This actually runs the test binary but the TestMain + // function will prevent any tests from running. + tt.Run("geth-test", args...) -// testlogger logs all written lines via t.Log and also -// collects them for later inspection. -type testlogger struct { - t *testing.T - mu sync.Mutex - buf bytes.Buffer -} - -func (tl *testlogger) Write(b []byte) (n int, err error) { - lines := bytes.Split(b, []byte("\n")) - for _, line := range lines { - if len(line) > 0 { - tl.t.Logf("(stderr) %s", line) - } - } - tl.mu.Lock() - tl.buf.Write(b) - tl.mu.Unlock() - return len(b), err -} - -// runeTee collects text read through it into buf. -type runeTee struct { - in interface { - io.Reader - io.ByteReader - io.RuneReader - } - buf bytes.Buffer -} - -func (rtee *runeTee) Read(b []byte) (n int, err error) { - n, err = rtee.in.Read(b) - rtee.buf.Write(b[:n]) - return n, err -} - -func (rtee *runeTee) ReadRune() (r rune, size int, err error) { - r, size, err = rtee.in.ReadRune() - if err == nil { - rtee.buf.WriteRune(r) - } - return r, size, err -} - -func (rtee *runeTee) ReadByte() (b byte, err error) { - b, err = rtee.in.ReadByte() - if err == nil { - rtee.buf.WriteByte(b) - } - return b, err + return tt } diff --git a/cmd/swarm/run_test.go b/cmd/swarm/run_test.go new file mode 100644 index 000000000..2d32a51c8 --- /dev/null +++ b/cmd/swarm/run_test.go @@ -0,0 +1,255 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "fmt" + "io/ioutil" + "net" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/ethereum/go-ethereum/internal/cmdtest" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm" +) + +func init() { + // Run the app if we've been exec'd as "swarm-test" in runSwarm. + reexec.Register("swarm-test", func() { + if err := app.Run(os.Args); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(0) + }) +} + +func TestMain(m *testing.M) { + // check if we have been reexec'd + if reexec.Init() { + return + } + os.Exit(m.Run()) +} + +func runSwarm(t *testing.T, args ...string) *cmdtest.TestCmd { + tt := cmdtest.NewTestCmd(t, nil) + + // Boot "swarm". This actually runs the test binary but the TestMain + // function will prevent any tests from running. + tt.Run("swarm-test", args...) + + return tt +} + +type testCluster struct { + Nodes []*testNode + TmpDir string +} + +// newTestCluster starts a test swarm cluster of the given size. +// +// A temporary directory is created and each node gets a data directory inside +// it. +// +// Each node listens on 127.0.0.1 with random ports for both the HTTP and p2p +// ports (assigned by first listening on 127.0.0.1:0 and then passing the ports +// as flags). +// +// When starting more than one node, they are connected together using the +// admin SetPeer RPC method. +func newTestCluster(t *testing.T, size int) *testCluster { + cluster := &testCluster{} + defer func() { + if t.Failed() { + cluster.Shutdown() + } + }() + + tmpdir, err := ioutil.TempDir("", "swarm-test") + if err != nil { + t.Fatal(err) + } + cluster.TmpDir = tmpdir + + // start the nodes + cluster.Nodes = make([]*testNode, 0, size) + for i := 0; i < size; i++ { + dir := filepath.Join(cluster.TmpDir, fmt.Sprintf("swarm%02d", i)) + if err := os.Mkdir(dir, 0700); err != nil { + t.Fatal(err) + } + + node := newTestNode(t, dir) + node.Name = fmt.Sprintf("swarm%02d", i) + + cluster.Nodes = append(cluster.Nodes, node) + } + + if size == 1 { + return cluster + } + + // connect the nodes together + for _, node := range cluster.Nodes { + if err := node.Client.Call(nil, "admin_addPeer", cluster.Nodes[0].Enode); err != nil { + t.Fatal(err) + } + } + + // wait until all nodes have the correct number of peers +outer: + for _, node := range cluster.Nodes { + var peers []*p2p.PeerInfo + for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(50 * time.Millisecond) { + if err := node.Client.Call(&peers, "admin_peers"); err != nil { + t.Fatal(err) + } + if len(peers) == len(cluster.Nodes)-1 { + continue outer + } + } + t.Fatalf("%s only has %d / %d peers", node.Name, len(peers), len(cluster.Nodes)-1) + } + + return cluster +} + +func (c *testCluster) Shutdown() { + for _, node := range c.Nodes { + node.Shutdown() + } + os.RemoveAll(c.TmpDir) +} + +type testNode struct { + Name string + Addr string + URL string + Enode string + Dir string + Client *rpc.Client + Cmd *cmdtest.TestCmd +} + +const testPassphrase = "swarm-test-passphrase" + +func newTestNode(t *testing.T, dir string) *testNode { + // create key + conf := &node.Config{ + DataDir: dir, + IPCPath: "bzzd.ipc", + } + n, err := node.New(conf) + if err != nil { + t.Fatal(err) + } + account, err := n.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore).NewAccount(testPassphrase) + if err != nil { + t.Fatal(err) + } + + node := &testNode{Dir: dir} + + // use a unique IPCPath when running tests on Windows + if runtime.GOOS == "windows" { + conf.IPCPath = fmt.Sprintf("bzzd-%s.ipc", account.Address.String()) + } + + // assign ports + httpPort, err := assignTCPPort() + if err != nil { + t.Fatal(err) + } + p2pPort, err := assignTCPPort() + if err != nil { + t.Fatal(err) + } + + // start the node + node.Cmd = runSwarm(t, + "--port", p2pPort, + "--nodiscover", + "--datadir", dir, + "--ipcpath", conf.IPCPath, + "--ethapi", "", + "--bzzaccount", account.Address.String(), + "--bzznetworkid", "321", + "--bzzport", httpPort, + "--verbosity", "6", + ) + node.Cmd.InputLine(testPassphrase) + defer func() { + if t.Failed() { + node.Shutdown() + } + }() + + // wait for the node to start + for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) { + node.Client, err = rpc.Dial(conf.IPCEndpoint()) + if err == nil { + break + } + } + if node.Client == nil { + t.Fatal(err) + } + + // load info + var info swarm.Info + if err := node.Client.Call(&info, "bzz_info"); err != nil { + t.Fatal(err) + } + node.Addr = net.JoinHostPort("127.0.0.1", info.Port) + node.URL = "http://" + node.Addr + + var nodeInfo p2p.NodeInfo + if err := node.Client.Call(&nodeInfo, "admin_nodeInfo"); err != nil { + t.Fatal(err) + } + node.Enode = fmt.Sprintf("enode://%s@127.0.0.1:%s", nodeInfo.ID, p2pPort) + + return node +} + +func (n *testNode) Shutdown() { + if n.Cmd != nil { + n.Cmd.Kill() + } +} + +func assignTCPPort() (string, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", err + } + l.Close() + _, port, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return "", err + } + return port, nil +} diff --git a/cmd/swarm/upload_test.go b/cmd/swarm/upload_test.go new file mode 100644 index 000000000..5b74dd4f1 --- /dev/null +++ b/cmd/swarm/upload_test.go @@ -0,0 +1,78 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "io" + "io/ioutil" + "net/http" + "os" + "testing" +) + +// TestCLISwarmUp tests that running 'swarm up' makes the resulting file +// available from all nodes via the HTTP API +func TestCLISwarmUp(t *testing.T) { + t.Skip("flaky test") + + // start 3 node cluster + t.Log("starting 3 node cluster") + cluster := newTestCluster(t, 3) + defer cluster.Shutdown() + + // create a tmp file + tmp, err := ioutil.TempFile("", "swarm-test") + assertNil(t, err) + defer tmp.Close() + defer os.Remove(tmp.Name()) + _, err = io.WriteString(tmp, "data") + assertNil(t, err) + + // upload the file with 'swarm up' and expect a hash + t.Log("uploading file with 'swarm up'") + up := runSwarm(t, "--bzzapi", cluster.Nodes[0].URL, "up", tmp.Name()) + _, matches := up.ExpectRegexp(`[a-f\d]{64}`) + up.ExpectExit() + hash := matches[0] + t.Logf("file uploaded with hash %s", hash) + + // get the file from the HTTP API of each node + for _, node := range cluster.Nodes { + t.Logf("getting file from %s", node.Name) + res, err := http.Get(node.URL + "/bzz:/" + hash) + assertNil(t, err) + assertHTTPResponse(t, res, http.StatusOK, "data") + } +} + +func assertNil(t *testing.T, err error) { + if err != nil { + t.Fatal(err) + } +} + +func assertHTTPResponse(t *testing.T, res *http.Response, expectedStatus int, expectedBody string) { + defer res.Body.Close() + if res.StatusCode != expectedStatus { + t.Fatalf("expected HTTP status %d, got %s", expectedStatus, res.Status) + } + data, err := ioutil.ReadAll(res.Body) + assertNil(t, err) + if string(data) != expectedBody { + t.Fatalf("expected HTTP body %q, got %q", expectedBody, data) + } +} diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 5a50b2eb4..d2fb6934b 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -99,10 +99,10 @@ var ( // their extra-data fields. errExtraSigners = errors.New("non-checkpoint block contains extra signer list") - // drrInvalidCheckpointSigners is returned if a checkpoint block contains an + // errInvalidCheckpointSigners is returned if a checkpoint block contains an // invalid list of signers (i.e. non divisible by 20 bytes, or not the correct // ones). - drrInvalidCheckpointSigners = errors.New("invalid signer list on checkpoint block") + errInvalidCheckpointSigners = errors.New("invalid signer list on checkpoint block") // errInvalidMixDigest is returned if a block's mix digest is non-zero. errInvalidMixDigest = errors.New("non-zero mix digest") @@ -297,7 +297,7 @@ func (c *Clique) verifyHeader(chain consensus.ChainReader, header *types.Header, return errExtraSigners } if checkpoint && signersBytes%common.AddressLength != 0 { - return drrInvalidCheckpointSigners + return errInvalidCheckpointSigners } // Ensure that the mix digest is zero as we don't have fork protection currently if header.MixDigest != (common.Hash{}) { @@ -353,7 +353,7 @@ func (c *Clique) verifyCascadingFields(chain consensus.ChainReader, header *type } extraSuffix := len(header.Extra) - extraSeal if !bytes.Equal(header.Extra[extraVanity:extraSuffix], signers) { - return drrInvalidCheckpointSigners + return errInvalidCheckpointSigners } } // All basic checks passed, verify the seal and return @@ -467,7 +467,6 @@ func (c *Clique) verifySeal(chain consensus.ChainReader, header *types.Header, p if err != nil { return err } - c.recents.Add(snap.Hash, snap) // Resolve the authorization key and check against signers signer, err := ecrecover(header, c.signatures) @@ -479,13 +478,13 @@ func (c *Clique) verifySeal(chain consensus.ChainReader, header *types.Header, p } for seen, recent := range snap.Recents { if recent == signer { - // Signer is among recents, only fail if the current block doens't shift it out + // Signer is among recents, only fail if the current block doesn't shift it out if limit := uint64(len(snap.Signers)/2 + 1); seen > number-limit { return errUnauthorized } } } - // Ensure that the difficulty corresponts to the turn-ness of the signer + // Ensure that the difficulty corresponds to the turn-ness of the signer inturn := snap.inturn(header.Number.Uint64(), signer) if inturn && header.Difficulty.Cmp(diffInTurn) != 0 { return errInvalidDifficulty @@ -504,13 +503,24 @@ func (c *Clique) Prepare(chain consensus.ChainReader, header *types.Header) erro header.Nonce = types.BlockNonce{} number := header.Number.Uint64() + + // Assemble the voting snapshot to check which votes make sense + snap, err := c.snapshot(chain, number-1, header.ParentHash, nil) + if err != nil { + return err + } if number%c.config.Epoch != 0 { c.lock.RLock() - if len(c.proposals) > 0 { - addresses := make([]common.Address, 0, len(c.proposals)) - for address := range c.proposals { + + // Gather all the proposals that make sense voting on + addresses := make([]common.Address, 0, len(c.proposals)) + for address, authorize := range c.proposals { + if snap.validVote(address, authorize) { addresses = append(addresses, address) } + } + // If there's pending proposals, cast a vote on them + if len(addresses) > 0 { header.Coinbase = addresses[rand.Intn(len(addresses))] if c.proposals[header.Coinbase] { copy(header.Nonce[:], nonceAuthVote) @@ -520,11 +530,7 @@ func (c *Clique) Prepare(chain consensus.ChainReader, header *types.Header) erro } c.lock.RUnlock() } - // Assemble the voting snapshot and set the correct difficulty - snap, err := c.snapshot(chain, number-1, header.ParentHash, nil) - if err != nil { - return err - } + // Set the correct difficulty header.Difficulty = diffNoTurn if snap.inturn(header.Number.Uint64(), c.signer) { header.Difficulty = diffInTurn @@ -604,7 +610,7 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch // If we're amongst the recent signers, wait for the next block for seen, recent := range snap.Recents { if recent == signer { - // Signer is among recents, only wait if the current block doens't shift it out + // Signer is among recents, only wait if the current block doesn't shift it out if limit := uint64(len(snap.Signers)/2 + 1); number < limit || seen > number-limit { log.Info("Signed recently, must wait for others") <-stop diff --git a/consensus/clique/snapshot.go b/consensus/clique/snapshot.go index 8eaf3b62e..32a1191db 100644 --- a/consensus/clique/snapshot.go +++ b/consensus/clique/snapshot.go @@ -126,11 +126,17 @@ func (s *Snapshot) copy() *Snapshot { return cpy } +// validVote returns whether it makes sense to cast the specified vote in the +// given snapshot context (e.g. don't try to add an already authorized signer). +func (s *Snapshot) validVote(address common.Address, authorize bool) bool { + _, signer := s.Signers[address] + return (signer && !authorize) || (!signer && authorize) +} + // cast adds a new vote into the tally. func (s *Snapshot) cast(address common.Address, authorize bool) bool { // Ensure the vote is meaningful - _, signer := s.Signers[address] - if (signer && authorize) || (!signer && !authorize) { + if !s.validVote(address, authorize) { return false } // Cast the vote into an existing or new tally diff --git a/core/state/sync.go b/core/state/sync.go index 8456a810b..2c29d706a 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -59,10 +59,16 @@ func (s *StateSync) Missing(max int) []common.Hash { } // Process injects a batch of retrieved trie nodes data, returning if something -// was committed to the database and also the index of an entry if processing of +// was committed to the memcache and also the index of an entry if processing of // it failed. -func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) { - return (*trie.TrieSync)(s).Process(list, dbw) +func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) { + return (*trie.TrieSync)(s).Process(list) +} + +// Commit flushes the data stored in the internal memcache out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *StateSync) Commit(dbw trie.DatabaseWriter) (int, error) { + return (*trie.TrieSync)(s).Commit(dbw) } // Pending returns the number of state entries currently pending for download. diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 43d146e3a..108ebb320 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -138,9 +138,12 @@ func testIterativeStateSync(t *testing.T, batch int) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two states are in sync @@ -168,9 +171,12 @@ func TestIterativeDelayedStateSync(t *testing.T) { } results[i] = trie.SyncResult{Hash: hash, Data: data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(0)...) } // Cross check that the two states are in sync @@ -206,9 +212,12 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { results = append(results, trie.SyncResult{Hash: hash, Data: data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -249,9 +258,12 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, hash := range sched.Missing(0) { queue[hash] = struct{}{} } @@ -283,9 +295,12 @@ func TestIncompleteStateSync(t *testing.T) { results[i] = trie.SyncResult{Hash: hash, Data: data} } // Process each of the state nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) } diff --git a/core/tx_pool.go b/core/tx_pool.go index 04ffa8a98..2f3cd1e93 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -35,16 +35,41 @@ import ( ) var ( - // Transaction Pool Errors - ErrInvalidSender = errors.New("invalid sender") - ErrNonce = errors.New("nonce too low") - ErrUnderpriced = errors.New("transaction underpriced") + // ErrInvalidSender is returned if the transaction contains an invalid signature. + ErrInvalidSender = errors.New("invalid sender") + + // ErrNonceTooLow is returned if the nonce of a transaction is lower than the + // one present in the local chain. + ErrNonceTooLow = errors.New("nonce too low") + + // ErrUnderpriced is returned if a transaction's gas price is below the minimum + // configured for the transaction pool. + ErrUnderpriced = errors.New("transaction underpriced") + + // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced + // with a different one without the required price bump. ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") - ErrBalance = errors.New("insufficient balance") - ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") - ErrIntrinsicGas = errors.New("intrinsic gas too low") - ErrGasLimit = errors.New("exceeds block gas limit") - ErrNegativeValue = errors.New("negative value") + + // ErrInsufficientFunds is returned if the total cost of executing a transaction + // is higher than the balance of the user's account. + ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") + + // ErrIntrinsicGas is returned if the transaction is specified to use less gas + // than required to start the invocation. + ErrIntrinsicGas = errors.New("intrinsic gas too low") + + // ErrGasLimit is returned if a transaction's requested gas limit exceeds the + // maximum allowance of the current block. + ErrGasLimit = errors.New("exceeds block gas limit") + + // ErrNegativeValue is a sanity error to ensure noone is able to specify a + // transaction with a negative value. + ErrNegativeValue = errors.New("negative value") + + // ErrOversizedData is returned if the input data of a transaction is greater + // than some meaningful limit a user might use. This is not a consensus error + // making the transaction invalid, rather a DOS protection. + ErrOversizedData = errors.New("oversized data") ) var ( @@ -54,16 +79,16 @@ var ( var ( // Metrics for the pending pool - pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard") - pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace") - pendingRLCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting - pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds + pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard") + pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace") + pendingRateLimitCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting + pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds // Metrics for the queued pool - queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard") - queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace") - queuedRLCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting - queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds + queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard") + queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace") + queuedRateLimitCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting + queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds // General tx metrics invalidTxCounter = metrics.NewCounter("txpool/invalid") @@ -374,7 +399,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { } // Last but not least check for nonce errors if currentState.GetNonce(from) > tx.Nonce() { - return ErrNonce + return ErrNonceTooLow } // Check the transaction doesn't exceed the current @@ -395,12 +420,15 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } - intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) if tx.Gas().Cmp(intrGas) < 0 { return ErrIntrinsicGas } + // Heuristic limit, reject transactions over 32KB to prevent DOS attacks + if tx.Size() > 32*1024 { + return ErrOversizedData + } return nil } @@ -638,8 +666,9 @@ func (pool *TxPool) removeTx(hash common.Hash) { } // Update the account nonce if needed if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { - pool.pendingState.SetNonce(addr, tx.Nonce()) + pool.pendingState.SetNonce(addr, nonce) } + return } } // Transaction is in the future queue @@ -696,10 +725,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // Drop all transactions over the allowed limit for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) delete(pool.all, hash) pool.priced.Removed() - queuedRLCounter.Inc(1) + queuedRateLimitCounter.Inc(1) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } queued += uint64(list.Len()) @@ -745,7 +774,18 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { for i := 0; i < len(offenders)-1; i++ { list := pool.pending[offenders[i]] - list.Cap(list.Len() - 1) + for _, tx := range list.Cap(list.Len() - 1) { + // Drop the transaction from the global pools too + hash := tx.Hash() + delete(pool.all, hash) + pool.priced.Removed() + + // Update the account nonce to the dropped transaction + if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { + pool.pendingState.SetNonce(offenders[i], nonce) + } + log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) + } pending-- } } @@ -756,12 +796,23 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for _, addr := range offenders { list := pool.pending[addr] - list.Cap(list.Len() - 1) + for _, tx := range list.Cap(list.Len() - 1) { + // Drop the transaction from the global pools too + hash := tx.Hash() + delete(pool.all, hash) + pool.priced.Removed() + + // Update the account nonce to the dropped transaction + if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { + pool.pendingState.SetNonce(addr, nonce) + } + log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) + } pending-- } } } - pendingRLCounter.Inc(int64(pendingBeforeCap - pending)) + pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones if queued > pool.config.GlobalQueue { @@ -785,7 +836,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pool.removeTx(tx.Hash()) } drop -= size - queuedRLCounter.Inc(int64(size)) + queuedRateLimitCounter.Inc(int64(size)) continue } // Otherwise drop only last few transactions @@ -793,7 +844,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A for i := len(txs) - 1; i >= 0 && drop > 0; i-- { pool.removeTx(txs[i].Hash()) drop-- - queuedRLCounter.Inc(1) + queuedRateLimitCounter.Inc(1) } } } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 94b07170d..4e28522e9 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -18,6 +18,7 @@ package core import ( "crypto/ecdsa" + "fmt" "math/big" "math/rand" "testing" @@ -52,6 +53,35 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { return newPool, key } +// validateTxPoolInternals checks various consistency invariants within the pool. +func validateTxPoolInternals(pool *TxPool) error { + pool.mu.RLock() + defer pool.mu.RUnlock() + + // Ensure the total transaction set is consistent with pending + queued + pending, queued := pool.stats() + if total := len(pool.all); total != pending+queued { + return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) + } + if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued { + return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued) + } + // Ensure the next nonce to assign is the correct one + for addr, txs := range pool.pending { + // Find the last transaction + var last uint64 + for nonce, _ := range txs.txs.items { + if last < nonce { + last = nonce + } + } + if nonce := pool.pendingState.GetNonce(addr); nonce != last+1 { + return fmt.Errorf("pending nonce mismatch: have %v, want %v", nonce, last+1) + } + } + return nil +} + func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } @@ -150,8 +180,8 @@ func TestInvalidTransactions(t *testing.T) { currentState.SetNonce(from, 1) currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) tx = transaction(0, big.NewInt(100000), key) - if err := pool.Add(tx); err != ErrNonce { - t.Error("expected", ErrNonce) + if err := pool.Add(tx); err != ErrNonceTooLow { + t.Error("expected", ErrNonceTooLow) } tx = transaction(1, big.NewInt(100000), key) @@ -218,20 +248,25 @@ func TestTransactionQueue(t *testing.T) { func TestRemoveTx(t *testing.T) { pool, key := setupTxPool() - tx := transaction(0, big.NewInt(100), key) - from, _ := deriveSender(tx) + addr := crypto.PubkeyToAddress(key.PublicKey) currentState, _ := pool.currentState() - currentState.AddBalance(from, big.NewInt(1)) + currentState.AddBalance(addr, big.NewInt(1)) + + tx1 := transaction(0, big.NewInt(100), key) + tx2 := transaction(2, big.NewInt(100), key) + + pool.promoteTx(addr, tx1.Hash(), tx1) + pool.enqueueTx(tx2.Hash(), tx2) - pool.enqueueTx(tx.Hash(), tx) - pool.promoteTx(from, tx.Hash(), tx) if len(pool.queue) != 1 { t.Error("expected queue to be 1, got", len(pool.queue)) } if len(pool.pending) != 1 { t.Error("expected pending to be 1, got", len(pool.pending)) } - pool.Remove(tx.Hash()) + pool.Remove(tx1.Hash()) + pool.Remove(tx2.Hash()) + if len(pool.queue) > 0 { t.Error("expected queue to be 0, got", len(pool.queue)) } @@ -404,10 +439,10 @@ func TestTransactionDropping(t *testing.T) { ) pool.promoteTx(account, tx0.Hash(), tx0) pool.promoteTx(account, tx1.Hash(), tx1) - pool.promoteTx(account, tx1.Hash(), tx2) + pool.promoteTx(account, tx2.Hash(), tx2) pool.enqueueTx(tx10.Hash(), tx10) pool.enqueueTx(tx11.Hash(), tx11) - pool.enqueueTx(tx11.Hash(), tx12) + pool.enqueueTx(tx12.Hash(), tx12) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { @@ -416,8 +451,8 @@ func TestTransactionDropping(t *testing.T) { if pool.queue[account].Len() != 3 { t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) } - if len(pool.all) != 4 { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) + if len(pool.all) != 6 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } pool.resetState() if pool.pending[account].Len() != 3 { @@ -426,8 +461,8 @@ func TestTransactionDropping(t *testing.T) { if pool.queue[account].Len() != 3 { t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) } - if len(pool.all) != 4 { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) + if len(pool.all) != 6 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } // Reduce the balance of the account, and check that invalidated transactions are dropped state.AddBalance(account, big.NewInt(-650)) @@ -730,6 +765,12 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { if len(pool1.all) != len(pool2.all) { t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all)) } + if err := validateTxPoolInternals(pool1); err != nil { + t.Errorf("pool 1 internal state corrupted: %v", err) + } + if err := validateTxPoolInternals(pool2); err != nil { + t.Errorf("pool 2 internal state corrupted: %v", err) + } } // Tests that if the transaction count belonging to multiple accounts go above @@ -776,6 +817,45 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { if pending > int(DefaultTxPoolConfig.GlobalSlots) { t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, DefaultTxPoolConfig.GlobalSlots) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + +// Tests that if transactions start being capped, transasctions are also removed from 'all' +func TestTransactionCapClearsFromAll(t *testing.T) { + // Reduce the queue limits to shorten test time + defer func(old uint64) { DefaultTxPoolConfig.AccountSlots = old }(DefaultTxPoolConfig.AccountSlots) + defer func(old uint64) { DefaultTxPoolConfig.AccountQueue = old }(DefaultTxPoolConfig.AccountQueue) + defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots) + + DefaultTxPoolConfig.AccountSlots = 2 + DefaultTxPoolConfig.AccountQueue = 2 + DefaultTxPoolConfig.GlobalSlots = 8 + + // Create the pool to test the limit enforcement with + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, db) + + pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool.resetState() + + // Create a number of test accounts and fund them + state, _ := pool.currentState() + + key, _ := crypto.GenerateKey() + addr := crypto.PubkeyToAddress(key.PublicKey) + state.AddBalance(addr, big.NewInt(1000000)) + + txs := types.Transactions{} + for j := 0; j < int(DefaultTxPoolConfig.GlobalSlots)*2; j++ { + txs = append(txs, transaction(uint64(j), big.NewInt(100000), key)) + } + // Import the batch and verify that limits have been enforced + pool.AddBatch(txs) + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that if the transaction count belonging to multiple accounts go above @@ -820,6 +900,9 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), DefaultTxPoolConfig.AccountSlots) } } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that setting the transaction pool gas price to a higher value correctly @@ -867,6 +950,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Reprice the pool and check that underpriced transactions get dropped pool.SetGasPrice(big.NewInt(2)) @@ -877,6 +963,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Check that we can't add the old transactions back if err := pool.Add(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0])); err != ErrUnderpriced { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) @@ -884,6 +973,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // However we can add local underpriced transactions tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[2]) @@ -894,6 +986,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if pending, _ = pool.stats(); pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that when the pool reaches its global transaction limit, underpriced @@ -945,6 +1040,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 1 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Ensure that adding an underpriced transaction on block limit fails if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) @@ -966,6 +1064,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Ensure that adding local transactions can push out even higher priced ones tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(0), keys[2]) @@ -980,6 +1081,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that the pool rejects replacement transactions that don't meet the minimum @@ -1041,6 +1145,9 @@ func TestTransactionReplacement(t *testing.T) { if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { t.Fatalf("failed to replace original queued transaction: %v", err) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Benchmarks the speed of validating the contents of the pending queue of the diff --git a/core/vm/gen_structlog.go b/core/vm/gen_structlog.go index 1c86b2256..88df942dc 100644 --- a/core/vm/gen_structlog.go +++ b/core/vm/gen_structlog.go @@ -18,12 +18,12 @@ func (s StructLog) MarshalJSON() ([]byte, error) { Gas math.HexOrDecimal64 `json:"gas"` GasCost math.HexOrDecimal64 `json:"gasCost"` Memory hexutil.Bytes `json:"memory"` + MemorySize int `json:"memSize"` Stack []*math.HexOrDecimal256 `json:"stack"` Storage map[common.Hash]common.Hash `json:"-"` Depth int `json:"depth"` Err error `json:"error"` OpName string `json:"opName"` - MemorySize int `json:"memSize"` } var enc StructLog enc.Pc = s.Pc @@ -31,6 +31,7 @@ func (s StructLog) MarshalJSON() ([]byte, error) { enc.Gas = math.HexOrDecimal64(s.Gas) enc.GasCost = math.HexOrDecimal64(s.GasCost) enc.Memory = s.Memory + enc.MemorySize = s.MemorySize if s.Stack != nil { enc.Stack = make([]*math.HexOrDecimal256, len(s.Stack)) for k, v := range s.Stack { @@ -41,21 +42,21 @@ func (s StructLog) MarshalJSON() ([]byte, error) { enc.Depth = s.Depth enc.Err = s.Err enc.OpName = s.OpName() - enc.MemorySize = s.MemorySize() return json.Marshal(&enc) } func (s *StructLog) UnmarshalJSON(input []byte) error { type StructLog struct { - Pc *uint64 `json:"pc"` - Op *OpCode `json:"op"` - Gas *math.HexOrDecimal64 `json:"gas"` - GasCost *math.HexOrDecimal64 `json:"gasCost"` - Memory hexutil.Bytes `json:"memory"` - Stack []*math.HexOrDecimal256 `json:"stack"` - Storage map[common.Hash]common.Hash `json:"-"` - Depth *int `json:"depth"` - Err *error `json:"error"` + Pc *uint64 `json:"pc"` + Op *OpCode `json:"op"` + Gas *math.HexOrDecimal64 `json:"gas"` + GasCost *math.HexOrDecimal64 `json:"gasCost"` + Memory hexutil.Bytes `json:"memory"` + MemorySize *int `json:"memSize"` + Stack []*math.HexOrDecimal256 `json:"stack"` + Storage map[common.Hash]common.Hash `json:"-"` + Depth *int `json:"depth"` + Err *error `json:"error"` } var dec StructLog if err := json.Unmarshal(input, &dec); err != nil { @@ -76,6 +77,9 @@ func (s *StructLog) UnmarshalJSON(input []byte) error { if dec.Memory != nil { s.Memory = dec.Memory } + if dec.MemorySize != nil { + s.MemorySize = *dec.MemorySize + } if dec.Stack != nil { s.Stack = make([]*big.Int, len(dec.Stack)) for k, v := range dec.Stack { diff --git a/core/vm/logger.go b/core/vm/logger.go index 405ab169c..17a9c9ec3 100644 --- a/core/vm/logger.go +++ b/core/vm/logger.go @@ -54,35 +54,31 @@ type LogConfig struct { // StructLog is emitted to the EVM each cycle and lists information about the current internal state // prior to the execution of the statement. type StructLog struct { - Pc uint64 `json:"pc"` - Op OpCode `json:"op"` - Gas uint64 `json:"gas"` - GasCost uint64 `json:"gasCost"` - Memory []byte `json:"memory"` - Stack []*big.Int `json:"stack"` - Storage map[common.Hash]common.Hash `json:"-"` - Depth int `json:"depth"` - Err error `json:"error"` + Pc uint64 `json:"pc"` + Op OpCode `json:"op"` + Gas uint64 `json:"gas"` + GasCost uint64 `json:"gasCost"` + Memory []byte `json:"memory"` + MemorySize int `json:"memSize"` + Stack []*big.Int `json:"stack"` + Storage map[common.Hash]common.Hash `json:"-"` + Depth int `json:"depth"` + Err error `json:"error"` } // overrides for gencodec type structLogMarshaling struct { - Stack []*math.HexOrDecimal256 - Gas math.HexOrDecimal64 - GasCost math.HexOrDecimal64 - Memory hexutil.Bytes - OpName string `json:"opName"` - MemorySize int `json:"memSize"` + Stack []*math.HexOrDecimal256 + Gas math.HexOrDecimal64 + GasCost math.HexOrDecimal64 + Memory hexutil.Bytes + OpName string `json:"opName"` } func (s *StructLog) OpName() string { return s.Op.String() } -func (s *StructLog) MemorySize() int { - return len(s.Memory) -} - // Tracer is used to collect execution traces from an EVM transaction // execution. CaptureState is called for each step of the VM with the // current VM state. @@ -181,7 +177,7 @@ func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost ui } } // create a new snaptshot of the EVM. - log := StructLog{pc, op, gas, cost, mem, stck, storage, env.depth, err} + log := StructLog{pc, op, gas, cost, mem, memory.Len(), stck, storage, depth, err} l.logs = append(l.logs, log) return nil diff --git a/eth/backend.go b/eth/backend.go index be2d03283..75e0e737b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -200,10 +200,13 @@ func makeExtraData(extra []byte) []byte { // CreateDB creates the chain database. func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) { db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles) + if err != nil { + return nil, err + } if db, ok := db.(*ethdb.LDBDatabase); ok { db.Meter("eth/db/chaindata/") } - return db, err + return db, nil } // CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 839969f03..e4d1392d0 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" ) @@ -99,8 +98,9 @@ type Downloader struct { mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + stateDB ethdb.Database fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section @@ -109,9 +109,9 @@ type Downloader struct { rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) // Statistics - syncStatsChainOrigin uint64 // Origin block number where syncing started at - syncStatsChainHeight uint64 // Highest block number known when syncing started - syncStatsStateDone uint64 // Number of state trie entries already pulled + syncStatsChainOrigin uint64 // Origin block number where syncing started at + syncStatsChainHeight uint64 // Highest block number known when syncing started + syncStatsState stateSyncStats syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks @@ -136,16 +136,18 @@ type Downloader struct { notified int32 // Channels - newPeerCh chan *peer headerCh chan dataPack // [eth/62] Channel receiving inbound block headers bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - stateCh chan dataPack // [eth/63] Channel receiving inbound node state data bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + // for stateFetcher + stateSyncStart chan *stateSync + trackStateReq chan *stateReq + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + // Cancellation and termination cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelCh chan struct{} // Channel to cancel mid-flight syncs @@ -170,8 +172,9 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he dl := &Downloader{ mode: mode, mux: mux, - queue: newQueue(stateDb), + queue: newQueue(), peers: newPeerSet(), + stateDB: stateDb, rttEstimate: uint64(rttMaxEstimate), rttConfidence: uint64(1000000), hasHeader: hasHeader, @@ -188,18 +191,20 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he insertReceipts: insertReceipts, rollback: rollback, dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), headerCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), - stateCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - stateWakeCh: make(chan bool, 1), headerProcCh: make(chan []*types.Header, 1), quitCh: make(chan struct{}), + // for stateFetcher + stateSyncStart: make(chan *stateSync), + trackStateReq: make(chan *stateReq), + stateCh: make(chan dataPack), } go dl.qosTuner() + go dl.stateFetcher() return dl } @@ -211,9 +216,6 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he // of processed and the total number of known states are also returned. Otherwise // these are zero. func (d *Downloader) Progress() ethereum.SyncProgress { - // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks - pendingStates := uint64(d.queue.PendingNodeData()) - // Lock the current stats and return the progress d.syncStatsLock.RLock() defer d.syncStatsLock.RUnlock() @@ -231,8 +233,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress { StartingBlock: d.syncStatsChainOrigin, CurrentBlock: current, HighestBlock: d.syncStatsChainHeight, - PulledStates: d.syncStatsStateDone, - KnownStates: d.syncStatsStateDone + pendingStates, + PulledStates: d.syncStatsState.processed, + KnownStates: d.syncStatsState.processed + d.syncStatsState.pending, } } @@ -324,13 +326,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.queue.Reset() d.peers.Reset() - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case <-ch: default: } } - for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { for empty := false; !empty; { select { case <-ch: @@ -439,30 +441,40 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, height) } - return d.spawnSync(origin+1, - func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved - func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync - ) + + fetchers := []func() error{ + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, td) }, + } + if d.mode == FastSync { + fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) + } else if d.mode == FullSync { + fetchers = append(fetchers, d.processFullSyncContent) + } + err = d.spawnSync(fetchers) + if err != nil && d.mode == FastSync && d.fsPivotLock != nil { + // If sync failed in the critical section, bump the fail counter. + atomic.AddUint32(&d.fsPivotFails, 1) + } + return err } // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { +func (d *Downloader) spawnSync(fetchers []func() error) error { var wg sync.WaitGroup - errc := make(chan error, len(fetchers)+1) - wg.Add(len(fetchers) + 1) - go func() { defer wg.Done(); errc <- d.processContent() }() + errc := make(chan error, len(fetchers)) + wg.Add(len(fetchers)) for _, fn := range fetchers { fn := fn go func() { defer wg.Done(); errc <- fn() }() } // Wait for the first error, then terminate the others. var err error - for i := 0; i < len(fetchers)+1; i++ { - if i == len(fetchers) { + for i := 0; i < len(fetchers); i++ { + if i == len(fetchers)-1 { // Close the queue when all fetchers have exited. // This will cause the block processor to end when // it has processed the queue. @@ -475,11 +487,6 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { d.queue.Close() d.Cancel() wg.Wait() - - // If sync failed in the critical section, bump the fail counter - if err != nil && d.mode == FastSync && d.fsPivotLock != nil { - atomic.AddUint32(&d.fsPivotFails, 1) - } return err } @@ -552,7 +559,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { return nil, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -649,7 +655,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -714,7 +719,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return 0, errTimeout case <-d.bodyCh: - case <-d.stateCh: case <-d.receiptCh: // Out of bounds delivery, ignore } @@ -827,7 +831,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -927,68 +931,6 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } -// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any -// available peers, reserving a chunk of nodes for each, waiting for delivery and -// also periodically checking for timeouts. -func (d *Downloader) fetchNodeData() error { - log.Debug("Downloading node state data") - - var ( - deliver = func(packet dataPack) (int, error) { - start := time.Now() - return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) { - // If the peer returned old-requested data, forgive - if err == trie.ErrNotRequested { - log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId()) - return - } - if err != nil { - // If the node data processing failed, the root hash is very wrong, abort - log.Error("State processing failed", "peer", packet.PeerId(), "err", err) - d.Cancel() - return - } - // Processing succeeded, notify state fetcher of continuation - pending := d.queue.PendingNodeData() - if pending > 0 { - select { - case d.stateWakeCh <- true: - default: - } - } - d.syncStatsLock.Lock() - d.syncStatsStateDone += uint64(delivered) - syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below - d.syncStatsLock.Unlock() - - // If real database progress was made, reset any fast-sync pivot failure - if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 { - log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails)) - atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block - } - // Log a message to the user and return - if delivered > 0 { - log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending) - } - }) - } - expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) } - throttle = func() bool { return false } - reserve = func(p *peer, count int) (*fetchRequest, bool, error) { - return d.queue.ReserveNodeData(p, count), false, nil - } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } - capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } - ) - err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, - d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, - d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states") - - log.Debug("Node state data download terminated", "err", err) - return err -} - // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. @@ -1229,7 +1171,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Terminate header processing if we synced up if len(headers) == 0 { // Notify everyone that headers are fully processed - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -1341,7 +1283,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { origin += uint64(limit) } // Signal the content downloaders of the availablility of new tasks - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: @@ -1351,71 +1293,151 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { } } -// processContent takes fetch results from the queue and tries to import them -// into the chain. The type of import operation will depend on the result contents. -func (d *Downloader) processContent() error { - pivot := d.queue.FastSyncPivot() +// processFullSyncContent takes fetch results from the queue and imports them into the chain. +func (d *Downloader) processFullSyncContent() error { for { results := d.queue.WaitResults() if len(results) == 0 { - return nil // queue empty + return nil } if d.chainInsertHook != nil { d.chainInsertHook(results) } - // Actually import the blocks - first, last := results[0].Header, results[len(results)-1].Header + if err := d.importBlockResults(results); err != nil { + return err + } + } +} + +func (d *Downloader) importBlockResults(results []*fetchResult) error { + for len(results) != 0 { + // Check for any termination requests. This makes clean shutdown faster. + select { + case <-d.quitCh: + return errCancelContentProcessing + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header log.Debug("Inserting downloaded chain", "items", len(results), "firstnum", first.Number, "firsthash", first.Hash(), "lastnum", last.Number, "lasthash", last.Hash(), ) - for len(results) != 0 { - // Check for any termination requests - select { - case <-d.quitCh: - return errCancelContentProcessing - default: - } - // Retrieve the a batch of results to import - var ( - blocks = make([]*types.Block, 0, maxResultsProcess) - receipts = make([]types.Receipts, 0, maxResultsProcess) - ) - items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) - for _, result := range results[:items] { - switch { - case d.mode == FullSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - case d.mode == FastSync: - blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) - if result.Header.Number.Uint64() <= pivot { - receipts = append(receipts, result.Receipts) - } - } - } - // Try to process the results, aborting if there's an error - var ( - err error - index int - ) - switch { - case len(receipts) > 0: - index, err = d.insertReceipts(blocks, receipts) - if err == nil && blocks[len(blocks)-1].NumberU64() == pivot { - log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash()) - index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash()) - } - default: - index, err = d.insertBlocks(blocks) + blocks := make([]*types.Block, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + } + if index, err := d.insertBlocks(blocks); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain + } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +// processFastSyncContent takes fetch results from the queue and writes them to the +// database. It also controls the synchronisation of state nodes of the pivot block. +func (d *Downloader) processFastSyncContent(latest *types.Header) error { + // Start syncing state of the reported head block. + // This should get us most of the state of the pivot block. + stateSync := d.syncState(latest.Root) + defer stateSync.Cancel() + go func() { + if err := stateSync.Wait(); err != nil { + d.queue.Close() // wake up WaitResults + } + }() + + pivot := d.queue.FastSyncPivot() + for { + results := d.queue.WaitResults() + if len(results) == 0 { + return stateSync.Cancel() + } + if d.chainInsertHook != nil { + d.chainInsertHook(results) + } + P, beforeP, afterP := splitAroundPivot(pivot, results) + if err := d.commitFastSyncData(beforeP, stateSync); err != nil { + return err + } + if P != nil { + stateSync.Cancel() + if err := d.commitPivotBlock(P); err != nil { + return err } - if err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + } + if err := d.importBlockResults(afterP); err != nil { + return err + } + } +} + +func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) { + for _, result := range results { + num := result.Header.Number.Uint64() + switch { + case num < pivot: + before = append(before, result) + case num == pivot: + p = result + default: + after = append(after, result) + } + } + return p, before, after +} + +func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error { + for len(results) != 0 { + // Check for any termination requests. + select { + case <-d.quitCh: + return errCancelContentProcessing + case <-stateSync.done: + if err := stateSync.Wait(); err != nil { + return err } - // Shift the results to the next batch - results = results[items:] + default: + } + // Retrieve the a batch of results to import + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + first, last := results[0].Header, results[items-1].Header + log.Debug("Inserting fast-sync blocks", "items", len(results), + "firstnum", first.Number, "firsthash", first.Hash(), + "lastnumn", last.Number, "lasthash", last.Hash(), + ) + blocks := make([]*types.Block, items) + receipts := make([]types.Receipts, items) + for i, result := range results[:items] { + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + receipts[i] = result.Receipts + } + if index, err := d.insertReceipts(blocks, receipts); err != nil { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + return errInvalidChain } + // Shift the results to the next batch + results = results[items:] + } + return nil +} + +func (d *Downloader) commitPivotBlock(result *fetchResult) error { + b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) + // Sync the pivot block state. This should complete reasonably quickly because + // we've already synced up to the reported head block state earlier. + if err := d.syncState(b.Root()).Wait(); err != nil { + return err + } + log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) + if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + return err } + return d.commitHeadBlock(b.Hash()) } // DeliverHeaders injects a new batch of block headers received from a remote diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 0d76c7dfd..58764ccf0 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -38,8 +38,6 @@ var ( receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") - stateInMeter = metrics.NewMeter("eth/downloader/states/in") - stateReqTimer = metrics.NewTimer("eth/downloader/states/req") - stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") - stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout") + stateInMeter = metrics.NewMeter("eth/downloader/states/in") + stateDropMeter = metrics.NewMeter("eth/downloader/states/drop") ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 15a912f1f..dc8b09772 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -30,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -195,7 +196,7 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { } // FetchNodeData sends a node state data retrieval request to the remote peer. -func (p *peer) FetchNodeData(request *fetchRequest) error { +func (p *peer) FetchNodeData(hashes []common.Hash) error { // Sanity check the protocol version if p.version < 63 { panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) @@ -205,14 +206,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return errAlreadyFetching } p.stateStarted = time.Now() - - // Convert the hash set to a retrievable slice - hashes := make([]common.Hash, 0, len(request.Hashes)) - for hash := range request.Hashes { - hashes = append(hashes, hash) - } go p.getNodeData(hashes) - return nil } @@ -343,8 +337,9 @@ func (p *peer) Lacks(hash common.Hash) bool { // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex + peers map[string]*peer + newPeerFeed event.Feed + lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. @@ -354,6 +349,10 @@ func newPeerSet() *peerSet { } } +func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription { + return ps.newPeerFeed.Subscribe(ch) +} + // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { @@ -377,9 +376,8 @@ func (ps *peerSet) Register(p *peer) error { // Register the new peer with some meaningful defaults ps.lock.Lock() - defer ps.lock.Unlock() - if _, ok := ps.peers[p.id]; ok { + ps.lock.Unlock() return errAlreadyRegistered } if len(ps.peers) > 0 { @@ -399,6 +397,9 @@ func (ps *peerSet) Register(p *peer) error { p.stateThroughput /= float64(len(ps.peers)) } ps.peers[p.id] = p + ps.lock.Unlock() + + ps.newPeerFeed.Send(p) return nil } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 855097c45..8a7735d67 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,20 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -var ( - blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download - maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently -) +var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download var ( errNoFetchesPending = errors.New("no fetches pending") @@ -94,15 +87,6 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order - stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority - stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for - statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations - - stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly - stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator - stateWriters int // [eth/63] Number of running state DB writer goroutines - resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain @@ -112,7 +96,7 @@ type queue struct { } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue(stateDb ethdb.Database) *queue { +func newQueue() *queue { lock := new(sync.Mutex) return &queue{ headerPendPool: make(map[string]*fetchRequest), @@ -125,10 +109,6 @@ func newQueue(stateDb ethdb.Database) *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), - stateTaskPool: make(map[common.Hash]int), - stateTaskQueue: prque.New(), - statePendPool: make(map[string]*fetchRequest), - stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), active: sync.NewCond(lock), lock: lock, @@ -158,12 +138,6 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - q.statePendPool = make(map[string]*fetchRequest) - q.stateScheduler = nil - q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 } @@ -201,28 +175,6 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } -// PendingNodeData retrieves the number of node data entries pending for retrieval. -func (q *queue) PendingNodeData() int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.pendingNodeDataLocked() -} - -// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval. -// The caller must hold q.lock. -func (q *queue) pendingNodeDataLocked() int { - var n int - if q.stateScheduler != nil { - n = q.stateScheduler.Pending() - } - // Ensure that PendingNodeData doesn't return 0 until all state is written. - if q.stateWriters > 0 { - n++ - } - return n -} - // InFlightHeaders retrieves whether there are header fetch requests currently // in flight. func (q *queue) InFlightHeaders() bool { @@ -250,28 +202,15 @@ func (q *queue) InFlightReceipts() bool { return len(q.receiptPendPool) > 0 } -// InFlightNodeData retrieves whether there are node data entry fetch requests -// currently in flight. -func (q *queue) InFlightNodeData() bool { - q.lock.Lock() - defer q.lock.Unlock() - - return len(q.statePendPool)+q.stateWriters > 0 -} - -// Idle returns if the queue is fully idle or has some data still inside. This -// method is used by the tester to detect termination events. +// Idle returns if the queue is fully idle or has some data still inside. func (q *queue) Idle() bool { q.lock.Lock() defer q.lock.Unlock() - queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) - if q.stateScheduler != nil { - queued += q.stateScheduler.Pending() - } return (queued + pending + cached) == 0 } @@ -389,19 +328,6 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } - if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { - // Pivoting point of the fast sync, switch the state retrieval to this - log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash) - - q.stateTaskIndex = 0 - q.stateTaskPool = make(map[common.Hash]int) - q.stateTaskQueue.Reset() - for _, req := range q.statePendPool { - req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear - } - - q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) - } inserts = append(inserts, header) q.headerHead = hash from++ @@ -448,31 +374,15 @@ func (q *queue) countProcessableItems() int { if result == nil || result.Pending > 0 { return i } - // Special handling for the fast-sync pivot block: - if q.mode == FastSync { - bnum := result.Header.Number.Uint64() - if bnum == q.fastSyncPivot { - // If the state of the pivot block is not - // available yet, we cannot proceed and return 0. - // - // Stop before processing the pivot block to ensure that - // resultCache has space for fsHeaderForceVerify items. Not - // doing this could leave us unable to download the required - // amount of headers. - if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 { + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { return i } - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - return i - } - } - } - // If we're just the fast sync pivot, stop as well - // because the following batch needs different insertion. - // This simplifies handling the switchover in d.process. - if bnum == q.fastSyncPivot+1 && i > 0 { - return i } } } @@ -519,81 +429,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { return request } -// ReserveNodeData reserves a set of node data hashes for the given peer, skipping -// any previously failed download. -func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { - // Create a task generator to fetch status-fetch tasks if all schedules ones are done - generator := func(max int) { - if q.stateScheduler != nil { - for _, hash := range q.stateScheduler.Missing(max) { - q.stateTaskPool[hash] = q.stateTaskIndex - q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) - q.stateTaskIndex++ - } - } - } - q.lock.Lock() - defer q.lock.Unlock() - - return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates) -} - -// reserveHashes reserves a set of hashes for the given peer, skipping previously -// failed ones. -// -// Note, this method expects the queue lock to be already held for writing. The -// reason the lock is not obtained in here is because the parameters already need -// to access the queue, so they already need a lock anyway. -func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { - // Short circuit if the peer's already downloading something (sanity check to - // not corrupt state) - if _, ok := pendPool[p.id]; ok { - return nil - } - // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - allowance := maxPending - if allowance > 0 { - for _, request := range pendPool { - allowance -= len(request.Hashes) - } - } - // If there's a task generator, ask it to fill our task queue - if taskGen != nil && taskQueue.Size() < allowance { - taskGen(allowance - taskQueue.Size()) - } - if taskQueue.Empty() { - return nil - } - // Retrieve a batch of hashes, skipping previously failed ones - send := make(map[common.Hash]int) - skip := make(map[common.Hash]int) - - for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { - hash, priority := taskQueue.Pop() - if p.Lacks(hash.(common.Hash)) { - skip[hash.(common.Hash)] = int(priority) - } else { - send[hash.(common.Hash)] = int(priority) - } - } - // Merge all the skipped hashes back - for hash, index := range skip { - taskQueue.Push(hash, float32(index)) - } - // Assemble and return the block download request - if len(send) == 0 { - return nil - } - request := &fetchRequest{ - Peer: p, - Hashes: send, - Time: time.Now(), - } - pendPool[p.id] = request - - return request -} - // ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. @@ -722,12 +557,6 @@ func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } -// CancelNodeData aborts a node state data fetch request, returning all pending -// hashes to the task queue. -func (q *queue) CancelNodeData(request *fetchRequest) { - q.cancel(request, q.stateTaskQueue, q.statePendPool) -} - // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() @@ -764,12 +593,6 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } - if request, ok := q.statePendPool[peerId]; ok { - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - delete(q.statePendPool, peerId) - } } // ExpireHeaders checks for in flight requests that exceeded a timeout allowance, @@ -799,15 +622,6 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) } -// ExpireNodeData checks for in flight node data requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) -} - // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // @@ -1044,84 +858,6 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } -// DeliverNodeData injects a node state data retrieval response into the queue. -// The method returns the number of node state accepted from the delivery. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) { - q.lock.Lock() - defer q.lock.Unlock() - - // Short circuit if the data was never requested - request := q.statePendPool[id] - if request == nil { - return 0, errNoFetchesPending - } - stateReqTimer.UpdateSince(request.Time) - delete(q.statePendPool, id) - - // If no data was retrieved, mark their hashes as unavailable for the origin peer - if len(data) == 0 { - for hash := range request.Hashes { - request.Peer.MarkLacking(hash) - } - } - // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) - process := []trie.SyncResult{} - for _, blob := range data { - // Skip any state trie entries that were not requested - hash := common.BytesToHash(crypto.Keccak256(blob)) - if _, ok := request.Hashes[hash]; !ok { - errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) - continue - } - // Inject the next state trie item into the processing queue - process = append(process, trie.SyncResult{Hash: hash, Data: blob}) - delete(request.Hashes, hash) - delete(q.stateTaskPool, hash) - } - // Return all failed or missing fetches to the queue - for hash, index := range request.Hashes { - q.stateTaskQueue.Push(hash, float32(index)) - } - if q.stateScheduler == nil { - return 0, errNoFetchesPending - } - - // Run valid nodes through the trie download scheduler. It writes completed nodes to a - // batch, which is committed asynchronously. This may lead to over-fetches because the - // scheduler treats everything as written after Process has returned, but it's - // unlikely to be an issue in practice. - batch := q.stateDatabase.NewBatch() - progressed, nproc, procerr := q.stateScheduler.Process(process, batch) - q.stateWriters += 1 - go func() { - if procerr == nil { - nproc = len(process) - procerr = batch.Write() - } - // Return processing errors through the callback so the sync gets canceled. The - // number of writers is decremented prior to the call so PendingNodeData will - // return zero when the callback runs. - q.lock.Lock() - q.stateWriters -= 1 - q.lock.Unlock() - callback(nproc, progressed, procerr) - // Wake up WaitResults after the state has been written because it might be - // waiting for completion of the pivot block's state download. - q.active.Signal() - }() - - // If none of the data items were good, it's a stale delivery - switch { - case len(errs) == 0: - return len(process), nil - case len(errs) == len(request.Hashes): - return len(process), errStaleDelivery - default: - return len(process), fmt.Errorf("multiple failures: %v", errs) - } -} - // Prepare configures the result cache to allow accepting and caching inbound // fetch results. func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { @@ -1134,9 +870,4 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types. } q.fastSyncPivot = pivot q.mode = mode - - // If long running fast sync, also start up a head stateretrieval immediately - if mode == FastSync && pivot > 0 { - q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase) - } } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go new file mode 100644 index 000000000..4e6612039 --- /dev/null +++ b/eth/downloader/statesync.go @@ -0,0 +1,449 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + "hash" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie" +) + +// stateReq represents a batch of state fetch requests groupped together into +// a single data retrieval network packet. +type stateReq struct { + items []common.Hash // Hashes of the state items to download + tasks map[common.Hash]*stateTask // Download tasks to track previous attempts + timeout time.Duration // Maximum round trip time for this to complete + timer *time.Timer // Timer to fire when the RTT timeout expires + peer *peer // Peer that we're requesting from + response [][]byte // Response data of the peer (nil for timeouts) +} + +// timedOut returns if this request timed out. +func (req *stateReq) timedOut() bool { + return req.response == nil +} + +// stateSyncStats is a collection of progress stats to report during a state trie +// sync to RPC requests as well as to display in user logs. +type stateSyncStats struct { + processed uint64 // Number of state entries processed + duplicate uint64 // Number of state entries downloaded twice + unexpected uint64 // Number of non-requested state entries received + pending uint64 // Number of still pending state entries +} + +// syncState starts downloading state with the given root hash. +func (d *Downloader) syncState(root common.Hash) *stateSync { + s := newStateSync(d, root) + select { + case d.stateSyncStart <- s: + case <-d.quitCh: + s.err = errCancelStateFetch + close(s.done) + } + return s +} + +// stateFetcher manages the active state sync and accepts requests +// on its behalf. +func (d *Downloader) stateFetcher() { + for { + select { + case s := <-d.stateSyncStart: + for next := s; next != nil; { + next = d.runStateSync(next) + } + case <-d.stateCh: + // Ignore state responses while no sync is running. + case <-d.quitCh: + return + } + } +} + +// runStateSync runs a state synchronisation until it completes or another root +// hash is requested to be switched over to. +func (d *Downloader) runStateSync(s *stateSync) *stateSync { + var ( + active = make(map[string]*stateReq) // Currently in-flight requests + finished []*stateReq // Completed or failed requests + timeout = make(chan *stateReq) // Timed out active requests + ) + defer func() { + // Cancel active request timers on exit. Also set peers to idle so they're + // available for the next sync. + for _, req := range active { + req.timer.Stop() + req.peer.SetNodeDataIdle(len(req.items)) + } + }() + // Run the state sync. + go s.run() + defer s.Cancel() + + for { + // Enable sending of the first buffered element if there is one. + var ( + deliverReq *stateReq + deliverReqCh chan *stateReq + ) + if len(finished) > 0 { + deliverReq = finished[0] + deliverReqCh = s.deliver + } + + select { + // The stateSync lifecycle: + case next := <-d.stateSyncStart: + return next + + case <-s.done: + return nil + + // Send the next finished request to the current sync: + case deliverReqCh <- deliverReq: + finished = append(finished[:0], finished[1:]...) + + // Handle incoming state packs: + case pack := <-d.stateCh: + // Discard any data not requested (or previsouly timed out) + req := active[pack.PeerId()] + if req == nil { + log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items()) + continue + } + // Finalize the request and queue up for processing + req.timer.Stop() + req.response = pack.(*statePack).states + + finished = append(finished, req) + delete(active, pack.PeerId()) + + // Handle timed-out requests: + case req := <-timeout: + // If the peer is already requesting something else, ignore the stale timeout. + // This can happen when the timeout and the delivery happens simultaneously, + // causing both pathways to trigger. + if active[req.peer.id] != req { + continue + } + // Move the timed out data back into the download queue + finished = append(finished, req) + delete(active, req.peer.id) + + // Track outgoing state requests: + case req := <-d.trackStateReq: + // If an active request already exists for this peer, we have a problem. In + // theory the trie node schedule must never assign two requests to the same + // peer. In practive however, a peer might receive a request, disconnect and + // immediately reconnect before the previous times out. In this case the first + // request is never honored, alas we must not silently overwrite it, as that + // causes valid requests to go missing and sync to get stuck. + if old := active[req.peer.id]; old != nil { + log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id) + + // Make sure the previous one doesn't get siletly lost + finished = append(finished, old) + } + // Start a timer to notify the sync loop if the peer stalled. + req.timer = time.AfterFunc(req.timeout, func() { + select { + case timeout <- req: + case <-s.done: + // Prevent leaking of timer goroutines in the unlikely case where a + // timer is fired just before exiting runStateSync. + } + }) + active[req.peer.id] = req + } + } +} + +// stateSync schedules requests for downloading a particular state trie defined +// by a given state root. +type stateSync struct { + d *Downloader // Downloader instance to access and manage current peerset + + sched *state.StateSync // State trie sync scheduler defining the tasks + keccak hash.Hash // Keccak256 hasher to verify deliveries with + tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval + + deliver chan *stateReq // Delivery channel multiplexing peer responses + cancel chan struct{} // Channel to signal a termination request + cancelOnce sync.Once // Ensures cancel only ever gets called once + done chan struct{} // Channel to signal termination completion + err error // Any error hit during sync (set before completion) +} + +// stateTask represents a single trie node download taks, containing a set of +// peers already attempted retrieval from to detect stalled syncs and abort. +type stateTask struct { + attempts map[string]struct{} +} + +// newStateSync creates a new state trie download scheduler. This method does not +// yet start the sync. The user needs to call run to initiate. +func newStateSync(d *Downloader, root common.Hash) *stateSync { + return &stateSync{ + d: d, + sched: state.NewStateSync(root, d.stateDB), + keccak: sha3.NewKeccak256(), + tasks: make(map[common.Hash]*stateTask), + deliver: make(chan *stateReq), + cancel: make(chan struct{}), + done: make(chan struct{}), + } +} + +// run starts the task assignment and response processing loop, blocking until +// it finishes, and finally notifying any goroutines waiting for the loop to +// finish. +func (s *stateSync) run() { + s.err = s.loop() + close(s.done) +} + +// Wait blocks until the sync is done or canceled. +func (s *stateSync) Wait() error { + <-s.done + return s.err +} + +// Cancel cancels the sync and waits until it has shut down. +func (s *stateSync) Cancel() error { + s.cancelOnce.Do(func() { close(s.cancel) }) + return s.Wait() +} + +// loop is the main event loop of a state trie sync. It it responsible for the +// assignment of new tasks to peers (including sending it to them) as well as +// for the processing of inbound data. Note, that the loop does not directly +// receive data from peers, rather those are buffered up in the downloader and +// pushed here async. The reason is to decouple processing from data receipt +// and timeouts. +func (s *stateSync) loop() error { + // Listen for new peer events to assign tasks to them + newPeer := make(chan *peer, 1024) + peerSub := s.d.peers.SubscribeNewPeers(newPeer) + defer peerSub.Unsubscribe() + + // Keep assigning new tasks until the sync completes or aborts + for s.sched.Pending() > 0 { + if err := s.assignTasks(); err != nil { + return err + } + // Tasks assigned, wait for something to happen + select { + case <-newPeer: + // New peer arrived, try to assign it download tasks + + case <-s.cancel: + return errCancelStateFetch + + case req := <-s.deliver: + // Response or timeout triggered, drop the peer if stalling + log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "timeout", req.timedOut()) + if len(req.items) <= 2 && req.timedOut() { + // 2 items are the minimum requested, if even that times out, we've no use of + // this peer at the moment. + log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) + s.d.dropPeer(req.peer.id) + } + // Process all the received blobs and check for stale delivery + stale, err := s.process(req) + if err != nil { + log.Warn("Node data write error", "err", err) + return err + } + // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery) + if !stale { + req.peer.SetNodeDataIdle(len(req.response)) + } + } + } + return nil +} + +// assignTasks attempts to assing new tasks to all idle peers, either from the +// batch currently being retried, or fetching new data from the trie sync itself. +func (s *stateSync) assignTasks() error { + // Iterate over all idle peers and try to assign them state fetches + peers, _ := s.d.peers.NodeDataIdlePeers() + for _, p := range peers { + // Assign a batch of fetches proportional to the estimated latency/bandwidth + cap := p.NodeDataCapacity(s.d.requestRTT()) + req := &stateReq{peer: p, timeout: s.d.requestTTL()} + s.fillTasks(cap, req) + + // If the peer was assigned tasks to fetch, send the network request + if len(req.items) > 0 { + req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items)) + + select { + case s.d.trackStateReq <- req: + req.peer.FetchNodeData(req.items) + case <-s.cancel: + } + } + } + return nil +} + +// fillTasks fills the given request object with a maximum of n state download +// tasks to send to the remote peer. +func (s *stateSync) fillTasks(n int, req *stateReq) { + // Refill available tasks from the scheduler. + if len(s.tasks) < n { + new := s.sched.Missing(n - len(s.tasks)) + for _, hash := range new { + s.tasks[hash] = &stateTask{make(map[string]struct{})} + } + } + // Find tasks that haven't been tried with the request's peer. + req.items = make([]common.Hash, 0, n) + req.tasks = make(map[common.Hash]*stateTask, n) + for hash, t := range s.tasks { + // Stop when we've gathered enough requests + if len(req.items) == n { + break + } + // Skip any requests we've already tried from this peer + if _, ok := t.attempts[req.peer.id]; ok { + continue + } + // Assign the request to this peer + t.attempts[req.peer.id] = struct{}{} + req.items = append(req.items, hash) + req.tasks[hash] = t + delete(s.tasks, hash) + } +} + +// process iterates over a batch of delivered state data, injecting each item +// into a running state sync, re-queuing any items that were requested but not +// delivered. +func (s *stateSync) process(req *stateReq) (bool, error) { + // Collect processing stats and update progress if valid data was received + processed, written, duplicate, unexpected := 0, 0, 0, 0 + + defer func(start time.Time) { + if processed+written+duplicate+unexpected > 0 { + s.updateStats(processed, written, duplicate, unexpected, time.Since(start)) + } + }(time.Now()) + + // Iterate over all the delivered data and inject one-by-one into the trie + progress, stale := false, len(req.response) > 0 + + for _, blob := range req.response { + prog, hash, err := s.processNodeData(blob) + switch err { + case nil: + processed++ + case trie.ErrNotRequested: + unexpected++ + case trie.ErrAlreadyProcessed: + duplicate++ + default: + return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + } + if prog { + progress = true + } + // If the node delivered a requested item, mark the delivery non-stale + if _, ok := req.tasks[hash]; ok { + delete(req.tasks, hash) + stale = false + } + } + // If some data managed to hit the database, flush and reset failure counters + if progress { + // Flush any accumulated data out to disk + batch := s.d.stateDB.NewBatch() + + count, err := s.sched.Commit(batch) + if err != nil { + return stale, err + } + if err := batch.Write(); err != nil { + return stale, err + } + written = count + + // If we're inside the critical section, reset fail counter since we progressed + if atomic.LoadUint32(&s.d.fsPivotFails) > 1 { + log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails)) + atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block + } + } + // Put unfulfilled tasks back into the retry queue + npeers := s.d.peers.Len() + + for hash, task := range req.tasks { + // If the node did deliver something, missing items may be due to a protocol + // limit or a previous timeout + delayed delivery. Both cases should permit + // the node to retry the missing items (to avoid single-peer stalls). + if len(req.response) > 0 || req.timedOut() { + delete(task.attempts, req.peer.id) + } + // If we've requested the node too many times already, it may be a malicious + // sync where nobody has the right data. Abort. + if len(task.attempts) >= npeers { + return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + } + // Missing item, place into the retry queue. + s.tasks[hash] = task + } + return stale, nil +} + +// processNodeData tries to inject a trie node data blob delivered from a remote +// peer into the state trie, returning whether anything useful was written or any +// error occurred. +func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) { + res := trie.SyncResult{Data: blob} + + s.keccak.Reset() + s.keccak.Write(blob) + s.keccak.Sum(res.Hash[:0]) + + committed, _, err := s.sched.Process([]trie.SyncResult{res}) + return committed, res.Hash, err +} + +// updateStats bumps the various state sync progress counters and displays a log +// message for the user to see. +func (s *stateSync) updateStats(processed, written, duplicate, unexpected int, duration time.Duration) { + s.d.syncStatsLock.Lock() + defer s.d.syncStatsLock.Unlock() + + s.d.syncStatsState.pending = uint64(s.sched.Pending()) + s.d.syncStatsState.processed += uint64(processed) + s.d.syncStatsState.duplicate += uint64(duplicate) + s.d.syncStatsState.unexpected += uint64(unexpected) + + log.Info("Imported new state entries", "count", processed, "flushed", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected) +} diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 59f60d659..45bb87322 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -167,11 +167,11 @@ func (ec *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx * } else if _, r, _ := tx.RawSignatureValues(); r == nil { return nil, false, fmt.Errorf("server returned transaction without signature") } - var block struct{ BlockHash *common.Hash } + var block struct{ BlockNumber *string } if err := json.Unmarshal(raw, &block); err != nil { return nil, false, err } - return tx, block.BlockHash == nil, nil + return tx, block.BlockNumber == nil, nil } // TransactionCount returns the total number of transactions in the given block. diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go index 65c487934..a2ee2f2cc 100644 --- a/ethdb/memory_database.go +++ b/ethdb/memory_database.go @@ -45,13 +45,6 @@ func (db *MemDatabase) Put(key []byte, value []byte) error { return nil } -func (db *MemDatabase) Set(key []byte, value []byte) { - db.lock.Lock() - defer db.lock.Unlock() - - db.Put(key, value) -} - func (db *MemDatabase) Get(key []byte) ([]byte, error) { db.lock.RLock() defer db.lock.RUnlock() diff --git a/internal/cmdtest/test_cmd.go b/internal/cmdtest/test_cmd.go new file mode 100644 index 000000000..541e51c4c --- /dev/null +++ b/internal/cmdtest/test_cmd.go @@ -0,0 +1,270 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package cmdtest + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "regexp" + "sync" + "testing" + "text/template" + "time" + + "github.com/docker/docker/pkg/reexec" +) + +func NewTestCmd(t *testing.T, data interface{}) *TestCmd { + return &TestCmd{T: t, Data: data} +} + +type TestCmd struct { + // For total convenience, all testing methods are available. + *testing.T + + Func template.FuncMap + Data interface{} + Cleanup func() + + cmd *exec.Cmd + stdout *bufio.Reader + stdin io.WriteCloser + stderr *testlogger +} + +// Run exec's the current binary using name as argv[0] which will trigger the +// reexec init function for that name (e.g. "geth-test" in cmd/geth/run_test.go) +func (tt *TestCmd) Run(name string, args ...string) { + tt.stderr = &testlogger{t: tt.T} + tt.cmd = &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{name}, args...), + Stderr: tt.stderr, + } + stdout, err := tt.cmd.StdoutPipe() + if err != nil { + tt.Fatal(err) + } + tt.stdout = bufio.NewReader(stdout) + if tt.stdin, err = tt.cmd.StdinPipe(); err != nil { + tt.Fatal(err) + } + if err := tt.cmd.Start(); err != nil { + tt.Fatal(err) + } +} + +// InputLine writes the given text to the childs stdin. +// This method can also be called from an expect template, e.g.: +// +// geth.expect(`Passphrase: {{.InputLine "password"}}`) +func (tt *TestCmd) InputLine(s string) string { + io.WriteString(tt.stdin, s+"\n") + return "" +} + +func (tt *TestCmd) SetTemplateFunc(name string, fn interface{}) { + if tt.Func == nil { + tt.Func = make(map[string]interface{}) + } + tt.Func[name] = fn +} + +// Expect runs its argument as a template, then expects the +// child process to output the result of the template within 5s. +// +// If the template starts with a newline, the newline is removed +// before matching. +func (tt *TestCmd) Expect(tplsource string) { + // Generate the expected output by running the template. + tpl := template.Must(template.New("").Funcs(tt.Func).Parse(tplsource)) + wantbuf := new(bytes.Buffer) + if err := tpl.Execute(wantbuf, tt.Data); err != nil { + panic(err) + } + // Trim exactly one newline at the beginning. This makes tests look + // much nicer because all expect strings are at column 0. + want := bytes.TrimPrefix(wantbuf.Bytes(), []byte("\n")) + if err := tt.matchExactOutput(want); err != nil { + tt.Fatal(err) + } + tt.Logf("Matched stdout text:\n%s", want) +} + +func (tt *TestCmd) matchExactOutput(want []byte) error { + buf := make([]byte, len(want)) + n := 0 + tt.withKillTimeout(func() { n, _ = io.ReadFull(tt.stdout, buf) }) + buf = buf[:n] + if n < len(want) || !bytes.Equal(buf, want) { + // Grab any additional buffered output in case of mismatch + // because it might help with debugging. + buf = append(buf, make([]byte, tt.stdout.Buffered())...) + tt.stdout.Read(buf[n:]) + // Find the mismatch position. + for i := 0; i < n; i++ { + if want[i] != buf[i] { + return fmt.Errorf("Output mismatch at â—Š:\n---------------- (stdout text)\n%sâ—Š%s\n---------------- (expected text)\n%s", + buf[:i], buf[i:n], want) + } + } + if n < len(want) { + return fmt.Errorf("Not enough output, got until â—Š:\n---------------- (stdout text)\n%s\n---------------- (expected text)\n%sâ—Š%s", + buf, want[:n], want[n:]) + } + } + return nil +} + +// ExpectRegexp expects the child process to output text matching the +// given regular expression within 5s. +// +// Note that an arbitrary amount of output may be consumed by the +// regular expression. This usually means that expect cannot be used +// after ExpectRegexp. +func (tt *TestCmd) ExpectRegexp(resource string) (*regexp.Regexp, []string) { + var ( + re = regexp.MustCompile(resource) + rtee = &runeTee{in: tt.stdout} + matches []int + ) + tt.withKillTimeout(func() { matches = re.FindReaderSubmatchIndex(rtee) }) + output := rtee.buf.Bytes() + if matches == nil { + tt.Fatalf("Output did not match:\n---------------- (stdout text)\n%s\n---------------- (regular expression)\n%s", + output, resource) + return re, nil + } + tt.Logf("Matched stdout text:\n%s", output) + var submatches []string + for i := 0; i < len(matches); i += 2 { + submatch := string(output[matches[i]:matches[i+1]]) + submatches = append(submatches, submatch) + } + return re, submatches +} + +// ExpectExit expects the child process to exit within 5s without +// printing any additional text on stdout. +func (tt *TestCmd) ExpectExit() { + var output []byte + tt.withKillTimeout(func() { + output, _ = ioutil.ReadAll(tt.stdout) + }) + tt.WaitExit() + if tt.Cleanup != nil { + tt.Cleanup() + } + if len(output) > 0 { + tt.Errorf("Unmatched stdout text:\n%s", output) + } +} + +func (tt *TestCmd) WaitExit() { + tt.cmd.Wait() +} + +func (tt *TestCmd) Interrupt() { + tt.cmd.Process.Signal(os.Interrupt) +} + +// StderrText returns any stderr output written so far. +// The returned text holds all log lines after ExpectExit has +// returned. +func (tt *TestCmd) StderrText() string { + tt.stderr.mu.Lock() + defer tt.stderr.mu.Unlock() + return tt.stderr.buf.String() +} + +func (tt *TestCmd) CloseStdin() { + tt.stdin.Close() +} + +func (tt *TestCmd) Kill() { + tt.cmd.Process.Kill() + if tt.Cleanup != nil { + tt.Cleanup() + } +} + +func (tt *TestCmd) withKillTimeout(fn func()) { + timeout := time.AfterFunc(5*time.Second, func() { + tt.Log("killing the child process (timeout)") + tt.Kill() + }) + defer timeout.Stop() + fn() +} + +// testlogger logs all written lines via t.Log and also +// collects them for later inspection. +type testlogger struct { + t *testing.T + mu sync.Mutex + buf bytes.Buffer +} + +func (tl *testlogger) Write(b []byte) (n int, err error) { + lines := bytes.Split(b, []byte("\n")) + for _, line := range lines { + if len(line) > 0 { + tl.t.Logf("(stderr) %s", line) + } + } + tl.mu.Lock() + tl.buf.Write(b) + tl.mu.Unlock() + return len(b), err +} + +// runeTee collects text read through it into buf. +type runeTee struct { + in interface { + io.Reader + io.ByteReader + io.RuneReader + } + buf bytes.Buffer +} + +func (rtee *runeTee) Read(b []byte) (n int, err error) { + n, err = rtee.in.Read(b) + rtee.buf.Write(b[:n]) + return n, err +} + +func (rtee *runeTee) ReadRune() (r rune, size int, err error) { + r, size, err = rtee.in.ReadRune() + if err == nil { + rtee.buf.WriteRune(r) + } + return r, size, err +} + +func (rtee *runeTee) ReadByte() (b byte, err error) { + b, err = rtee.in.ReadByte() + if err == nil { + rtee.buf.WriteByte(b) + } + return b, err +} diff --git a/les/backend.go b/les/backend.go index 646c81a7b..658c73c6e 100644 --- a/les/backend.go +++ b/les/backend.go @@ -19,6 +19,7 @@ package les import ( "fmt" + "sync" "time" "github.com/ethereum/go-ethereum/accounts" @@ -38,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/params" rpc "github.com/ethereum/go-ethereum/rpc" ) @@ -49,9 +51,13 @@ type LightEthereum struct { // Channel for shutting down the service shutdownChan chan bool // Handlers + peers *peerSet txPool *light.TxPool blockchain *light.LightChain protocolManager *ProtocolManager + serverPool *serverPool + reqDist *requestDistributor + retriever *retrieveManager // DB interfaces chainDb ethdb.Database // Block chain database @@ -63,6 +69,9 @@ type LightEthereum struct { networkId uint64 netRPCService *ethapi.PublicNetAPI + + quitSync chan struct{} + wg sync.WaitGroup } func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { @@ -76,20 +85,26 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } log.Info("Initialised chain configuration", "config", chainConfig) - odr := NewLesOdr(chainDb) - relay := NewLesTxRelay() + peers := newPeerSet() + quitSync := make(chan struct{}) + eth := &LightEthereum{ - odr: odr, - relay: relay, - chainDb: chainDb, chainConfig: chainConfig, + chainDb: chainDb, eventMux: ctx.EventMux, + peers: peers, + reqDist: newRequestDistributor(peers, quitSync), accountManager: ctx.AccountManager, engine: eth.CreateConsensusEngine(ctx, config, chainConfig, chainDb), shutdownChan: make(chan bool), networkId: config.NetworkId, } - if eth.blockchain, err = light.NewLightChain(odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil { + + eth.relay = NewLesTxRelay(peers, eth.reqDist) + eth.serverPool = newServerPool(chainDb, quitSync, ð.wg) + eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool) + eth.odr = NewLesOdr(chainDb, eth.retriever) + if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. @@ -100,13 +115,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay) - lightSync := config.SyncMode == downloader.LightSync - if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, lightSync, config.NetworkId, eth.eventMux, eth.engine, eth.blockchain, nil, chainDb, odr, relay); err != nil { + if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, ð.wg); err != nil { return nil, err } - relay.ps = eth.protocolManager.peers - relay.reqDist = eth.protocolManager.reqDist - eth.ApiBackend = &LesApiBackend{eth, nil} gpoParams := config.GPO if gpoParams.Default == nil { @@ -116,6 +127,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { return eth, nil } +func lesTopic(genesisHash common.Hash) discv5.Topic { + return discv5.Topic("LES@" + common.Bytes2Hex(genesisHash.Bytes()[0:8])) +} + type LightDummyAPI struct{} // Etherbase is the address that mining rewards will be send to @@ -188,7 +203,8 @@ func (s *LightEthereum) Protocols() []p2p.Protocol { func (s *LightEthereum) Start(srvr *p2p.Server) error { log.Warn("Light client mode is an experimental feature") s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId) - s.protocolManager.Start(srvr) + s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash())) + s.protocolManager.Start() return nil } diff --git a/les/distributor.go b/les/distributor.go index 71afe2b73..e8ef5b02e 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available") type requestDistributor struct { reqQueue *list.List lastReqOrder uint64 + peers map[distPeer]struct{} + peerLock sync.RWMutex stopChn, loopChn chan struct{} loopNextSent bool lock sync.Mutex - - getAllPeers func() map[distPeer]struct{} } // distPeer is an LES server peer interface for the request distributor. @@ -71,15 +71,39 @@ type distReq struct { } // newRequestDistributor creates a new request distributor -func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor { - r := &requestDistributor{ - reqQueue: list.New(), - loopChn: make(chan struct{}, 2), - stopChn: stopChn, - getAllPeers: getAllPeers, +func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor { + d := &requestDistributor{ + reqQueue: list.New(), + loopChn: make(chan struct{}, 2), + stopChn: stopChn, + peers: make(map[distPeer]struct{}), + } + if peers != nil { + peers.notify(d) } - go r.loop() - return r + go d.loop() + return d +} + +// registerPeer implements peerSetNotify +func (d *requestDistributor) registerPeer(p *peer) { + d.peerLock.Lock() + d.peers[p] = struct{}{} + d.peerLock.Unlock() +} + +// unregisterPeer implements peerSetNotify +func (d *requestDistributor) unregisterPeer(p *peer) { + d.peerLock.Lock() + delete(d.peers, p) + d.peerLock.Unlock() +} + +// registerTestPeer adds a new test peer +func (d *requestDistributor) registerTestPeer(p distPeer) { + d.peerLock.Lock() + d.peers[p] = struct{}{} + d.peerLock.Unlock() } // distMaxWait is the maximum waiting time after which further necessary waiting @@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 { // nextRequest returns the next possible request from any peer, along with the // associated peer and necessary waiting time func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { - peers := d.getAllPeers() - + checkedPeers := make(map[distPeer]struct{}) elem := d.reqQueue.Front() var ( bestPeer distPeer @@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { sel *weightedRandomSelect ) - for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { + d.peerLock.RLock() + defer d.peerLock.RUnlock() + + for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { req := elem.Value.(*distReq) canSend := false - for peer, _ := range peers { - if peer.canQueue() && req.canSend(peer) { + for peer, _ := range d.peers { + if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { canSend = true cost := req.getCost(peer) wait, bufRemain := peer.waitBefore(cost) @@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { bestWait = wait } } - delete(peers, peer) + checkedPeers[peer] = struct{}{} } } next := elem.Next() diff --git a/les/distributor_test.go b/les/distributor_test.go index ae184b21b..4e7f8bd29 100644 --- a/les/distributor_test.go +++ b/les/distributor_test.go @@ -122,20 +122,14 @@ func testRequestDistributor(t *testing.T, resend bool) { stop := make(chan struct{}) defer close(stop) + dist := newRequestDistributor(nil, stop) var peers [testDistPeerCount]*testDistPeer for i, _ := range peers { peers[i] = &testDistPeer{} go peers[i].worker(t, !resend, stop) + dist.registerTestPeer(peers[i]) } - dist := newRequestDistributor(func() map[distPeer]struct{} { - m := make(map[distPeer]struct{}) - for _, peer := range peers { - m[peer] = struct{}{} - } - return m - }, stop) - var wg sync.WaitGroup for i := 1; i <= testDistReqCount; i++ { diff --git a/les/fetcher.go b/les/fetcher.go index a294d00d5..4fc142f0f 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -116,6 +116,7 @@ func newLightFetcher(pm *ProtocolManager) *lightFetcher { syncDone: make(chan *peer), maxConfirmedTd: big.NewInt(0), } + pm.peers.notify(f) go f.syncLoop() return f } @@ -209,8 +210,8 @@ func (f *lightFetcher) syncLoop() { } } -// addPeer adds a new peer to the fetcher's peer set -func (f *lightFetcher) addPeer(p *peer) { +// registerPeer adds a new peer to the fetcher's peer set +func (f *lightFetcher) registerPeer(p *peer) { p.lock.Lock() p.hasBlock = func(hash common.Hash, number uint64) bool { return f.peerHasBlock(p, hash, number) @@ -223,8 +224,8 @@ func (f *lightFetcher) addPeer(p *peer) { f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} } -// removePeer removes a new peer from the fetcher's peer set -func (f *lightFetcher) removePeer(p *peer) { +// unregisterPeer removes a new peer from the fetcher's peer set +func (f *lightFetcher) unregisterPeer(p *peer) { p.lock.Lock() p.hasBlock = nil p.lock.Unlock() @@ -416,7 +417,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) { f.syncing = bestSyncing var rq *distReq - reqID := getNextReqID() + reqID := genReqID() if f.syncing { rq = &distReq{ getCost: func(dp distPeer) uint64 { diff --git a/les/handler.go b/les/handler.go index 64023af0f..77bc077a2 100644 --- a/les/handler.go +++ b/les/handler.go @@ -102,7 +102,9 @@ type ProtocolManager struct { odr *LesOdr server *LesServer serverPool *serverPool + lesTopic discv5.Topic reqDist *requestDistributor + retriever *retrieveManager downloader *downloader.Downloader fetcher *lightFetcher @@ -123,12 +125,12 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing - wg sync.WaitGroup + wg *sync.WaitGroup } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, @@ -136,15 +138,20 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network blockchain: blockchain, chainConfig: chainConfig, chainDb: chainDb, + odr: odr, networkId: networkId, txpool: txpool, txrelay: txrelay, - odr: odr, - peers: newPeerSet(), + peers: peers, newPeerCh: make(chan *peer), - quitSync: make(chan struct{}), + quitSync: quitSync, + wg: wg, noMorePeers: make(chan struct{}), } + if odr != nil { + manager.retriever = odr.retriever + manager.reqDist = odr.retriever.dist + } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { @@ -202,84 +209,22 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) + manager.peers.notify((*downloaderPeerNotify)(manager)) + manager.fetcher = newLightFetcher(manager) } - manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} { - m := make(map[distPeer]struct{}) - peers := manager.peers.AllPeers() - for _, peer := range peers { - m[peer] = struct{}{} - } - return m - }, manager.quitSync) - if odr != nil { - odr.removePeer = removePeer - odr.reqDist = manager.reqDist - } - - /*validator := func(block *types.Block, parent *types.Block) error { - return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) - } - heighter := func() uint64 { - return chainman.LastBlockNumberU64() - } - manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer) - */ return manager, nil } +// removePeer initiates disconnection from a peer by removing it from the peer set func (pm *ProtocolManager) removePeer(id string) { - // Short circuit if the peer was already removed - peer := pm.peers.Peer(id) - if peer == nil { - return - } - log.Debug("Removing light Ethereum peer", "peer", id) - if err := pm.peers.Unregister(id); err != nil { - if err == errNotRegistered { - return - } - } - // Unregister the peer from the downloader and Ethereum peer set - if pm.lightSync { - pm.downloader.UnregisterPeer(id) - if pm.txrelay != nil { - pm.txrelay.removePeer(id) - } - if pm.fetcher != nil { - pm.fetcher.removePeer(peer) - } - } - // Hard disconnect at the networking layer - if peer != nil { - peer.Peer.Disconnect(p2p.DiscUselessPeer) - } + pm.peers.Unregister(id) } -func (pm *ProtocolManager) Start(srvr *p2p.Server) { - var topicDisc *discv5.Network - if srvr != nil { - topicDisc = srvr.DiscV5 - } - lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) +func (pm *ProtocolManager) Start() { if pm.lightSync { - // start sync handler - if srvr != nil { // srvr is nil during testing - pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) - pm.odr.serverPool = pm.serverPool - pm.fetcher = newLightFetcher(pm) - } go pm.syncer() } else { - if topicDisc != nil { - go func() { - logger := log.New("topic", lesTopic) - logger.Info("Starting topic registration") - defer logger.Info("Terminated topic registration") - - topicDisc.RegisterTopic(lesTopic, pm.quitSync) - }() - } go func() { for range pm.newPeerCh { } @@ -342,65 +287,10 @@ func (pm *ProtocolManager) handle(p *peer) error { }() // Register the peer in the downloader. If the downloader considers it banned, we disconnect if pm.lightSync { - requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := getNextReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == p - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pm.reqDist.queue(rq) - if !ok { - return ErrNoPeers - } - return nil - } - requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { - reqID := getNextReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == p - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pm.reqDist.queue(rq) - if !ok { - return ErrNoPeers - } - return nil - } - if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, - requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { - return err - } - if pm.txrelay != nil { - pm.txrelay.addPeer(p) - } - p.lock.Lock() head := p.headInfo p.lock.Unlock() if pm.fetcher != nil { - pm.fetcher.addPeer(p) pm.fetcher.announce(p, head) } @@ -926,7 +816,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } if deliverMsg != nil { - err := pm.odr.Deliver(p, deliverMsg) + err := pm.retriever.deliver(p, deliverMsg) if err != nil { p.responseErrors++ if p.responseErrors > maxResponseErrors { @@ -946,3 +836,64 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo { Head: self.blockchain.LastBlockHash(), } } + +// downloaderPeerNotify implements peerSetNotify +type downloaderPeerNotify ProtocolManager + +func (d *downloaderPeerNotify) registerPeer(p *peer) { + pm := (*ProtocolManager)(d) + + requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { + reqID := genReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil + } + requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { + reqID := genReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil + } + + pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil) +} + +func (d *downloaderPeerNotify) unregisterPeer(p *peer) { + pm := (*ProtocolManager)(d) + pm.downloader.UnregisterPeer(p.id) +} diff --git a/les/handler_test.go b/les/handler_test.go index 0b94d0d30..5df1d3463 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -42,7 +43,8 @@ func expectResponse(r p2p.MsgReader, msgcode, reqID, bv uint64, data interface{} func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) } func testGetBlockHeaders(t *testing.T, protocol int) { - pm, _, _ := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -170,7 +172,8 @@ func testGetBlockHeaders(t *testing.T, protocol int) { func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) } func testGetBlockBodies(t *testing.T, protocol int) { - pm, _, _ := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -246,7 +249,8 @@ func TestGetCodeLes1(t *testing.T) { testGetCode(t, 1) } func testGetCode(t *testing.T, protocol int) { // Assemble the test environment - pm, _, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -278,7 +282,8 @@ func TestGetReceiptLes1(t *testing.T) { testGetReceipt(t, 1) } func testGetReceipt(t *testing.T, protocol int) { // Assemble the test environment - pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -304,7 +309,8 @@ func TestGetProofsLes1(t *testing.T) { testGetReceipt(t, 1) } func testGetProofs(t *testing.T, protocol int) { // Assemble the test environment - pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() diff --git a/les/helper_test.go b/les/helper_test.go index 7e442c131..52fddd117 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -25,7 +25,6 @@ import ( "math/big" "sync" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -132,22 +131,22 @@ func testRCL() RequestCostList { // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. -func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen)) (*ProtocolManager, ethdb.Database, *LesOdr, error) { +func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) (*ProtocolManager, error) { var ( evmux = new(event.TypeMux) engine = ethash.NewFaker() - db, _ = ethdb.NewMemDatabase() gspec = core.Genesis{ Config: params.TestChainConfig, Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, } genesis = gspec.MustCommit(db) - odr *LesOdr - chain BlockChain + chain BlockChain ) + if peers == nil { + peers = newPeerSet() + } if lightSync { - odr = NewLesOdr(db) chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux) } else { blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) @@ -158,9 +157,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor chain = blockchain } - pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, chain, nil, db, odr, nil) + pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, make(chan struct{}), new(sync.WaitGroup)) if err != nil { - return nil, nil, nil, err + return nil, err } if !lightSync { srv := &LesServer{protocolManager: pm} @@ -174,20 +173,20 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor srv.fcManager = flowcontrol.NewClientManager(50, 10, 1000000000) srv.fcCostStats = newCostStats(nil) } - pm.Start(nil) - return pm, db, odr, nil + pm.Start() + return pm, nil } // newTestProtocolManagerMust creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. In case of an error, the constructor force- // fails the test. -func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen)) (*ProtocolManager, ethdb.Database, *LesOdr) { - pm, db, odr, err := newTestProtocolManager(lightSync, blocks, generator) +func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) *ProtocolManager { + pm, err := newTestProtocolManager(lightSync, blocks, generator, peers, odr, db) if err != nil { t.Fatalf("Failed to create protocol manager: %v", err) } - return pm, db, odr + return pm } // testTxPool is a fake, helper transaction pool for testing purposes @@ -342,30 +341,3 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu func (p *testPeer) close() { p.app.Close() } - -type testServerPool struct { - peer *peer - lock sync.RWMutex -} - -func (p *testServerPool) setPeer(peer *peer) { - p.lock.Lock() - defer p.lock.Unlock() - - p.peer = peer -} - -func (p *testServerPool) getAllPeers() map[distPeer]struct{} { - p.lock.RLock() - defer p.lock.RUnlock() - - m := make(map[distPeer]struct{}) - if p.peer != nil { - m[p.peer] = struct{}{} - } - return m -} - -func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) { - -} diff --git a/les/odr.go b/les/odr.go index 684f36c76..3f7584b48 100644 --- a/les/odr.go +++ b/les/odr.go @@ -18,45 +18,24 @@ package les import ( "context" - "crypto/rand" - "encoding/binary" - "sync" - "time" - "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" ) -var ( - softRequestTimeout = time.Millisecond * 500 - hardRequestTimeout = time.Second * 10 -) - -// peerDropFn is a callback type for dropping a peer detected as malicious. -type peerDropFn func(id string) - -type odrPeerSelector interface { - adjustResponseTime(*poolEntry, time.Duration, bool) -} - +// LesOdr implements light.OdrBackend type LesOdr struct { - light.OdrBackend - db ethdb.Database - stop chan struct{} - removePeer peerDropFn - mlock, clock sync.Mutex - sentReqs map[uint64]*sentReq - serverPool odrPeerSelector - reqDist *requestDistributor + db ethdb.Database + stop chan struct{} + retriever *retrieveManager } -func NewLesOdr(db ethdb.Database) *LesOdr { +func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr { return &LesOdr{ - db: db, - stop: make(chan struct{}), - sentReqs: make(map[uint64]*sentReq), + db: db, + retriever: retriever, + stop: make(chan struct{}), } } @@ -68,17 +47,6 @@ func (odr *LesOdr) Database() ethdb.Database { return odr.db } -// validatorFunc is a function that processes a message. -type validatorFunc func(ethdb.Database, *Msg) error - -// sentReq is a request waiting for an answer that satisfies its valFunc -type sentReq struct { - valFunc validatorFunc - sentTo map[*peer]chan struct{} - lock sync.RWMutex // protects acces to sentTo - answered chan struct{} // closed and set to nil when any peer answers it -} - const ( MsgBlockBodies = iota MsgCode @@ -94,156 +62,29 @@ type Msg struct { Obj interface{} } -// Deliver is called by the LES protocol manager to deliver ODR reply messages to waiting requests -func (self *LesOdr) Deliver(peer *peer, msg *Msg) error { - var delivered chan struct{} - self.mlock.Lock() - req, ok := self.sentReqs[msg.ReqID] - self.mlock.Unlock() - if ok { - req.lock.Lock() - delivered, ok = req.sentTo[peer] - req.lock.Unlock() - } - - if !ok { - return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) - } - - if err := req.valFunc(self.db, msg); err != nil { - peer.Log().Warn("Invalid odr response", "err", err) - return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID) - } - close(delivered) - req.lock.Lock() - delete(req.sentTo, peer) - if req.answered != nil { - close(req.answered) - req.answered = nil - } - req.lock.Unlock() - return nil -} - -func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout chan struct{}, reqWg *sync.WaitGroup) { - stime := mclock.Now() - defer func() { - req.lock.Lock() - delete(req.sentTo, peer) - req.lock.Unlock() - reqWg.Done() - }() - - select { - case <-delivered: - if self.serverPool != nil { - self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false) - } - return - case <-time.After(softRequestTimeout): - close(timeout) - case <-self.stop: - return - } - - select { - case <-delivered: - case <-time.After(hardRequestTimeout): - peer.Log().Debug("Request timed out hard") - go self.removePeer(peer.id) - case <-self.stop: - return - } - if self.serverPool != nil { - self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true) - } -} - -// networkRequest sends a request to known peers until an answer is received -// or the context is cancelled -func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error { - answered := make(chan struct{}) - req := &sentReq{ - valFunc: lreq.Validate, - sentTo: make(map[*peer]chan struct{}), - answered: answered, // reply delivered by any peer - } - - exclude := make(map[*peer]struct{}) - - reqWg := new(sync.WaitGroup) - reqWg.Add(1) - defer reqWg.Done() +// Retrieve tries to fetch an object from the LES network. +// If the network retrieval was successful, it stores the object in local db. +func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { + lreq := LesRequest(req) - var timeout chan struct{} - reqID := getNextReqID() + reqID := genReqID() rq := &distReq{ getCost: func(dp distPeer) uint64 { return lreq.GetCost(dp.(*peer)) }, canSend: func(dp distPeer) bool { p := dp.(*peer) - _, ok := exclude[p] - return !ok && lreq.CanSend(p) + return lreq.CanSend(p) }, request: func(dp distPeer) func() { p := dp.(*peer) - exclude[p] = struct{}{} - delivered := make(chan struct{}) - timeout = make(chan struct{}) - req.lock.Lock() - req.sentTo[p] = delivered - req.lock.Unlock() - reqWg.Add(1) cost := lreq.GetCost(p) p.fcServer.QueueRequest(reqID, cost) - go self.requestPeer(req, p, delivered, timeout, reqWg) return func() { lreq.Request(reqID, p) } }, } - self.mlock.Lock() - self.sentReqs[reqID] = req - self.mlock.Unlock() - - go func() { - reqWg.Wait() - self.mlock.Lock() - delete(self.sentReqs, reqID) - self.mlock.Unlock() - }() - - for { - peerChn := self.reqDist.queue(rq) - select { - case <-ctx.Done(): - self.reqDist.cancel(rq) - return ctx.Err() - case <-answered: - self.reqDist.cancel(rq) - return nil - case _, ok := <-peerChn: - if !ok { - return ErrNoPeers - } - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-answered: - return nil - case <-timeout: - } - } -} - -// Retrieve tries to fetch an object from the LES network. -// If the network retrieval was successful, it stores the object in local db. -func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { - lreq := LesRequest(req) - err = self.networkRequest(ctx, lreq) - if err == nil { + if err = self.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(self.db, msg) }); err == nil { // retrieved from network, store in db req.StoreResult(self.db) } else { @@ -251,9 +92,3 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err } return } - -func getNextReqID() uint64 { - var rnd [8]byte - rand.Read(rnd[:]) - return binary.BigEndian.Uint64(rnd[:]) -} diff --git a/les/odr_test.go b/les/odr_test.go index 532de4d80..7b34996ce 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -158,15 +158,15 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { // Assemble the test environment - pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen) - lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) + peers := newPeerSet() + dist := newRequestDistributor(peers, make(chan struct{})) + rm := newRetrieveManager(peers, dist, nil) + db, _ := ethdb.NewMemDatabase() + ldb, _ := ethdb.NewMemDatabase() + odr := NewLesOdr(ldb, rm) + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) + lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - pool := &testServerPool{} - lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync) - odr.reqDist = lpm.reqDist - pool.setPeer(lpeer) - odr.serverPool = pool - lpeer.hasBlock = func(common.Hash, uint64) bool { return true } select { case <-time.After(time.Millisecond * 100): case err := <-err1: @@ -198,13 +198,19 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { } // temporarily remove peer to test odr fails - pool.setPeer(nil) // expect retrievals to fail (except genesis block) without a les peer + peers.Unregister(lpeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed test(expFail) - pool.setPeer(lpeer) // expect all retrievals to pass + peers.Register(lpeer) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed + lpeer.lock.Lock() + lpeer.hasBlock = func(common.Hash, uint64) bool { return true } + lpeer.lock.Unlock() test(5) - pool.setPeer(nil) // still expect all retrievals to pass, now data should be cached locally + peers.Unregister(lpeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed test(5) } diff --git a/les/peer.go b/les/peer.go index ab55bafe3..791d0da24 100644 --- a/les/peer.go +++ b/les/peer.go @@ -166,9 +166,9 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { // HasBlock checks if the peer has a given block func (p *peer) HasBlock(hash common.Hash, number uint64) bool { p.lock.RLock() - hashBlock := p.hasBlock + hasBlock := p.hasBlock p.lock.RUnlock() - return hashBlock != nil && hashBlock(hash, number) + return hasBlock != nil && hasBlock(hash, number) } // SendAnnounce announces the availability of a number of blocks through @@ -433,12 +433,20 @@ func (p *peer) String() string { ) } +// peerSetNotify is a callback interface to notify services about added or +// removed peers +type peerSetNotify interface { + registerPeer(*peer) + unregisterPeer(*peer) +} + // peerSet represents the collection of active peers currently participating in // the Light Ethereum sub-protocol. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex - closed bool + peers map[string]*peer + lock sync.RWMutex + notifyList []peerSetNotify + closed bool } // newPeerSet creates a new peer set to track the active participants. @@ -448,6 +456,17 @@ func newPeerSet() *peerSet { } } +// notify adds a service to be notified about added or removed peers +func (ps *peerSet) notify(n peerSetNotify) { + ps.lock.Lock() + defer ps.lock.Unlock() + + ps.notifyList = append(ps.notifyList, n) + for _, p := range ps.peers { + go n.registerPeer(p) + } +} + // Register injects a new peer into the working set, or returns an error if the // peer is already known. func (ps *peerSet) Register(p *peer) error { @@ -462,11 +481,14 @@ func (ps *peerSet) Register(p *peer) error { } ps.peers[p.id] = p p.sendQueue = newExecQueue(100) + for _, n := range ps.notifyList { + go n.registerPeer(p) + } return nil } // Unregister removes a remote peer from the active set, disabling any further -// actions to/from that particular entity. +// actions to/from that particular entity. It also initiates disconnection at the networking layer. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -474,7 +496,11 @@ func (ps *peerSet) Unregister(id string) error { if p, ok := ps.peers[id]; !ok { return errNotRegistered } else { + for _, n := range ps.notifyList { + go n.unregisterPeer(p) + } p.sendQueue.quit() + p.Peer.Disconnect(p2p.DiscUselessPeer) } delete(ps.peers, id) return nil diff --git a/les/request_test.go b/les/request_test.go index ba1fc15bd..3add5f20d 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -68,15 +68,16 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, number uint64) light.Odr func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment - pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) - lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) + peers := newPeerSet() + dist := newRequestDistributor(peers, make(chan struct{})) + rm := newRetrieveManager(peers, dist, nil) + db, _ := ethdb.NewMemDatabase() + ldb, _ := ethdb.NewMemDatabase() + odr := NewLesOdr(ldb, rm) + + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) + lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - pool := &testServerPool{} - lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync) - odr.reqDist = lpm.reqDist - pool.setPeer(lpeer) - odr.serverPool = pool - lpeer.hasBlock = func(common.Hash, uint64) bool { return true } select { case <-time.After(time.Millisecond * 100): case err := <-err1: @@ -108,10 +109,16 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { } // temporarily remove peer to test odr fails - pool.setPeer(nil) + peers.Unregister(lpeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed // expect retrievals to fail (except genesis block) without a les peer test(0) - pool.setPeer(lpeer) + + peers.Register(lpeer) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed + lpeer.lock.Lock() + lpeer.hasBlock = func(common.Hash, uint64) bool { return true } + lpeer.lock.Unlock() // expect all retrievals to pass test(5) } diff --git a/les/retrieve.go b/les/retrieve.go new file mode 100644 index 000000000..b060e0b0d --- /dev/null +++ b/les/retrieve.go @@ -0,0 +1,395 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package light implements on-demand retrieval capable state and chain objects +// for the Ethereum Light Client. +package les + +import ( + "context" + "crypto/rand" + "encoding/binary" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +var ( + retryQueue = time.Millisecond * 100 + softRequestTimeout = time.Millisecond * 500 + hardRequestTimeout = time.Second * 10 +) + +// retrieveManager is a layer on top of requestDistributor which takes care of +// matching replies by request ID and handles timeouts and resends if necessary. +type retrieveManager struct { + dist *requestDistributor + peers *peerSet + serverPool peerSelector + + lock sync.RWMutex + sentReqs map[uint64]*sentReq +} + +// validatorFunc is a function that processes a reply message +type validatorFunc func(distPeer, *Msg) error + +// peerSelector receives feedback info about response times and timeouts +type peerSelector interface { + adjustResponseTime(*poolEntry, time.Duration, bool) +} + +// sentReq represents a request sent and tracked by retrieveManager +type sentReq struct { + rm *retrieveManager + req *distReq + id uint64 + validate validatorFunc + + eventsCh chan reqPeerEvent + stopCh chan struct{} + stopped bool + err error + + lock sync.RWMutex // protect access to sentTo map + sentTo map[distPeer]sentReqToPeer + + reqQueued bool // a request has been queued but not sent + reqSent bool // a request has been sent but not timed out + reqSrtoCount int // number of requests that reached soft (but not hard) timeout +} + +// sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response +// delivered by the given peer. Only one delivery is allowed per request per peer, +// after which delivered is set to true, the validity of the response is sent on the +// valid channel and no more responses are accepted. +type sentReqToPeer struct { + delivered bool + valid chan bool +} + +// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the +// request state machine (retrieveLoop) through the eventsCh channel. +type reqPeerEvent struct { + event int + peer distPeer +} + +const ( + rpSent = iota // if peer == nil, not sent (no suitable peers) + rpSoftTimeout + rpHardTimeout + rpDeliveredValid + rpDeliveredInvalid +) + +// newRetrieveManager creates the retrieve manager +func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager { + return &retrieveManager{ + peers: peers, + dist: dist, + serverPool: serverPool, + sentReqs: make(map[uint64]*sentReq), + } +} + +// retrieve sends a request (to multiple peers if necessary) and waits for an answer +// that is delivered through the deliver function and successfully validated by the +// validator callback. It returns when a valid answer is delivered or the context is +// cancelled. +func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc) error { + sentReq := rm.sendReq(reqID, req, val) + select { + case <-sentReq.stopCh: + case <-ctx.Done(): + sentReq.stop(ctx.Err()) + } + return sentReq.getError() +} + +// sendReq starts a process that keeps trying to retrieve a valid answer for a +// request from any suitable peers until stopped or succeeded. +func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq { + r := &sentReq{ + rm: rm, + req: req, + id: reqID, + sentTo: make(map[distPeer]sentReqToPeer), + stopCh: make(chan struct{}), + eventsCh: make(chan reqPeerEvent, 10), + validate: val, + } + + canSend := req.canSend + req.canSend = func(p distPeer) bool { + // add an extra check to canSend: the request has not been sent to the same peer before + r.lock.RLock() + _, sent := r.sentTo[p] + r.lock.RUnlock() + return !sent && canSend(p) + } + + request := req.request + req.request = func(p distPeer) func() { + // before actually sending the request, put an entry into the sentTo map + r.lock.Lock() + r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)} + r.lock.Unlock() + return request(p) + } + rm.lock.Lock() + rm.sentReqs[reqID] = r + rm.lock.Unlock() + + go r.retrieveLoop() + return r +} + +// deliver is called by the LES protocol manager to deliver reply messages to waiting requests +func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error { + rm.lock.RLock() + req, ok := rm.sentReqs[msg.ReqID] + rm.lock.RUnlock() + + if ok { + return req.deliver(peer, msg) + } + return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) +} + +// reqStateFn represents a state of the retrieve loop state machine +type reqStateFn func() reqStateFn + +// retrieveLoop is the retrieval state machine event loop +func (r *sentReq) retrieveLoop() { + go r.tryRequest() + r.reqQueued = true + state := r.stateRequesting + + for state != nil { + state = state() + } + + r.rm.lock.Lock() + delete(r.rm.sentReqs, r.id) + r.rm.lock.Unlock() +} + +// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout, +// a new request is sent to a new peer +func (r *sentReq) stateRequesting() reqStateFn { + select { + case ev := <-r.eventsCh: + r.update(ev) + switch ev.event { + case rpSent: + if ev.peer == nil { + // request send failed, no more suitable peers + if r.waiting() { + // we are already waiting for sent requests which may succeed so keep waiting + return r.stateNoMorePeers + } + // nothing to wait for, no more peers to ask, return with error + r.stop(ErrNoPeers) + // no need to go to stopped state because waiting() already returned false + return nil + } + case rpSoftTimeout: + // last request timed out, try asking a new peer + go r.tryRequest() + r.reqQueued = true + return r.stateRequesting + case rpDeliveredValid: + r.stop(nil) + return r.stateStopped + } + return r.stateRequesting + case <-r.stopCh: + return r.stateStopped + } +} + +// stateNoMorePeers: could not send more requests because no suitable peers are available. +// Peers may become suitable for a certain request later or new peers may appear so we +// keep trying. +func (r *sentReq) stateNoMorePeers() reqStateFn { + select { + case <-time.After(retryQueue): + go r.tryRequest() + r.reqQueued = true + return r.stateRequesting + case ev := <-r.eventsCh: + r.update(ev) + if ev.event == rpDeliveredValid { + r.stop(nil) + return r.stateStopped + } + return r.stateNoMorePeers + case <-r.stopCh: + return r.stateStopped + } +} + +// stateStopped: request succeeded or cancelled, just waiting for some peers +// to either answer or time out hard +func (r *sentReq) stateStopped() reqStateFn { + for r.waiting() { + r.update(<-r.eventsCh) + } + return nil +} + +// update updates the queued/sent flags and timed out peers counter according to the event +func (r *sentReq) update(ev reqPeerEvent) { + switch ev.event { + case rpSent: + r.reqQueued = false + if ev.peer != nil { + r.reqSent = true + } + case rpSoftTimeout: + r.reqSent = false + r.reqSrtoCount++ + case rpHardTimeout, rpDeliveredValid, rpDeliveredInvalid: + r.reqSrtoCount-- + } +} + +// waiting returns true if the retrieval mechanism is waiting for an answer from +// any peer +func (r *sentReq) waiting() bool { + return r.reqQueued || r.reqSent || r.reqSrtoCount > 0 +} + +// tryRequest tries to send the request to a new peer and waits for it to either +// succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent +// messages to the request's event channel. +func (r *sentReq) tryRequest() { + sent := r.rm.dist.queue(r.req) + var p distPeer + select { + case p = <-sent: + case <-r.stopCh: + if r.rm.dist.cancel(r.req) { + p = nil + } else { + p = <-sent + } + } + + r.eventsCh <- reqPeerEvent{rpSent, p} + if p == nil { + return + } + + reqSent := mclock.Now() + srto, hrto := false, false + + r.lock.RLock() + s, ok := r.sentTo[p] + r.lock.RUnlock() + if !ok { + panic(nil) + } + + defer func() { + // send feedback to server pool and remove peer if hard timeout happened + pp, ok := p.(*peer) + if ok && r.rm.serverPool != nil { + respTime := time.Duration(mclock.Now() - reqSent) + r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto) + } + if hrto { + pp.Log().Debug("Request timed out hard") + if r.rm.peers != nil { + r.rm.peers.Unregister(pp.id) + } + } + + r.lock.Lock() + delete(r.sentTo, p) + r.lock.Unlock() + }() + + select { + case ok := <-s.valid: + if ok { + r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} + } else { + r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} + } + return + case <-time.After(softRequestTimeout): + srto = true + r.eventsCh <- reqPeerEvent{rpSoftTimeout, p} + } + + select { + case ok := <-s.valid: + if ok { + r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} + } else { + r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} + } + case <-time.After(hardRequestTimeout): + hrto = true + r.eventsCh <- reqPeerEvent{rpHardTimeout, p} + } +} + +// deliver a reply belonging to this request +func (r *sentReq) deliver(peer distPeer, msg *Msg) error { + r.lock.Lock() + defer r.lock.Unlock() + + s, ok := r.sentTo[peer] + if !ok || s.delivered { + return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) + } + valid := r.validate(peer, msg) == nil + r.sentTo[peer] = sentReqToPeer{true, s.valid} + s.valid <- valid + if !valid { + return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID) + } + return nil +} + +// stop stops the retrieval process and sets an error code that will be returned +// by getError +func (r *sentReq) stop(err error) { + r.lock.Lock() + if !r.stopped { + r.stopped = true + r.err = err + close(r.stopCh) + } + r.lock.Unlock() +} + +// getError returns any retrieval error (either internally generated or set by the +// stop function) after stopCh has been closed +func (r *sentReq) getError() error { + return r.err +} + +// genReqID generates a new random request ID +func genReqID() uint64 { + var rnd [8]byte + rand.Read(rnd[:]) + return binary.BigEndian.Uint64(rnd[:]) +} diff --git a/les/server.go b/les/server.go index 22fe59b7a..2ff715ea8 100644 --- a/les/server.go +++ b/les/server.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -41,17 +42,24 @@ type LesServer struct { fcManager *flowcontrol.ClientManager // nil if our node is client only fcCostStats *requestCostStats defParams *flowcontrol.ServerParams + lesTopic discv5.Topic + quitSync chan struct{} stopped bool } func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { - pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil) + quitSync := make(chan struct{}) + pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup)) if err != nil { return nil, err } pm.blockLoop() - srv := &LesServer{protocolManager: pm} + srv := &LesServer{ + protocolManager: pm, + quitSync: quitSync, + lesTopic: lesTopic(eth.BlockChain().Genesis().Hash()), + } pm.server = srv srv.defParams = &flowcontrol.ServerParams{ @@ -69,7 +77,14 @@ func (s *LesServer) Protocols() []p2p.Protocol { // Start starts the LES server func (s *LesServer) Start(srvr *p2p.Server) { - s.protocolManager.Start(srvr) + s.protocolManager.Start() + go func() { + logger := log.New("topic", s.lesTopic) + logger.Info("Starting topic registration") + defer logger.Info("Terminated topic registration") + + srvr.DiscV5.RegisterTopic(s.lesTopic, s.quitSync) + }() } // Stop stops the LES service diff --git a/les/serverpool.go b/les/serverpool.go index 64fe991c6..f4e4df2fb 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -102,6 +102,8 @@ type serverPool struct { wg *sync.WaitGroup connWg sync.WaitGroup + topic discv5.Topic + discSetPeriod chan time.Duration discNodes chan *discv5.Node discLookups chan bool @@ -118,11 +120,9 @@ type serverPool struct { } // newServerPool creates a new serverPool instance -func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool { +func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool { pool := &serverPool{ db: db, - dbKey: append(dbPrefix, []byte(topic)...), - server: server, quit: quit, wg: wg, entries: make(map[discover.NodeID]*poolEntry), @@ -135,19 +135,25 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic } pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) - wg.Add(1) + return pool +} + +func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { + pool.server = server + pool.topic = topic + pool.dbKey = append([]byte("serverPool/"), []byte(topic)...) + pool.wg.Add(1) pool.loadNodes() - pool.checkDial() + go pool.eventLoop() + + pool.checkDial() if pool.server.DiscV5 != nil { pool.discSetPeriod = make(chan time.Duration, 1) pool.discNodes = make(chan *discv5.Node, 100) pool.discLookups = make(chan bool, 100) - go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) } - - go pool.eventLoop() - return pool } // connect should be called upon any incoming connection. If the connection has been @@ -485,7 +491,7 @@ func (pool *serverPool) checkDial() { // dial initiates a new connection func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { - if entry.state != psNotConnected { + if pool.server == nil || entry.state != psNotConnected { return } entry.state = psDialed diff --git a/les/txrelay.go b/les/txrelay.go index 1ca3467e4..7a02cc837 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -39,26 +39,28 @@ type LesTxRelay struct { reqDist *requestDistributor } -func NewLesTxRelay() *LesTxRelay { - return &LesTxRelay{ +func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay { + r := &LesTxRelay{ txSent: make(map[common.Hash]*ltrInfo), txPending: make(map[common.Hash]struct{}), + ps: ps, + reqDist: reqDist, } + ps.notify(r) + return r } -func (self *LesTxRelay) addPeer(p *peer) { +func (self *LesTxRelay) registerPeer(p *peer) { self.lock.Lock() defer self.lock.Unlock() - self.ps.Register(p) self.peerList = self.ps.AllPeers() } -func (self *LesTxRelay) removePeer(id string) { +func (self *LesTxRelay) unregisterPeer(p *peer) { self.lock.Lock() defer self.lock.Unlock() - self.ps.Unregister(id) self.peerList = self.ps.AllPeers() } @@ -112,7 +114,7 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { pp := p ll := list - reqID := getNextReqID() + reqID := genReqID() rq := &distReq{ getCost: func(dp distPeer) uint64 { peer := dp.(*peer) diff --git a/light/txpool.go b/light/txpool.go index 446195806..7276874b8 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -360,7 +360,7 @@ func (pool *TxPool) validateTx(ctx context.Context, tx *types.Transaction) error currentState := pool.currentState() if n, err := currentState.GetNonce(ctx, from); err == nil { if n > tx.Nonce() { - return core.ErrNonce + return core.ErrNonceTooLow } } else { return err diff --git a/node/service.go b/node/service.go index 5e1eb0e64..55062a500 100644 --- a/node/service.go +++ b/node/service.go @@ -43,7 +43,11 @@ func (ctx *ServiceContext) OpenDatabase(name string, cache int, handles int) (et if ctx.config.DataDir == "" { return ethdb.NewMemDatabase() } - return ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles) + db, err := ethdb.NewLDBDatabase(ctx.config.resolvePath(name), cache, handles) + if err != nil { + return nil, err + } + return db, nil } // ResolvePath resolves a user path into the data directory if that was relative diff --git a/params/version.go b/params/version.go index fbf49ead9..d54c69b71 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 6 // Minor version component of the current release - VersionPatch = 6 // Patch version component of the current release + VersionPatch = 7 // Patch version component of the current release VersionMeta = "unstable" // Version metadata to append to the version string ) diff --git a/rpc/http.go b/rpc/http.go index 6bab02ab6..4143e2a8d 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -103,8 +103,8 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } - for _, respmsg := range respmsgs { - op.resp <- &respmsg + for i := 0; i < len(respmsgs); i++ { + op.resp <- &respmsgs[i] } return nil } diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go index f307b38ea..69f3cc615 100644 --- a/swarm/fuse/swarmfs_test.go +++ b/swarm/fuse/swarmfs_test.go @@ -21,13 +21,14 @@ package fuse import ( "bytes" "crypto/rand" - "github.com/ethereum/go-ethereum/swarm/api" - "github.com/ethereum/go-ethereum/swarm/storage" "io" "io/ioutil" "os" "path/filepath" "testing" + + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/storage" ) type fileInfo struct { @@ -37,26 +38,7 @@ type fileInfo struct { contents []byte } -func testFuseFileSystem(t *testing.T, f func(*api.Api)) { - - datadir, err := ioutil.TempDir("", "fuse") - if err != nil { - t.Fatalf("unable to create temp dir: %v", err) - } - os.RemoveAll(datadir) - - dpa, err := storage.NewLocalDPA(datadir) - if err != nil { - return - } - api := api.NewApi(dpa, nil) - dpa.Start() - f(api) - dpa.Stop() -} - func createTestFilesAndUploadToSwarm(t *testing.T, api *api.Api, files map[string]fileInfo, uploadDir string) string { - os.RemoveAll(uploadDir) for fname, finfo := range files { @@ -89,8 +71,6 @@ func createTestFilesAndUploadToSwarm(t *testing.T, api *api.Api, files map[strin } func mountDir(t *testing.T, api *api.Api, files map[string]fileInfo, bzzHash string, mountDir string) *SwarmFS { - - // Test Mount os.RemoveAll(mountDir) os.MkdirAll(mountDir, 0777) swarmfs := NewSwarmFS(api) @@ -123,11 +103,9 @@ func mountDir(t *testing.T, api *api.Api, files map[string]fileInfo, bzzHash str compareGeneratedFileWithFileInMount(t, files, mountDir) return swarmfs - } func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo, mountDir string) { - err := filepath.Walk(mountDir, func(path string, f os.FileInfo, err error) error { if f.IsDir() { return nil @@ -143,7 +121,6 @@ func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo } for fname, finfo := range files { - destinationFile := filepath.Join(mountDir, fname) dfinfo, err := os.Stat(destinationFile) @@ -163,18 +140,15 @@ func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo if err != nil { t.Fatalf("Could not readfile %v : %v", fname, err) } - if bytes.Compare(fileContents, finfo.contents) != 0 { t.Fatalf("File %v contents mismatch: %v , %v", fname, fileContents, finfo.contents) } - // TODO: check uid and gid } } func checkFile(t *testing.T, testMountDir, fname string, contents []byte) { - destinationFile := filepath.Join(testMountDir, fname) dfinfo, err1 := os.Stat(destinationFile) if err1 != nil { @@ -201,10 +175,9 @@ func getRandomBtes(size int) []byte { contents := make([]byte, size) rand.Read(contents) return contents - } -func IsDirEmpty(name string) bool { +func isDirEmpty(name string) bool { f, err := os.Open(name) if err != nil { return false @@ -218,8 +191,11 @@ func IsDirEmpty(name string) bool { return false } -func testMountListAndUnmount(api *api.Api, t *testing.T) { +type testAPI struct { + api *api.Api +} +func (ta *testAPI) mountListAndUnmount(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "fuse-source") testMountDir, _ := ioutil.TempDir(os.TempDir(), "fuse-dest") @@ -240,9 +216,9 @@ func testMountListAndUnmount(api *api.Api, t *testing.T) { files["twice/2.txt"] = fileInfo{0777, 444, 333, getRandomBtes(200)} files["one/two/three/four/five/six/seven/eight/nine/10.txt"] = fileInfo{0777, 333, 444, getRandomBtes(10240)} files["one/two/three/four/five/six/six"] = fileInfo{0777, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() // Check unmount @@ -250,53 +226,52 @@ func testMountListAndUnmount(api *api.Api, t *testing.T) { if err != nil { t.Fatalf("could not unmount %v", bzzHash) } - if !IsDirEmpty(testMountDir) { + if !isDirEmpty(testMountDir) { t.Fatalf("unmount didnt work for %v", testMountDir) } } -func testMaxMounts(api *api.Api, t *testing.T) { - +func (ta *testAPI) maxMounts(t *testing.T) { files := make(map[string]fileInfo) files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir1, _ := ioutil.TempDir(os.TempDir(), "max-upload1") - bzzHash1 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir1) + bzzHash1 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir1) mount1, _ := ioutil.TempDir(os.TempDir(), "max-mount1") - swarmfs1 := mountDir(t, api, files, bzzHash1, mount1) + swarmfs1 := mountDir(t, ta.api, files, bzzHash1, mount1) defer swarmfs1.Stop() files["2.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir2, _ := ioutil.TempDir(os.TempDir(), "max-upload2") - bzzHash2 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir2) + bzzHash2 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir2) mount2, _ := ioutil.TempDir(os.TempDir(), "max-mount2") - swarmfs2 := mountDir(t, api, files, bzzHash2, mount2) + swarmfs2 := mountDir(t, ta.api, files, bzzHash2, mount2) defer swarmfs2.Stop() files["3.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir3, _ := ioutil.TempDir(os.TempDir(), "max-upload3") - bzzHash3 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir3) + bzzHash3 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir3) mount3, _ := ioutil.TempDir(os.TempDir(), "max-mount3") - swarmfs3 := mountDir(t, api, files, bzzHash3, mount3) + swarmfs3 := mountDir(t, ta.api, files, bzzHash3, mount3) defer swarmfs3.Stop() files["4.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir4, _ := ioutil.TempDir(os.TempDir(), "max-upload4") - bzzHash4 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir4) + bzzHash4 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir4) mount4, _ := ioutil.TempDir(os.TempDir(), "max-mount4") - swarmfs4 := mountDir(t, api, files, bzzHash4, mount4) + swarmfs4 := mountDir(t, ta.api, files, bzzHash4, mount4) defer swarmfs4.Stop() files["5.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir5, _ := ioutil.TempDir(os.TempDir(), "max-upload5") - bzzHash5 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir5) + bzzHash5 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir5) mount5, _ := ioutil.TempDir(os.TempDir(), "max-mount5") - swarmfs5 := mountDir(t, api, files, bzzHash5, mount5) + swarmfs5 := mountDir(t, ta.api, files, bzzHash5, mount5) defer swarmfs5.Stop() files["6.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir6, _ := ioutil.TempDir(os.TempDir(), "max-upload6") - bzzHash6 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir6) + bzzHash6 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir6) mount6, _ := ioutil.TempDir(os.TempDir(), "max-mount6") os.RemoveAll(mount6) @@ -308,18 +283,17 @@ func testMaxMounts(api *api.Api, t *testing.T) { } -func testReMounts(api *api.Api, t *testing.T) { - +func (ta *testAPI) remount(t *testing.T) { files := make(map[string]fileInfo) files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir1, _ := ioutil.TempDir(os.TempDir(), "re-upload1") - bzzHash1 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir1) + bzzHash1 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir1) testMountDir1, _ := ioutil.TempDir(os.TempDir(), "re-mount1") - swarmfs := mountDir(t, api, files, bzzHash1, testMountDir1) + swarmfs := mountDir(t, ta.api, files, bzzHash1, testMountDir1) defer swarmfs.Stop() uploadDir2, _ := ioutil.TempDir(os.TempDir(), "re-upload2") - bzzHash2 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir2) + bzzHash2 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir2) testMountDir2, _ := ioutil.TempDir(os.TempDir(), "re-mount2") // try mounting the same hash second time @@ -341,19 +315,17 @@ func testReMounts(api *api.Api, t *testing.T) { if err == nil { t.Fatalf("Error mounting hash %v", bzzHash2) } - } -func testUnmount(api *api.Api, t *testing.T) { - +func (ta *testAPI) unmount(t *testing.T) { files := make(map[string]fileInfo) uploadDir, _ := ioutil.TempDir(os.TempDir(), "ex-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "ex-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, uploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() swarmfs.Unmount(testMountDir) @@ -364,19 +336,17 @@ func testUnmount(api *api.Api, t *testing.T) { t.Fatalf("mount state not cleaned up in unmount case %v", testMountDir) } } - } -func testUnmountWhenResourceBusy(api *api.Api, t *testing.T) { - +func (ta *testAPI) unmountWhenResourceBusy(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "ex-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "ex-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() actualPath := filepath.Join(testMountDir, "2.txt") @@ -395,18 +365,17 @@ func testUnmountWhenResourceBusy(api *api.Api, t *testing.T) { t.Fatalf("mount state not cleaned up in unmount case %v", testMountDir) } } - } -func testSeekInMultiChunkFile(api *api.Api, t *testing.T) { +func (ta *testAPI) seekInMultiChunkFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "seek-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "seek-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10240)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() // Create a new file seek the second chunk @@ -423,11 +392,9 @@ func testSeekInMultiChunkFile(api *api.Api, t *testing.T) { t.Fatalf("File seek contents mismatch") } d.Close() - } -func testCreateNewFile(api *api.Api, t *testing.T) { - +func (ta *testAPI) createNewFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "create-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "create-mount") @@ -435,9 +402,9 @@ func testCreateNewFile(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file in the root dir and check @@ -458,23 +425,21 @@ func testCreateNewFile(api *api.Api, t *testing.T) { // mount again and see if things are okay files["2.txt"] = fileInfo{0700, 333, 444, contents} - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "2.txt", contents) - } -func testCreateNewFileInsideDirectory(api *api.Api, t *testing.T) { - +func (ta *testAPI) createNewFileInsideDirectory(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "createinsidedir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "createinsidedir-mount") files["one/1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file inside a existing dir and check @@ -496,23 +461,21 @@ func testCreateNewFileInsideDirectory(api *api.Api, t *testing.T) { // mount again and see if things are okay files["one/2.txt"] = fileInfo{0700, 333, 444, contents} - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "one/2.txt", contents) - } -func testCreateNewFileInsideNewDirectory(api *api.Api, t *testing.T) { - +func (ta *testAPI) createNewFileInsideNewDirectory(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "createinsidenewdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "createinsidenewdir-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file inside a existing dir and check @@ -535,15 +498,13 @@ func testCreateNewFileInsideNewDirectory(api *api.Api, t *testing.T) { // mount again and see if things are okay files["one/2.txt"] = fileInfo{0700, 333, 444, contents} - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "one/2.txt", contents) - } -func testRemoveExistingFile(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeExistingFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "remove-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "remove-mount") @@ -551,9 +512,9 @@ func testRemoveExistingFile(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Remove a file in the root dir and check @@ -567,13 +528,11 @@ func testRemoveExistingFile(api *api.Api, t *testing.T) { // mount again and see if things are okay delete(files, "five.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeExistingFileInsideDir(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "remove-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "remove-mount") @@ -581,9 +540,9 @@ func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["one/five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["one/six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Remove a file in the root dir and check @@ -597,12 +556,11 @@ func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) { // mount again and see if things are okay delete(files, "one/five.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) { +func (ta *testAPI) removeNewlyAddedFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "removenew-upload") @@ -611,9 +569,9 @@ func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Adda a new file and remove it @@ -639,17 +597,15 @@ func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) { } // mount again and see if things are okay - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() if bzzHash != mi.LatestManifest { t.Fatalf("same contents different hash orig(%v): new(%v)", bzzHash, mi.LatestManifest) } - } -func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { - +func (ta *testAPI) addNewFileAndModifyContents(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "modifyfile-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "modifyfile-mount") @@ -657,9 +613,9 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file in the root dir and check @@ -680,7 +636,7 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { // mount again and see if things are okay files["2.txt"] = fileInfo{0700, 333, 444, line1} - swarmfs2 := mountDir(t, api, files, mi1.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi1.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "2.txt", line1) @@ -691,7 +647,7 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { } // mount again and modify - swarmfs3 := mountDir(t, api, files, mi2.LatestManifest, testMountDir) + swarmfs3 := mountDir(t, ta.api, files, mi2.LatestManifest, testMountDir) defer swarmfs3.Stop() fd, err4 := os.OpenFile(actualPath, os.O_RDWR|os.O_APPEND, os.FileMode(0665)) @@ -713,14 +669,13 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { b := [][]byte{line1, line2} line1and2 := bytes.Join(b, []byte("")) files["2.txt"] = fileInfo{0700, 333, 444, line1and2} - swarmfs4 := mountDir(t, api, files, mi3.LatestManifest, testMountDir) + swarmfs4 := mountDir(t, ta.api, files, mi3.LatestManifest, testMountDir) defer swarmfs4.Stop() checkFile(t, testMountDir, "2.txt", line1and2) - } -func testRemoveEmptyDir(api *api.Api, t *testing.T) { +func (ta *testAPI) removeEmptyDir(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-mount") @@ -728,9 +683,9 @@ func testRemoveEmptyDir(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() os.MkdirAll(filepath.Join(testMountDir, "newdir"), 0777) @@ -739,15 +694,12 @@ func testRemoveEmptyDir(api *api.Api, t *testing.T) { if err3 != nil { t.Fatalf("Could not unmount %v", err3) } - if bzzHash != mi.LatestManifest { t.Fatalf("same contents different hash orig(%v): new(%v)", bzzHash, mi.LatestManifest) } - } -func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeDirWhichHasFiles(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-mount") @@ -755,9 +707,9 @@ func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) { files["one/1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["two/five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["two/six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() dirPath := filepath.Join(testMountDir, "two") @@ -772,13 +724,11 @@ func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) { delete(files, "two/five.txt") delete(files, "two/six.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeDirWhichHasSubDirs(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmsubdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmsubdir-mount") @@ -790,9 +740,9 @@ func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) { files["two/four/6.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["two/four/six/7.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() dirPath := filepath.Join(testMountDir, "two") @@ -810,13 +760,11 @@ func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) { delete(files, "two/four/6.txt") delete(files, "two/four/six/7.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testAppendFileContentsToEnd(api *api.Api, t *testing.T) { - +func (ta *testAPI) appendFileContentsToEnd(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "appendlargefile-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "appendlargefile-mount") @@ -824,9 +772,9 @@ func testAppendFileContentsToEnd(api *api.Api, t *testing.T) { line1 := make([]byte, 10) rand.Read(line1) files["1.txt"] = fileInfo{0700, 333, 444, line1} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() actualPath := filepath.Join(testMountDir, "1.txt") @@ -849,49 +797,42 @@ func testAppendFileContentsToEnd(api *api.Api, t *testing.T) { b := [][]byte{line1, line2} line1and2 := bytes.Join(b, []byte("")) files["1.txt"] = fileInfo{0700, 333, 444, line1and2} - swarmfs2 := mountDir(t, api, files, mi1.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi1.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "1.txt", line1and2) - } -func TestSwarmFileSystem(t *testing.T) { - testFuseFileSystem(t, func(api *api.Api) { - - testMountListAndUnmount(api, t) - - testMaxMounts(api, t) - - testReMounts(api, t) - - testUnmount(api, t) - - testUnmountWhenResourceBusy(api, t) - - testSeekInMultiChunkFile(api, t) - - testCreateNewFile(api, t) - - testCreateNewFileInsideDirectory(api, t) - - testCreateNewFileInsideNewDirectory(api, t) - - testRemoveExistingFile(api, t) - - testRemoveExistingFileInsideADir(api, t) - - testRemoveNewlyAddedFile(api, t) - - testAddNewFileAndModifyContents(api, t) - - testRemoveEmptyDir(api, t) - - testRemoveDirWhichHasFiles(api, t) - - testRemoveDirWhichHasSubDirs(api, t) - - testAppendFileContentsToEnd(api, t) +func TestFUSE(t *testing.T) { + datadir, err := ioutil.TempDir("", "fuse") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + os.RemoveAll(datadir) - }) + dpa, err := storage.NewLocalDPA(datadir) + if err != nil { + t.Fatal(err) + } + ta := &testAPI{api: api.NewApi(dpa, nil)} + dpa.Start() + defer dpa.Stop() + + t.Run("mountListAndUmount", ta.mountListAndUnmount) + t.Run("maxMounts", ta.maxMounts) + t.Run("remount", ta.remount) + t.Run("unmount", ta.unmount) + t.Run("unmountWhenResourceBusy", ta.unmountWhenResourceBusy) + t.Run("seekInMultiChunkFile", ta.seekInMultiChunkFile) + t.Run("createNewFile", ta.createNewFile) + t.Run("createNewFileInsideDirectory", ta.createNewFileInsideDirectory) + t.Run("createNewFileInsideNewDirectory", ta.createNewFileInsideNewDirectory) + t.Run("removeExistingFile", ta.removeExistingFile) + t.Run("removeExistingFileInsideDir", ta.removeExistingFileInsideDir) + t.Run("removeNewlyAddedFile", ta.removeNewlyAddedFile) + t.Run("addNewFileAndModifyContents", ta.addNewFileAndModifyContents) + t.Run("removeEmptyDir", ta.removeEmptyDir) + t.Run("removeDirWhichHasFiles", ta.removeDirWhichHasFiles) + t.Run("removeDirWhichHasSubDirs", ta.removeDirWhichHasSubDirs) + t.Run("appendFileContentsToEnd", ta.appendFileContentsToEnd) } diff --git a/swarm/fuse/swarmfs_unix.go b/swarm/fuse/swarmfs_unix.go index f4eecef24..1a8390a4b 100644 --- a/swarm/fuse/swarmfs_unix.go +++ b/swarm/fuse/swarmfs_unix.go @@ -19,18 +19,19 @@ package fuse import ( - "bazil.org/fuse" - "bazil.org/fuse/fs" "errors" "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/swarm/api" "os" "path/filepath" "strings" "sync" "time" + + "bazil.org/fuse" + "bazil.org/fuse/fs" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/api" ) var ( @@ -203,7 +204,7 @@ func (self *SwarmFS) Unmount(mountpoint string) (*MountInfo, error) { } err = fuse.Unmount(cleanedMountPoint) if err != nil { - err1 := externalUnMount(cleanedMountPoint) + err1 := externalUnmount(cleanedMountPoint) if err1 != nil { errStr := fmt.Sprintf("UnMount error: %v", err) log.Warn(errStr) diff --git a/swarm/fuse/swarmfs_util.go b/swarm/fuse/swarmfs_util.go index d20ab258e..d39966c0e 100644 --- a/swarm/fuse/swarmfs_util.go +++ b/swarm/fuse/swarmfs_util.go @@ -19,47 +19,31 @@ package fuse import ( + "context" "fmt" - "github.com/ethereum/go-ethereum/log" "os/exec" "runtime" - "time" -) -func externalUnMount(mountPoint string) error { + "github.com/ethereum/go-ethereum/log" +) - var cmd *exec.Cmd +func externalUnmount(mountPoint string) error { + ctx, cancel := context.WithTimeout(context.Background(), unmountTimeout) + defer cancel() + // Try generic umount. + if err := exec.CommandContext(ctx, "umount", mountPoint).Run(); err == nil { + return nil + } + // Try FUSE-specific commands if umount didn't work. switch runtime.GOOS { - case "darwin": - cmd = exec.Command("/usr/bin/diskutil", "umount", "force", mountPoint) - + return exec.CommandContext(ctx, "diskutil", "umount", "force", mountPoint).Run() case "linux": - cmd = exec.Command("fusermount", "-u", mountPoint) - + return exec.CommandContext(ctx, "fusermount", "-u", mountPoint).Run() default: return fmt.Errorf("unmount: unimplemented") } - - errc := make(chan error, 1) - go func() { - defer close(errc) - - if err := exec.Command("umount", mountPoint).Run(); err == nil { - return - } - errc <- cmd.Run() - }() - - select { - - case <-time.After(unmountTimeout): - return fmt.Errorf("umount timeout") - - case err := <-errc: - return err - } } func addFileToSwarm(sf *SwarmFile, content []byte, size int) error { diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go index 30925a919..31ff5b64e 100644 --- a/swarm/storage/dbstore.go +++ b/swarm/storage/dbstore.go @@ -399,7 +399,7 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) { hash := hasher.Sum(nil) if !bytes.Equal(hash, key) { s.delete(index.Idx, getIndexKey(key)) - panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'") + log.Warn("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'") } chunk = &Chunk{ diff --git a/trie/errors.go b/trie/errors.go index e23f9d563..567b80078 100644 --- a/trie/errors.go +++ b/trie/errors.go @@ -23,24 +23,13 @@ import ( ) // MissingNodeError is returned by the trie functions (TryGet, TryUpdate, TryDelete) -// in the case where a trie node is not present in the local database. Contains -// information necessary for retrieving the missing node through an ODR service. -// -// NodeHash is the hash of the missing node -// -// RootHash is the original root of the trie that contains the node -// -// PrefixLen is the nibble length of the key prefix that leads from the root to -// the missing node -// -// SuffixLen is the nibble length of the remaining part of the key that hints on -// which further nodes should also be retrieved (can be zero when there are no -// such hints in the error message) +// in the case where a trie node is not present in the local database. It contains +// information necessary for retrieving the missing node. type MissingNodeError struct { - RootHash, NodeHash common.Hash - PrefixLen, SuffixLen int + NodeHash common.Hash // hash of the missing node + Path []byte // hex-encoded path to the missing node } func (err *MissingNodeError) Error() string { - return fmt.Sprintf("Missing trie node %064x", err.NodeHash) + return fmt.Sprintf("missing trie node %x (path %x)", err.NodeHash, err.Path) } diff --git a/trie/iterator.go b/trie/iterator.go index 26ae1d5ad..76146c0d6 100644 --- a/trie/iterator.go +++ b/trie/iterator.go @@ -24,14 +24,13 @@ import ( "github.com/ethereum/go-ethereum/common" ) -var iteratorEnd = errors.New("end of iteration") - // Iterator is a key-value trie iterator that traverses a Trie. type Iterator struct { nodeIt NodeIterator Key []byte // Current data key on which the iterator is positioned on Value []byte // Current data value on which the iterator is positioned on + Err error } // NewIterator creates a new key-value iterator from a node iterator @@ -45,35 +44,42 @@ func NewIterator(it NodeIterator) *Iterator { func (it *Iterator) Next() bool { for it.nodeIt.Next(true) { if it.nodeIt.Leaf() { - it.Key = hexToKeybytes(it.nodeIt.Path()) + it.Key = it.nodeIt.LeafKey() it.Value = it.nodeIt.LeafBlob() return true } } it.Key = nil it.Value = nil + it.Err = it.nodeIt.Error() return false } // NodeIterator is an iterator to traverse the trie pre-order. type NodeIterator interface { - // Hash returns the hash of the current node - Hash() common.Hash - // Parent returns the hash of the parent of the current node - Parent() common.Hash - // Leaf returns true iff the current node is a leaf node. - Leaf() bool - // LeafBlob returns the contents of the node, if it is a leaf. - // Callers must not retain references to the return value after calling Next() - LeafBlob() []byte - // Path returns the hex-encoded path to the current node. - // Callers must not retain references to the return value after calling Next() - Path() []byte // Next moves the iterator to the next node. If the parameter is false, any child // nodes will be skipped. Next(bool) bool // Error returns the error status of the iterator. Error() error + + // Hash returns the hash of the current node. + Hash() common.Hash + // Parent returns the hash of the parent of the current node. The hash may be the one + // grandparent if the immediate parent is an internal node with no hash. + Parent() common.Hash + // Path returns the hex-encoded path to the current node. + // Callers must not retain references to the return value after calling Next. + // For leaf nodes, the last element of the path is the 'terminator symbol' 0x10. + Path() []byte + + // Leaf returns true iff the current node is a leaf node. + // LeafBlob, LeafKey return the contents and key of the leaf node. These + // method panic if the iterator is not positioned at a leaf. + // Callers must not retain references to their return value after calling Next + Leaf() bool + LeafBlob() []byte + LeafKey() []byte } // nodeIteratorState represents the iteration state at one particular node of the @@ -89,8 +95,21 @@ type nodeIteratorState struct { type nodeIterator struct { trie *Trie // Trie being iterated stack []*nodeIteratorState // Hierarchy of trie nodes persisting the iteration state - err error // Failure set in case of an internal error in the iterator path []byte // Path to the current node + err error // Failure set in case of an internal error in the iterator +} + +// iteratorEnd is stored in nodeIterator.err when iteration is done. +var iteratorEnd = errors.New("end of iteration") + +// seekError is stored in nodeIterator.err if the initial seek has failed. +type seekError struct { + key []byte + err error +} + +func (e seekError) Error() string { + return "seek error: " + e.err.Error() } func newNodeIterator(trie *Trie, start []byte) NodeIterator { @@ -98,60 +117,57 @@ func newNodeIterator(trie *Trie, start []byte) NodeIterator { return new(nodeIterator) } it := &nodeIterator{trie: trie} - it.seek(start) + it.err = it.seek(start) return it } -// Hash returns the hash of the current node func (it *nodeIterator) Hash() common.Hash { if len(it.stack) == 0 { return common.Hash{} } - return it.stack[len(it.stack)-1].hash } -// Parent returns the hash of the parent node func (it *nodeIterator) Parent() common.Hash { if len(it.stack) == 0 { return common.Hash{} } - return it.stack[len(it.stack)-1].parent } -// Leaf returns true if the current node is a leaf func (it *nodeIterator) Leaf() bool { - if len(it.stack) == 0 { - return false - } - - _, ok := it.stack[len(it.stack)-1].node.(valueNode) - return ok + return hasTerm(it.path) } -// LeafBlob returns the data for the current node, if it is a leaf func (it *nodeIterator) LeafBlob() []byte { - if len(it.stack) == 0 { - return nil + if len(it.stack) > 0 { + if node, ok := it.stack[len(it.stack)-1].node.(valueNode); ok { + return []byte(node) + } } + panic("not at leaf") +} - if node, ok := it.stack[len(it.stack)-1].node.(valueNode); ok { - return []byte(node) +func (it *nodeIterator) LeafKey() []byte { + if len(it.stack) > 0 { + if _, ok := it.stack[len(it.stack)-1].node.(valueNode); ok { + return hexToKeybytes(it.path) + } } - return nil + panic("not at leaf") } -// Path returns the hex-encoded path to the current node func (it *nodeIterator) Path() []byte { return it.path } -// Error returns the error set in case of an internal error in the iterator func (it *nodeIterator) Error() error { if it.err == iteratorEnd { return nil } + if seek, ok := it.err.(seekError); ok { + return seek.err + } return it.err } @@ -160,29 +176,37 @@ func (it *nodeIterator) Error() error { // sets the Error field to the encountered failure. If `descend` is false, // skips iterating over any subnodes of the current node. func (it *nodeIterator) Next(descend bool) bool { - if it.err != nil { + if it.err == iteratorEnd { return false } - // Otherwise step forward with the iterator and report any errors + if seek, ok := it.err.(seekError); ok { + if it.err = it.seek(seek.key); it.err != nil { + return false + } + } + // Otherwise step forward with the iterator and report any errors. state, parentIndex, path, err := it.peek(descend) - if err != nil { - it.err = err + it.err = err + if it.err != nil { return false } it.push(state, parentIndex, path) return true } -func (it *nodeIterator) seek(prefix []byte) { +func (it *nodeIterator) seek(prefix []byte) error { // The path we're looking for is the hex encoded key without terminator. key := keybytesToHex(prefix) key = key[:len(key)-1] // Move forward until we're just before the closest match to key. for { state, parentIndex, path, err := it.peek(bytes.HasPrefix(key, it.path)) - if err != nil || bytes.Compare(path, key) >= 0 { - it.err = err - return + if err == iteratorEnd { + return iteratorEnd + } else if err != nil { + return seekError{prefix, err} + } else if bytes.Compare(path, key) >= 0 { + return nil } it.push(state, parentIndex, path) } @@ -197,7 +221,8 @@ func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, er if root != emptyRoot { state.hash = root } - return state, nil, nil, nil + err := state.resolve(it.trie, nil) + return state, nil, nil, err } if !descend { // If we're skipping children, pop the current node first @@ -205,72 +230,73 @@ func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, er } // Continue iteration to the next child - for { - if len(it.stack) == 0 { - return nil, nil, nil, iteratorEnd - } + for len(it.stack) > 0 { parent := it.stack[len(it.stack)-1] ancestor := parent.hash if (ancestor == common.Hash{}) { ancestor = parent.parent } - if node, ok := parent.node.(*fullNode); ok { - // Full node, move to the first non-nil child. - for i := parent.index + 1; i < len(node.Children); i++ { - child := node.Children[i] - if child != nil { - hash, _ := child.cache() - state := &nodeIteratorState{ - hash: common.BytesToHash(hash), - node: child, - parent: ancestor, - index: -1, - pathlen: len(it.path), - } - path := append(it.path, byte(i)) - parent.index = i - 1 - return state, &parent.index, path, nil - } + state, path, ok := it.nextChild(parent, ancestor) + if ok { + if err := state.resolve(it.trie, path); err != nil { + return parent, &parent.index, path, err } - } else if node, ok := parent.node.(*shortNode); ok { - // Short node, return the pointer singleton child - if parent.index < 0 { - hash, _ := node.Val.cache() + return state, &parent.index, path, nil + } + // No more child nodes, move back up. + it.pop() + } + return nil, nil, nil, iteratorEnd +} + +func (st *nodeIteratorState) resolve(tr *Trie, path []byte) error { + if hash, ok := st.node.(hashNode); ok { + resolved, err := tr.resolveHash(hash, path) + if err != nil { + return err + } + st.node = resolved + st.hash = common.BytesToHash(hash) + } + return nil +} + +func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool) { + switch node := parent.node.(type) { + case *fullNode: + // Full node, move to the first non-nil child. + for i := parent.index + 1; i < len(node.Children); i++ { + child := node.Children[i] + if child != nil { + hash, _ := child.cache() state := &nodeIteratorState{ hash: common.BytesToHash(hash), - node: node.Val, + node: child, parent: ancestor, index: -1, pathlen: len(it.path), } - var path []byte - if hasTerm(node.Key) { - path = append(it.path, node.Key[:len(node.Key)-1]...) - } else { - path = append(it.path, node.Key...) - } - return state, &parent.index, path, nil + path := append(it.path, byte(i)) + parent.index = i - 1 + return state, path, true } - } else if hash, ok := parent.node.(hashNode); ok { - // Hash node, resolve the hash child from the database - if parent.index < 0 { - node, err := it.trie.resolveHash(hash, nil, nil) - if err != nil { - return it.stack[len(it.stack)-1], &parent.index, it.path, err - } - state := &nodeIteratorState{ - hash: common.BytesToHash(hash), - node: node, - parent: ancestor, - index: -1, - pathlen: len(it.path), - } - return state, &parent.index, it.path, nil + } + case *shortNode: + // Short node, return the pointer singleton child + if parent.index < 0 { + hash, _ := node.Val.cache() + state := &nodeIteratorState{ + hash: common.BytesToHash(hash), + node: node.Val, + parent: ancestor, + index: -1, + pathlen: len(it.path), } + path := append(it.path, node.Key...) + return state, path, true } - // No more child nodes, move back up. - it.pop() } + return parent, it.path, false } func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte) { @@ -288,23 +314,21 @@ func (it *nodeIterator) pop() { } func compareNodes(a, b NodeIterator) int { - cmp := bytes.Compare(a.Path(), b.Path()) - if cmp != 0 { + if cmp := bytes.Compare(a.Path(), b.Path()); cmp != 0 { return cmp } - if a.Leaf() && !b.Leaf() { return -1 } else if b.Leaf() && !a.Leaf() { return 1 } - - cmp = bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()) - if cmp != 0 { + if cmp := bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()); cmp != 0 { return cmp } - - return bytes.Compare(a.LeafBlob(), b.LeafBlob()) + if a.Leaf() && b.Leaf() { + return bytes.Compare(a.LeafBlob(), b.LeafBlob()) + } + return 0 } type differenceIterator struct { @@ -341,6 +365,10 @@ func (it *differenceIterator) LeafBlob() []byte { return it.b.LeafBlob() } +func (it *differenceIterator) LeafKey() []byte { + return it.b.LeafKey() +} + func (it *differenceIterator) Path() []byte { return it.b.Path() } @@ -410,7 +438,6 @@ func (h *nodeIteratorHeap) Pop() interface{} { type unionIterator struct { items *nodeIteratorHeap // Nodes returned are the union of the ones in these iterators count int // Number of nodes scanned across all tries - err error // The error, if one has been encountered } // NewUnionIterator constructs a NodeIterator that iterates over elements in the union @@ -421,9 +448,7 @@ func NewUnionIterator(iters []NodeIterator) (NodeIterator, *int) { copy(h, iters) heap.Init(&h) - ui := &unionIterator{ - items: &h, - } + ui := &unionIterator{items: &h} return ui, &ui.count } @@ -443,6 +468,10 @@ func (it *unionIterator) LeafBlob() []byte { return (*it.items)[0].LeafBlob() } +func (it *unionIterator) LeafKey() []byte { + return (*it.items)[0].LeafKey() +} + func (it *unionIterator) Path() []byte { return (*it.items)[0].Path() } diff --git a/trie/iterator_test.go b/trie/iterator_test.go index f161fd99d..4808d8b0c 100644 --- a/trie/iterator_test.go +++ b/trie/iterator_test.go @@ -19,6 +19,7 @@ package trie import ( "bytes" "fmt" + "math/rand" "testing" "github.com/ethereum/go-ethereum/common" @@ -239,8 +240,8 @@ func TestUnionIterator(t *testing.T) { all := []struct{ k, v string }{ {"aardvark", "c"}, - {"barb", "bd"}, {"barb", "ba"}, + {"barb", "bd"}, {"bard", "bc"}, {"bars", "bb"}, {"bars", "be"}, @@ -267,3 +268,107 @@ func TestUnionIterator(t *testing.T) { t.Errorf("Iterator returned extra values.") } } + +func TestIteratorNoDups(t *testing.T) { + var tr Trie + for _, val := range testdata1 { + tr.Update([]byte(val.k), []byte(val.v)) + } + checkIteratorNoDups(t, tr.NodeIterator(nil), nil) +} + +// This test checks that nodeIterator.Next can be retried after inserting missing trie nodes. +func TestIteratorContinueAfterError(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + tr, _ := New(common.Hash{}, db) + for _, val := range testdata1 { + tr.Update([]byte(val.k), []byte(val.v)) + } + tr.Commit() + wantNodeCount := checkIteratorNoDups(t, tr.NodeIterator(nil), nil) + keys := db.Keys() + t.Log("node count", wantNodeCount) + + for i := 0; i < 20; i++ { + // Create trie that will load all nodes from DB. + tr, _ := New(tr.Hash(), db) + + // Remove a random node from the database. It can't be the root node + // because that one is already loaded. + var rkey []byte + for { + if rkey = keys[rand.Intn(len(keys))]; !bytes.Equal(rkey, tr.Hash().Bytes()) { + break + } + } + rval, _ := db.Get(rkey) + db.Delete(rkey) + + // Iterate until the error is hit. + seen := make(map[string]bool) + it := tr.NodeIterator(nil) + checkIteratorNoDups(t, it, seen) + missing, ok := it.Error().(*MissingNodeError) + if !ok || !bytes.Equal(missing.NodeHash[:], rkey) { + t.Fatal("didn't hit missing node, got", it.Error()) + } + + // Add the node back and continue iteration. + db.Put(rkey, rval) + checkIteratorNoDups(t, it, seen) + if it.Error() != nil { + t.Fatal("unexpected error", it.Error()) + } + if len(seen) != wantNodeCount { + t.Fatal("wrong node iteration count, got", len(seen), "want", wantNodeCount) + } + } +} + +// Similar to the test above, this one checks that failure to create nodeIterator at a +// certain key prefix behaves correctly when Next is called. The expectation is that Next +// should retry seeking before returning true for the first time. +func TestIteratorContinueAfterSeekError(t *testing.T) { + // Commit test trie to db, then remove the node containing "bars". + db, _ := ethdb.NewMemDatabase() + ctr, _ := New(common.Hash{}, db) + for _, val := range testdata1 { + ctr.Update([]byte(val.k), []byte(val.v)) + } + root, _ := ctr.Commit() + barNodeHash := common.HexToHash("05041990364eb72fcb1127652ce40d8bab765f2bfe53225b1170d276cc101c2e") + barNode, _ := db.Get(barNodeHash[:]) + db.Delete(barNodeHash[:]) + + // Create a new iterator that seeks to "bars". Seeking can't proceed because + // the node is missing. + tr, _ := New(root, db) + it := tr.NodeIterator([]byte("bars")) + missing, ok := it.Error().(*MissingNodeError) + if !ok { + t.Fatal("want MissingNodeError, got", it.Error()) + } else if missing.NodeHash != barNodeHash { + t.Fatal("wrong node missing") + } + + // Reinsert the missing node. + db.Put(barNodeHash[:], barNode[:]) + + // Check that iteration produces the right set of values. + if err := checkIteratorOrder(testdata1[2:], NewIterator(it)); err != nil { + t.Fatal(err) + } +} + +func checkIteratorNoDups(t *testing.T, it NodeIterator, seen map[string]bool) int { + if seen == nil { + seen = make(map[string]bool) + } + for it.Next(true) { + if seen[string(it.Path())] { + t.Fatalf("iterator visited node path %x twice", it.Path()) + } + seen[string(it.Path())] = true + } + return len(seen) +} diff --git a/trie/proof.go b/trie/proof.go index fb7734b86..1f8f76b1b 100644 --- a/trie/proof.go +++ b/trie/proof.go @@ -58,7 +58,7 @@ func (t *Trie) Prove(key []byte) []rlp.RawValue { nodes = append(nodes, n) case hashNode: var err error - tn, err = t.resolveHash(n, nil, nil) + tn, err = t.resolveHash(n, nil) if err != nil { log.Error(fmt.Sprintf("Unhandled trie error: %v", err)) return nil diff --git a/trie/sync.go b/trie/sync.go index 168501392..9e8449431 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -28,6 +28,10 @@ import ( // node it did not request. var ErrNotRequested = errors.New("not requested") +// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a +// node it already processed previously. +var ErrAlreadyProcessed = errors.New("already processed") + // request represents a scheduled or already in-flight state retrieval request. type request struct { hash common.Hash // Hash of the node data content to retrieve @@ -48,6 +52,21 @@ type SyncResult struct { Data []byte // Data content of the retrieved node } +// syncMemBatch is an in-memory buffer of successfully downloaded but not yet +// persisted data items. +type syncMemBatch struct { + batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items + order []common.Hash // Order of completion to prevent out-of-order data loss +} + +// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. +func newSyncMemBatch() *syncMemBatch { + return &syncMemBatch{ + batch: make(map[common.Hash][]byte), + order: make([]common.Hash, 0, 256), + } +} + // TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a // leaf node. It's used by state syncing to check if the leaf node requires some // further data syncing. @@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type TrieSync struct { - database DatabaseReader + database DatabaseReader // Persistent database to check for existing entries + membatch *syncMemBatch // Memory buffer to avoid frequest database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } @@ -66,6 +86,7 @@ type TrieSync struct { func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, + membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), queue: prque.New(), } @@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c if root == emptyRoot { return } + if _, ok := s.membatch.batch[root]; ok { + return + } key := root.Bytes() blob, _ := s.database.Get(key) if local, err := decodeNode(key, blob, 0); local != nil && err == nil { @@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) if hash == emptyState { return } + if _, ok := s.membatch.batch[hash]; ok { + return + } if blob, _ := s.database.Get(hash.Bytes()); blob != nil { return } @@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { // Process injects a batch of retrieved trie nodes data, returning if something // was committed to the database and also the index of an entry if processing of // it failed. -func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) { +func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { committed := false for i, item := range results { @@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, if request == nil { return committed, i, ErrNotRequested } + if request.data != nil { + return committed, i, ErrAlreadyProcessed + } // If the item is a raw entry request, commit directly if request.raw { request.data = item.Data - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, 0, nil } +// Commit flushes the data stored in the internal membatch out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) { + // Dump the membatch into a database dbw + for i, key := range s.membatch.order { + if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { + return i, err + } + } + written := len(s.membatch.order) + + // Drop the membatch data and return + s.membatch = newSyncMemBatch() + return written, nil +} + // Pending returns the number of state entries currently pending for download. func (s *TrieSync) Pending() int { return len(s.requests) @@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { // If the child references another node, resolve or schedule if node, ok := (child.node).(hashNode); ok { // Try to resolve the node from the local database + hash := common.BytesToHash(node) + if _, ok := s.membatch.batch[hash]; ok { + continue + } blob, _ := s.database.Get(node) if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil { continue } // Locally unknown node, schedule for retrieval requests = append(requests, &request{ - hash: common.BytesToHash(node), + hash: hash, parents: []*request{req}, depth: child.depth, callback: req.callback, @@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { return requests, nil } -// commit finalizes a retrieval request and stores it into the database. If any +// commit finalizes a retrieval request and stores it into the membatch. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) { - // Write the node content to disk - if err := dbw.Put(req.hash[:], req.data); err != nil { - return err - } +func (s *TrieSync) commit(req *request) (err error) { + // Write the node content to the membatch + s.membatch.batch[req.hash] = req.data + s.membatch.order = append(s.membatch.order, req.hash) + delete(s.requests, req.hash) // Check all parents for completion for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent, dbw); err != nil { + if err := s.commit(parent); err != nil { return err } } diff --git a/trie/sync_test.go b/trie/sync_test.go index d778555b9..ec16a25bd 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -122,9 +122,12 @@ func testIterativeTrieSync(t *testing.T, batch int) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(batch)...) } // Cross check that the two tries are in sync @@ -152,9 +155,12 @@ func TestIterativeDelayedTrieSync(t *testing.T) { } results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[len(results):], sched.Missing(10000)...) } // Cross check that the two tries are in sync @@ -190,9 +196,12 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { results = append(results, SyncResult{hash, data}) } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { queue[hash] = struct{}{} @@ -231,9 +240,12 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { } } // Feed the retrieved results back and queue new tasks - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { delete(queue, result.Hash) } @@ -272,9 +284,12 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } queue = append(queue[:0], sched.Missing(0)...) } // Cross check that the two tries are in sync @@ -304,9 +319,12 @@ func TestIncompleteTrieSync(t *testing.T) { results[i] = SyncResult{hash, data} } // Process each of the trie nodes - if _, index, err := sched.Process(results, dstDb); err != nil { + if _, index, err := sched.Process(results); err != nil { t.Fatalf("failed to process result #%d: %v", index, err) } + if index, err := sched.Commit(dstDb); err != nil { + t.Fatalf("failed to commit data #%d: %v", index, err) + } for _, result := range results { added = append(added, result.Hash) } diff --git a/trie/trie.go b/trie/trie.go index cbe496574..a3151b1ce 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -116,7 +116,7 @@ func New(root common.Hash, db Database) (*Trie, error) { if db == nil { panic("trie.New: cannot use existing root without a database") } - rootnode, err := trie.resolveHash(root[:], nil, nil) + rootnode, err := trie.resolveHash(root[:], nil) if err != nil { return nil, err } @@ -180,7 +180,7 @@ func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode } return value, n, didResolve, err case hashNode: - child, err := t.resolveHash(n, key[:pos], key[pos:]) + child, err := t.resolveHash(n, key[:pos]) if err != nil { return nil, n, true, err } @@ -283,7 +283,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error // We've hit a part of the trie that isn't loaded yet. Load // the node and insert into it. This leaves all child nodes on // the path to the value in the trie. - rn, err := t.resolveHash(n, prefix, key) + rn, err := t.resolveHash(n, prefix) if err != nil { return false, nil, err } @@ -388,7 +388,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { // shortNode{..., shortNode{...}}. Since the entry // might not be loaded yet, resolve it just for this // check. - cnode, err := t.resolve(n.Children[pos], prefix, []byte{byte(pos)}) + cnode, err := t.resolve(n.Children[pos], prefix) if err != nil { return false, nil, err } @@ -414,7 +414,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { // We've hit a part of the trie that isn't loaded yet. Load // the node and delete from it. This leaves all child nodes on // the path to the value in the trie. - rn, err := t.resolveHash(n, prefix, key) + rn, err := t.resolveHash(n, prefix) if err != nil { return false, nil, err } @@ -436,24 +436,19 @@ func concat(s1 []byte, s2 ...byte) []byte { return r } -func (t *Trie) resolve(n node, prefix, suffix []byte) (node, error) { +func (t *Trie) resolve(n node, prefix []byte) (node, error) { if n, ok := n.(hashNode); ok { - return t.resolveHash(n, prefix, suffix) + return t.resolveHash(n, prefix) } return n, nil } -func (t *Trie) resolveHash(n hashNode, prefix, suffix []byte) (node, error) { +func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) { cacheMissCounter.Inc(1) enc, err := t.db.Get(n) if err != nil || enc == nil { - return nil, &MissingNodeError{ - RootHash: t.originalRoot, - NodeHash: common.BytesToHash(n), - PrefixLen: len(prefix), - SuffixLen: len(suffix), - } + return nil, &MissingNodeError{NodeHash: common.BytesToHash(n), Path: prefix} } dec := mustDecodeNode(n, enc, t.cachegen) return dec, nil diff --git a/trie/trie_test.go b/trie/trie_test.go index 61adbba0c..1c9095070 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -19,6 +19,7 @@ package trie import ( "bytes" "encoding/binary" + "errors" "fmt" "io/ioutil" "math/rand" @@ -34,7 +35,7 @@ import ( func init() { spew.Config.Indent = " " - spew.Config.DisableMethods = true + spew.Config.DisableMethods = false } // Used for testing @@ -357,6 +358,7 @@ type randTestStep struct { op int key []byte // for opUpdate, opDelete, opGet value []byte // for opUpdate + err error // for debugging } const ( @@ -406,7 +408,7 @@ func runRandTest(rt randTest) bool { tr, _ := New(common.Hash{}, db) values := make(map[string]string) // tracks content of the trie - for _, step := range rt { + for i, step := range rt { switch step.op { case opUpdate: tr.Update(step.key, step.value) @@ -418,23 +420,22 @@ func runRandTest(rt randTest) bool { v := tr.Get(step.key) want := values[string(step.key)] if string(v) != want { - fmt.Printf("mismatch for key 0x%x, got 0x%x want 0x%x", step.key, v, want) - return false + rt[i].err = fmt.Errorf("mismatch for key 0x%x, got 0x%x want 0x%x", step.key, v, want) } case opCommit: - if _, err := tr.Commit(); err != nil { - panic(err) - } + _, rt[i].err = tr.Commit() case opHash: tr.Hash() case opReset: hash, err := tr.Commit() if err != nil { - panic(err) + rt[i].err = err + return false } newtr, err := New(hash, db) if err != nil { - panic(err) + rt[i].err = err + return false } tr = newtr case opItercheckhash: @@ -444,17 +445,20 @@ func runRandTest(rt randTest) bool { checktr.Update(it.Key, it.Value) } if tr.Hash() != checktr.Hash() { - fmt.Println("hashes not equal") - return false + rt[i].err = fmt.Errorf("hash mismatch in opItercheckhash") } case opCheckCacheInvariant: - return checkCacheInvariant(tr.root, nil, tr.cachegen, false, 0) + rt[i].err = checkCacheInvariant(tr.root, nil, tr.cachegen, false, 0) + } + // Abort the test on error. + if rt[i].err != nil { + return false } } return true } -func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool, depth int) bool { +func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool, depth int) error { var children []node var flag nodeFlag switch n := n.(type) { @@ -465,33 +469,34 @@ func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool flag = n.flags children = n.Children[:] default: - return true + return nil } - showerror := func() { - fmt.Printf("at depth %d node %s", depth, spew.Sdump(n)) - fmt.Printf("parent: %s", spew.Sdump(parent)) + errorf := func(format string, args ...interface{}) error { + msg := fmt.Sprintf(format, args...) + msg += fmt.Sprintf("\nat depth %d node %s", depth, spew.Sdump(n)) + msg += fmt.Sprintf("parent: %s", spew.Sdump(parent)) + return errors.New(msg) } if flag.gen > parentCachegen { - fmt.Printf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen) - showerror() - return false + return errorf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen) } if depth > 0 && !parentDirty && flag.dirty { - fmt.Printf("cache invariant violation: child is dirty but parent isn't\n") - showerror() - return false + return errorf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen) } for _, child := range children { - if !checkCacheInvariant(child, n, flag.gen, flag.dirty, depth+1) { - return false + if err := checkCacheInvariant(child, n, flag.gen, flag.dirty, depth+1); err != nil { + return err } } - return true + return nil } func TestRandom(t *testing.T) { if err := quick.Check(runRandTest, nil); err != nil { + if cerr, ok := err.(*quick.CheckError); ok { + t.Fatalf("random test iteration %d failed: %s", cerr.Count, spew.Sdump(cerr.In)) + } t.Fatal(err) } } diff --git a/vendor/github.com/docker/docker/LICENSE b/vendor/github.com/docker/docker/LICENSE new file mode 100644 index 000000000..9c8e20ab8 --- /dev/null +++ b/vendor/github.com/docker/docker/LICENSE @@ -0,0 +1,191 @@ + + Apache License + Version 2.0, January 2004 + https://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2013-2017 Docker, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/docker/docker/NOTICE b/vendor/github.com/docker/docker/NOTICE new file mode 100644 index 000000000..0c74e15b0 --- /dev/null +++ b/vendor/github.com/docker/docker/NOTICE @@ -0,0 +1,19 @@ +Docker +Copyright 2012-2017 Docker, Inc. + +This product includes software developed at Docker, Inc. (https://www.docker.com). + +This product contains software (https://github.com/kr/pty) developed +by Keith Rarick, licensed under the MIT License. + +The following is courtesy of our legal counsel: + + +Use and transfer of Docker may be subject to certain restrictions by the +United States and other governments. +It is your responsibility to ensure that your use and/or transfer does not +violate applicable laws. + +For more information, please see https://www.bis.doc.gov + +See also https://www.apache.org/dev/crypto.html and/or seek legal counsel. diff --git a/vendor/github.com/docker/docker/pkg/reexec/README.md b/vendor/github.com/docker/docker/pkg/reexec/README.md new file mode 100644 index 000000000..6658f69b6 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/README.md @@ -0,0 +1,5 @@ +# reexec + +The `reexec` package facilitates the busybox style reexec of the docker binary that we require because +of the forking limitations of using Go. Handlers can be registered with a name and the argv 0 of +the exec of the binary will be used to find and execute custom init paths. diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_linux.go b/vendor/github.com/docker/docker/pkg/reexec/command_linux.go new file mode 100644 index 000000000..34ae2a9dc --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_linux.go @@ -0,0 +1,28 @@ +// +build linux + +package reexec + +import ( + "os/exec" + "syscall" +) + +// Self returns the path to the current process's binary. +// Returns "/proc/self/exe". +func Self() string { + return "/proc/self/exe" +} + +// Command returns *exec.Cmd which has Path as current binary. Also it setting +// SysProcAttr.Pdeathsig to SIGTERM. +// This will use the in-memory version (/proc/self/exe) of the current binary, +// it is thus safe to delete or replace the on-disk binary (os.Args[0]). +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + SysProcAttr: &syscall.SysProcAttr{ + Pdeathsig: syscall.SIGTERM, + }, + } +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_unix.go b/vendor/github.com/docker/docker/pkg/reexec/command_unix.go new file mode 100644 index 000000000..778a720e3 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_unix.go @@ -0,0 +1,23 @@ +// +build freebsd solaris darwin + +package reexec + +import ( + "os/exec" +) + +// Self returns the path to the current process's binary. +// Uses os.Args[0]. +func Self() string { + return naiveSelf() +} + +// Command returns *exec.Cmd which has Path as current binary. +// For example if current binary is "docker" at "/usr/bin/", then cmd.Path will +// be set to "/usr/bin/docker". +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + } +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go b/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go new file mode 100644 index 000000000..76edd8242 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go @@ -0,0 +1,12 @@ +// +build !linux,!windows,!freebsd,!solaris,!darwin + +package reexec + +import ( + "os/exec" +) + +// Command is unsupported on operating systems apart from Linux, Windows, Solaris and Darwin. +func Command(args ...string) *exec.Cmd { + return nil +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_windows.go b/vendor/github.com/docker/docker/pkg/reexec/command_windows.go new file mode 100644 index 000000000..ca871c422 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_windows.go @@ -0,0 +1,23 @@ +// +build windows + +package reexec + +import ( + "os/exec" +) + +// Self returns the path to the current process's binary. +// Uses os.Args[0]. +func Self() string { + return naiveSelf() +} + +// Command returns *exec.Cmd which has Path as current binary. +// For example if current binary is "docker.exe" at "C:\", then cmd.Path will +// be set to "C:\docker.exe". +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + } +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/reexec.go b/vendor/github.com/docker/docker/pkg/reexec/reexec.go new file mode 100644 index 000000000..c56671d91 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/reexec.go @@ -0,0 +1,47 @@ +package reexec + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" +) + +var registeredInitializers = make(map[string]func()) + +// Register adds an initialization func under the specified name +func Register(name string, initializer func()) { + if _, exists := registeredInitializers[name]; exists { + panic(fmt.Sprintf("reexec func already registered under name %q", name)) + } + + registeredInitializers[name] = initializer +} + +// Init is called as the first part of the exec process and returns true if an +// initialization function was called. +func Init() bool { + initializer, exists := registeredInitializers[os.Args[0]] + if exists { + initializer() + + return true + } + return false +} + +func naiveSelf() string { + name := os.Args[0] + if filepath.Base(name) == name { + if lp, err := exec.LookPath(name); err == nil { + return lp + } + } + // handle conversion of relative paths to absolute + if absName, err := filepath.Abs(name); err == nil { + return absName + } + // if we couldn't get absolute name, return original + // (NOTE: Go only errors on Abs() if os.Getwd fails) + return name +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 607f193a3..1c2c58500 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -75,6 +75,12 @@ "revisionTime": "2017-02-01T22:58:49Z" }, { + "checksumSHA1": "lutCa+IVM60R1OYBm9RtDAW50Ys=", + "path": "github.com/docker/docker/pkg/reexec", + "revision": "83ee902ecc3790c33c1e2d87334074436056bb49", + "revisionTime": "2017-04-22T21:51:12Z" + }, + { "checksumSHA1": "zYnPsNAVm1/ViwCkN++dX2JQhBo=", "path": "github.com/edsrzf/mmap-go", "revision": "935e0e8a636ca4ba70b713f3e38a19e1b77739e8", |