diff options
49 files changed, 2253 insertions, 1161 deletions
@@ -102,6 +102,22 @@ over between the main network and test network, you should make sure to always u for play-money and real-money. Unless you manually move accounts, Geth will by default correctly separate the two networks and will not make any accounts available between them.* +### Configuration + +As an alternative to passing the numerous flags to the `geth` binary, you can also pass a configuration file via: + +``` +$ geth --config /path/to/your_config.toml +``` + +To get an idea how the file should look like you can use the `dumpconfig` subcommand to export your existing configuration: + +``` +$ geth --your-favourite-flags dumpconfig +``` + +*Note: This works only with geth v1.6.0 and above* + #### Docker quick start One of the quickest ways to get Ethereum up and running on your machine is by using Docker: diff --git a/cmd/evm/json_logger.go b/cmd/evm/json_logger.go index a84d5daeb..d61981062 100644 --- a/cmd/evm/json_logger.go +++ b/cmd/evm/json_logger.go @@ -28,25 +28,32 @@ import ( type JSONLogger struct { encoder *json.Encoder + cfg *vm.LogConfig } -func NewJSONLogger(writer io.Writer) *JSONLogger { - return &JSONLogger{json.NewEncoder(writer)} +func NewJSONLogger(cfg *vm.LogConfig, writer io.Writer) *JSONLogger { + return &JSONLogger{json.NewEncoder(writer), cfg} } // CaptureState outputs state information on the logger. func (l *JSONLogger) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, memory *vm.Memory, stack *vm.Stack, contract *vm.Contract, depth int, err error) error { - return l.encoder.Encode(vm.StructLog{ - Pc: pc, - Op: op, - Gas: gas + cost, - GasCost: cost, - Memory: memory.Data(), - Stack: stack.Data(), - Storage: nil, - Depth: depth, - Err: err, - }) + log := vm.StructLog{ + Pc: pc, + Op: op, + Gas: gas + cost, + GasCost: cost, + MemorySize: memory.Len(), + Storage: nil, + Depth: depth, + Err: err, + } + if !l.cfg.DisableMemory { + log.Memory = memory.Data() + } + if !l.cfg.DisableStack { + log.Stack = stack.Data() + } + return l.encoder.Encode(log) } // CaptureEnd is triggered at end of execution. diff --git a/cmd/evm/main.go b/cmd/evm/main.go index 48a1b92cb..1892ae3d3 100644 --- a/cmd/evm/main.go +++ b/cmd/evm/main.go @@ -102,6 +102,14 @@ var ( Name: "sender", Usage: "The transaction origin", } + DisableMemoryFlag = cli.BoolFlag{ + Name: "nomemory", + Usage: "disable memory output", + } + DisableStackFlag = cli.BoolFlag{ + Name: "nostack", + Usage: "disable stack output", + } ) func init() { @@ -123,6 +131,8 @@ func init() { GenesisFlag, MachineFlag, SenderFlag, + DisableMemoryFlag, + DisableStackFlag, } app.Commands = []cli.Command{ compileCommand, diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go index b1fb8998f..2ce0920f6 100644 --- a/cmd/evm/runner.go +++ b/cmd/evm/runner.go @@ -73,6 +73,10 @@ func runCmd(ctx *cli.Context) error { glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false))) glogger.Verbosity(log.Lvl(ctx.GlobalInt(VerbosityFlag.Name))) log.Root().SetHandler(glogger) + logconfig := &vm.LogConfig{ + DisableMemory: ctx.GlobalBool(DisableMemoryFlag.Name), + DisableStack: ctx.GlobalBool(DisableStackFlag.Name), + } var ( tracer vm.Tracer @@ -82,12 +86,12 @@ func runCmd(ctx *cli.Context) error { sender = common.StringToAddress("sender") ) if ctx.GlobalBool(MachineFlag.Name) { - tracer = NewJSONLogger(os.Stdout) + tracer = NewJSONLogger(logconfig, os.Stdout) } else if ctx.GlobalBool(DebugFlag.Name) { - debugLogger = vm.NewStructLogger(nil) + debugLogger = vm.NewStructLogger(logconfig) tracer = debugLogger } else { - debugLogger = vm.NewStructLogger(nil) + debugLogger = vm.NewStructLogger(logconfig) } if ctx.GlobalString(GenesisFlag.Name) != "" { gen := readGenesis(ctx.GlobalString(GenesisFlag.Name)) diff --git a/cmd/geth/accountcmd_test.go b/cmd/geth/accountcmd_test.go index 5f9f67677..e146323ee 100644 --- a/cmd/geth/accountcmd_test.go +++ b/cmd/geth/accountcmd_test.go @@ -44,21 +44,21 @@ func tmpDatadirWithKeystore(t *testing.T) string { func TestAccountListEmpty(t *testing.T) { geth := runGeth(t, "account", "list") - geth.expectExit() + geth.ExpectExit() } func TestAccountList(t *testing.T) { datadir := tmpDatadirWithKeystore(t) geth := runGeth(t, "account", "list", "--datadir", datadir) - defer geth.expectExit() + defer geth.ExpectExit() if runtime.GOOS == "windows" { - geth.expect(` + geth.Expect(` Account #0: {7ef5a6135f1fd6a02593eedc869c6d41d934aef8} keystore://{{.Datadir}}\keystore\UTC--2016-03-22T12-57-55.920751759Z--7ef5a6135f1fd6a02593eedc869c6d41d934aef8 Account #1: {f466859ead1932d743d622cb74fc058882e8648a} keystore://{{.Datadir}}\keystore\aaa Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}\keystore\zzz `) } else { - geth.expect(` + geth.Expect(` Account #0: {7ef5a6135f1fd6a02593eedc869c6d41d934aef8} keystore://{{.Datadir}}/keystore/UTC--2016-03-22T12-57-55.920751759Z--7ef5a6135f1fd6a02593eedc869c6d41d934aef8 Account #1: {f466859ead1932d743d622cb74fc058882e8648a} keystore://{{.Datadir}}/keystore/aaa Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}/keystore/zzz @@ -68,20 +68,20 @@ Account #2: {289d485d9771714cce91d3393d764e1311907acc} keystore://{{.Datadir}}/k func TestAccountNew(t *testing.T) { geth := runGeth(t, "account", "new", "--lightkdf") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Your new account is locked with a password. Please give a password. Do not forget this password. !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} Repeat passphrase: {{.InputLine "foobar"}} `) - geth.expectRegexp(`Address: \{[0-9a-f]{40}\}\n`) + geth.ExpectRegexp(`Address: \{[0-9a-f]{40}\}\n`) } func TestAccountNewBadRepeat(t *testing.T) { geth := runGeth(t, "account", "new", "--lightkdf") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Your new account is locked with a password. Please give a password. Do not forget this password. !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "something"}} @@ -95,8 +95,8 @@ func TestAccountUpdate(t *testing.T) { geth := runGeth(t, "account", "update", "--datadir", datadir, "--lightkdf", "f466859ead1932d743d622cb74fc058882e8648a") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} @@ -108,8 +108,8 @@ Repeat passphrase: {{.InputLine "foobar2"}} func TestWalletImport(t *testing.T) { geth := runGeth(t, "wallet", "import", "--lightkdf", "testdata/guswallet.json") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foo"}} Address: {d4584b5f6229b7be90727b0fc8c6b91bb427821f} @@ -123,8 +123,8 @@ Address: {d4584b5f6229b7be90727b0fc8c6b91bb427821f} func TestWalletImportBadPassword(t *testing.T) { geth := runGeth(t, "wallet", "import", "--lightkdf", "testdata/guswallet.json") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "wrong"}} Fatal: could not decrypt key with given passphrase @@ -137,19 +137,19 @@ func TestUnlockFlag(t *testing.T) { "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a", "js", "testdata/empty.js") - geth.expect(` + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} `) - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", "=0xf466859ead1932d743d622cb74fc058882e8648a", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -160,8 +160,8 @@ func TestUnlockFlagWrongPassword(t *testing.T) { geth := runGeth(t, "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "wrong1"}} @@ -180,14 +180,14 @@ func TestUnlockFlagMultiIndex(t *testing.T) { "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--unlock", "0,2", "js", "testdata/empty.js") - geth.expect(` + geth.Expect(` Unlocking account 0 | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} Unlocking account 2 | Attempt 1/3 Passphrase: {{.InputLine "foobar"}} `) - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", @@ -195,7 +195,7 @@ Passphrase: {{.InputLine "foobar"}} "=0x289d485d9771714cce91d3393d764e1311907acc", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -207,7 +207,7 @@ func TestUnlockFlagPasswordFile(t *testing.T) { "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--password", "testdata/passwords.txt", "--unlock", "0,2", "js", "testdata/empty.js") - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", @@ -215,7 +215,7 @@ func TestUnlockFlagPasswordFile(t *testing.T) { "=0x289d485d9771714cce91d3393d764e1311907acc", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -226,8 +226,8 @@ func TestUnlockFlagPasswordFileWrongPassword(t *testing.T) { geth := runGeth(t, "--datadir", datadir, "--nat", "none", "--nodiscover", "--dev", "--password", "testdata/wrong-passwords.txt", "--unlock", "0,2") - defer geth.expectExit() - geth.expect(` + defer geth.ExpectExit() + geth.Expect(` Fatal: Failed to unlock account 0 (could not decrypt key with given passphrase) `) } @@ -238,14 +238,14 @@ func TestUnlockFlagAmbiguous(t *testing.T) { "--keystore", store, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a", "js", "testdata/empty.js") - defer geth.expectExit() + defer geth.ExpectExit() // Helper for the expect template, returns absolute keystore path. - geth.setTemplateFunc("keypath", func(file string) string { + geth.SetTemplateFunc("keypath", func(file string) string { abs, _ := filepath.Abs(filepath.Join(store, file)) return abs }) - geth.expect(` + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "foobar"}} @@ -257,14 +257,14 @@ Your passphrase unlocked keystore://{{keypath "1"}} In order to avoid this warning, you need to remove the following duplicate key files: keystore://{{keypath "2"}} `) - geth.expectExit() + geth.ExpectExit() wantMessages := []string{ "Unlocked account", "=0xf466859ead1932d743d622cb74fc058882e8648a", } for _, m := range wantMessages { - if !strings.Contains(geth.stderrText(), m) { + if !strings.Contains(geth.StderrText(), m) { t.Errorf("stderr text does not contain %q", m) } } @@ -275,14 +275,14 @@ func TestUnlockFlagAmbiguousWrongPassword(t *testing.T) { geth := runGeth(t, "--keystore", store, "--nat", "none", "--nodiscover", "--dev", "--unlock", "f466859ead1932d743d622cb74fc058882e8648a") - defer geth.expectExit() + defer geth.ExpectExit() // Helper for the expect template, returns absolute keystore path. - geth.setTemplateFunc("keypath", func(file string) string { + geth.SetTemplateFunc("keypath", func(file string) string { abs, _ := filepath.Abs(filepath.Join(store, file)) return abs }) - geth.expect(` + geth.Expect(` Unlocking account f466859ead1932d743d622cb74fc058882e8648a | Attempt 1/3 !! Unsupported terminal, password will be echoed. Passphrase: {{.InputLine "wrong"}} @@ -292,5 +292,5 @@ Multiple key files exist for address f466859ead1932d743d622cb74fc058882e8648a: Testing your passphrase against all of them... Fatal: None of the listed files could be unlocked. `) - geth.expectExit() + geth.ExpectExit() } diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index e5472836c..258b9e6dd 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -47,15 +47,15 @@ func TestConsoleWelcome(t *testing.T) { "console") // Gather all the infos the welcome message needs to contain - geth.setTemplateFunc("goos", func() string { return runtime.GOOS }) - geth.setTemplateFunc("goarch", func() string { return runtime.GOARCH }) - geth.setTemplateFunc("gover", runtime.Version) - geth.setTemplateFunc("gethver", func() string { return params.Version }) - geth.setTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) - geth.setTemplateFunc("apis", func() string { return ipcAPIs }) + geth.SetTemplateFunc("goos", func() string { return runtime.GOOS }) + geth.SetTemplateFunc("goarch", func() string { return runtime.GOARCH }) + geth.SetTemplateFunc("gover", runtime.Version) + geth.SetTemplateFunc("gethver", func() string { return params.Version }) + geth.SetTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) + geth.SetTemplateFunc("apis", func() string { return ipcAPIs }) // Verify the actual welcome message to the required template - geth.expect(` + geth.Expect(` Welcome to the Geth JavaScript console! instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}} @@ -66,7 +66,7 @@ at block: 0 ({{niltime}}) > {{.InputLine "exit"}} `) - geth.expectExit() + geth.ExpectExit() } // Tests that a console can be attached to a running node via various means. @@ -90,8 +90,8 @@ func TestIPCAttachWelcome(t *testing.T) { time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "ipc:"+ipc, ipcAPIs) - geth.interrupt() - geth.expectExit() + geth.Interrupt() + geth.ExpectExit() } func TestHTTPAttachWelcome(t *testing.T) { @@ -104,8 +104,8 @@ func TestHTTPAttachWelcome(t *testing.T) { time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "http://localhost:"+port, httpAPIs) - geth.interrupt() - geth.expectExit() + geth.Interrupt() + geth.ExpectExit() } func TestWSAttachWelcome(t *testing.T) { @@ -119,29 +119,29 @@ func TestWSAttachWelcome(t *testing.T) { time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "ws://localhost:"+port, httpAPIs) - geth.interrupt() - geth.expectExit() + geth.Interrupt() + geth.ExpectExit() } func testAttachWelcome(t *testing.T, geth *testgeth, endpoint, apis string) { // Attach to a running geth note and terminate immediately attach := runGeth(t, "attach", endpoint) - defer attach.expectExit() - attach.stdin.Close() + defer attach.ExpectExit() + attach.CloseStdin() // Gather all the infos the welcome message needs to contain - attach.setTemplateFunc("goos", func() string { return runtime.GOOS }) - attach.setTemplateFunc("goarch", func() string { return runtime.GOARCH }) - attach.setTemplateFunc("gover", runtime.Version) - attach.setTemplateFunc("gethver", func() string { return params.Version }) - attach.setTemplateFunc("etherbase", func() string { return geth.Etherbase }) - attach.setTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) - attach.setTemplateFunc("ipc", func() bool { return strings.HasPrefix(endpoint, "ipc") }) - attach.setTemplateFunc("datadir", func() string { return geth.Datadir }) - attach.setTemplateFunc("apis", func() string { return apis }) + attach.SetTemplateFunc("goos", func() string { return runtime.GOOS }) + attach.SetTemplateFunc("goarch", func() string { return runtime.GOARCH }) + attach.SetTemplateFunc("gover", runtime.Version) + attach.SetTemplateFunc("gethver", func() string { return params.Version }) + attach.SetTemplateFunc("etherbase", func() string { return geth.Etherbase }) + attach.SetTemplateFunc("niltime", func() string { return time.Unix(0, 0).Format(time.RFC1123) }) + attach.SetTemplateFunc("ipc", func() bool { return strings.HasPrefix(endpoint, "ipc") }) + attach.SetTemplateFunc("datadir", func() string { return geth.Datadir }) + attach.SetTemplateFunc("apis", func() string { return apis }) // Verify the actual welcome message to the required template - attach.expect(` + attach.Expect(` Welcome to the Geth JavaScript console! instance: Geth/v{{gethver}}/{{goos}}-{{goarch}}/{{gover}} @@ -152,7 +152,7 @@ at block: 0 ({{niltime}}){{if ipc}} > {{.InputLine "exit" }} `) - attach.expectExit() + attach.ExpectExit() } // trulyRandInt generates a crypto random integer used by the console tests to diff --git a/cmd/geth/dao_test.go b/cmd/geth/dao_test.go index ec7802ada..8cc66aabf 100644 --- a/cmd/geth/dao_test.go +++ b/cmd/geth/dao_test.go @@ -112,12 +112,12 @@ func testDAOForkBlockNewChain(t *testing.T, test int, genesis string, expectBloc if err := ioutil.WriteFile(json, []byte(genesis), 0600); err != nil { t.Fatalf("test %d: failed to write genesis file: %v", test, err) } - runGeth(t, "--datadir", datadir, "init", json).cmd.Wait() + runGeth(t, "--datadir", datadir, "init", json).WaitExit() } else { // Force chain initialization args := []string{"--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", "--ipcdisable", "--datadir", datadir} geth := runGeth(t, append(args, []string{"--exec", "2+2", "console"}...)...) - geth.cmd.Wait() + geth.WaitExit() } // Retrieve the DAO config flag from the database path := filepath.Join(datadir, "geth", "chaindata") diff --git a/cmd/geth/genesis_test.go b/cmd/geth/genesis_test.go index 6c3ca7298..a00ae00c1 100644 --- a/cmd/geth/genesis_test.go +++ b/cmd/geth/genesis_test.go @@ -97,14 +97,14 @@ func TestCustomGenesis(t *testing.T) { if err := ioutil.WriteFile(json, []byte(tt.genesis), 0600); err != nil { t.Fatalf("test %d: failed to write genesis file: %v", i, err) } - runGeth(t, "--datadir", datadir, "init", json).cmd.Wait() + runGeth(t, "--datadir", datadir, "init", json).WaitExit() // Query the custom genesis block geth := runGeth(t, "--datadir", datadir, "--maxpeers", "0", "--port", "0", "--nodiscover", "--nat", "none", "--ipcdisable", "--exec", tt.query, "console") - geth.expectRegexp(tt.result) - geth.expectExit() + geth.ExpectRegexp(tt.result) + geth.ExpectExit() } } diff --git a/cmd/geth/run_test.go b/cmd/geth/run_test.go index e26b4509a..da82facac 100644 --- a/cmd/geth/run_test.go +++ b/cmd/geth/run_test.go @@ -17,18 +17,13 @@ package main import ( - "bufio" - "bytes" "fmt" - "io" "io/ioutil" "os" - "os/exec" - "regexp" - "sync" "testing" - "text/template" - "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/internal/cmdtest" ) func tmpdir(t *testing.T) string { @@ -40,36 +35,37 @@ func tmpdir(t *testing.T) string { } type testgeth struct { - // For total convenience, all testing methods are available. - *testing.T - // template variables for expect - Datadir string - Executable string - Etherbase string - Func template.FuncMap + *cmdtest.TestCmd - removeDatadir bool - cmd *exec.Cmd - stdout *bufio.Reader - stdin io.WriteCloser - stderr *testlogger + // template variables for expect + Datadir string + Etherbase string } func init() { - // Run the app if we're the child process for runGeth. - if os.Getenv("GETH_TEST_CHILD") != "" { + // Run the app if we've been exec'd as "geth-test" in runGeth. + reexec.Register("geth-test", func() { if err := app.Run(os.Args); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } os.Exit(0) + }) +} + +func TestMain(m *testing.M) { + // check if we have been reexec'd + if reexec.Init() { + return } + os.Exit(m.Run()) } // spawns geth with the given command line args. If the args don't set --datadir, the // child g gets a temporary data directory. func runGeth(t *testing.T, args ...string) *testgeth { - tt := &testgeth{T: t, Executable: os.Args[0]} + tt := &testgeth{} + tt.TestCmd = cmdtest.NewTestCmd(t, tt) for i, arg := range args { switch { case arg == "-datadir" || arg == "--datadir": @@ -84,215 +80,19 @@ func runGeth(t *testing.T, args ...string) *testgeth { } if tt.Datadir == "" { tt.Datadir = tmpdir(t) - tt.removeDatadir = true + tt.Cleanup = func() { os.RemoveAll(tt.Datadir) } args = append([]string{"-datadir", tt.Datadir}, args...) // Remove the temporary datadir if something fails below. defer func() { if t.Failed() { - os.RemoveAll(tt.Datadir) + tt.Cleanup() } }() } - // Boot "geth". This actually runs the test binary but the init function - // will prevent any tests from running. - tt.stderr = &testlogger{t: t} - tt.cmd = exec.Command(os.Args[0], args...) - tt.cmd.Env = append(os.Environ(), "GETH_TEST_CHILD=1") - tt.cmd.Stderr = tt.stderr - stdout, err := tt.cmd.StdoutPipe() - if err != nil { - t.Fatal(err) - } - tt.stdout = bufio.NewReader(stdout) - if tt.stdin, err = tt.cmd.StdinPipe(); err != nil { - t.Fatal(err) - } - if err := tt.cmd.Start(); err != nil { - t.Fatal(err) - } - return tt -} - -// InputLine writes the given text to the childs stdin. -// This method can also be called from an expect template, e.g.: -// -// geth.expect(`Passphrase: {{.InputLine "password"}}`) -func (tt *testgeth) InputLine(s string) string { - io.WriteString(tt.stdin, s+"\n") - return "" -} - -func (tt *testgeth) setTemplateFunc(name string, fn interface{}) { - if tt.Func == nil { - tt.Func = make(map[string]interface{}) - } - tt.Func[name] = fn -} - -// expect runs its argument as a template, then expects the -// child process to output the result of the template within 5s. -// -// If the template starts with a newline, the newline is removed -// before matching. -func (tt *testgeth) expect(tplsource string) { - // Generate the expected output by running the template. - tpl := template.Must(template.New("").Funcs(tt.Func).Parse(tplsource)) - wantbuf := new(bytes.Buffer) - if err := tpl.Execute(wantbuf, tt); err != nil { - panic(err) - } - // Trim exactly one newline at the beginning. This makes tests look - // much nicer because all expect strings are at column 0. - want := bytes.TrimPrefix(wantbuf.Bytes(), []byte("\n")) - if err := tt.matchExactOutput(want); err != nil { - tt.Fatal(err) - } - tt.Logf("Matched stdout text:\n%s", want) -} - -func (tt *testgeth) matchExactOutput(want []byte) error { - buf := make([]byte, len(want)) - n := 0 - tt.withKillTimeout(func() { n, _ = io.ReadFull(tt.stdout, buf) }) - buf = buf[:n] - if n < len(want) || !bytes.Equal(buf, want) { - // Grab any additional buffered output in case of mismatch - // because it might help with debugging. - buf = append(buf, make([]byte, tt.stdout.Buffered())...) - tt.stdout.Read(buf[n:]) - // Find the mismatch position. - for i := 0; i < n; i++ { - if want[i] != buf[i] { - return fmt.Errorf("Output mismatch at â—Š:\n---------------- (stdout text)\n%sâ—Š%s\n---------------- (expected text)\n%s", - buf[:i], buf[i:n], want) - } - } - if n < len(want) { - return fmt.Errorf("Not enough output, got until â—Š:\n---------------- (stdout text)\n%s\n---------------- (expected text)\n%sâ—Š%s", - buf, want[:n], want[n:]) - } - } - return nil -} - -// expectRegexp expects the child process to output text matching the -// given regular expression within 5s. -// -// Note that an arbitrary amount of output may be consumed by the -// regular expression. This usually means that expect cannot be used -// after expectRegexp. -func (tt *testgeth) expectRegexp(resource string) (*regexp.Regexp, []string) { - var ( - re = regexp.MustCompile(resource) - rtee = &runeTee{in: tt.stdout} - matches []int - ) - tt.withKillTimeout(func() { matches = re.FindReaderSubmatchIndex(rtee) }) - output := rtee.buf.Bytes() - if matches == nil { - tt.Fatalf("Output did not match:\n---------------- (stdout text)\n%s\n---------------- (regular expression)\n%s", - output, resource) - return re, nil - } - tt.Logf("Matched stdout text:\n%s", output) - var submatch []string - for i := 0; i < len(matches); i += 2 { - submatch = append(submatch, string(output[i:i+1])) - } - return re, submatch -} - -// expectExit expects the child process to exit within 5s without -// printing any additional text on stdout. -func (tt *testgeth) expectExit() { - var output []byte - tt.withKillTimeout(func() { - output, _ = ioutil.ReadAll(tt.stdout) - }) - tt.cmd.Wait() - if tt.removeDatadir { - os.RemoveAll(tt.Datadir) - } - if len(output) > 0 { - tt.Errorf("Unmatched stdout text:\n%s", output) - } -} - -func (tt *testgeth) interrupt() { - tt.cmd.Process.Signal(os.Interrupt) -} - -// stderrText returns any stderr output written so far. -// The returned text holds all log lines after expectExit has -// returned. -func (tt *testgeth) stderrText() string { - tt.stderr.mu.Lock() - defer tt.stderr.mu.Unlock() - return tt.stderr.buf.String() -} - -func (tt *testgeth) withKillTimeout(fn func()) { - timeout := time.AfterFunc(5*time.Second, func() { - tt.Log("killing the child process (timeout)") - tt.cmd.Process.Kill() - if tt.removeDatadir { - os.RemoveAll(tt.Datadir) - } - }) - defer timeout.Stop() - fn() -} + // Boot "geth". This actually runs the test binary but the TestMain + // function will prevent any tests from running. + tt.Run("geth-test", args...) -// testlogger logs all written lines via t.Log and also -// collects them for later inspection. -type testlogger struct { - t *testing.T - mu sync.Mutex - buf bytes.Buffer -} - -func (tl *testlogger) Write(b []byte) (n int, err error) { - lines := bytes.Split(b, []byte("\n")) - for _, line := range lines { - if len(line) > 0 { - tl.t.Logf("(stderr) %s", line) - } - } - tl.mu.Lock() - tl.buf.Write(b) - tl.mu.Unlock() - return len(b), err -} - -// runeTee collects text read through it into buf. -type runeTee struct { - in interface { - io.Reader - io.ByteReader - io.RuneReader - } - buf bytes.Buffer -} - -func (rtee *runeTee) Read(b []byte) (n int, err error) { - n, err = rtee.in.Read(b) - rtee.buf.Write(b[:n]) - return n, err -} - -func (rtee *runeTee) ReadRune() (r rune, size int, err error) { - r, size, err = rtee.in.ReadRune() - if err == nil { - rtee.buf.WriteRune(r) - } - return r, size, err -} - -func (rtee *runeTee) ReadByte() (b byte, err error) { - b, err = rtee.in.ReadByte() - if err == nil { - rtee.buf.WriteByte(b) - } - return b, err + return tt } diff --git a/cmd/swarm/run_test.go b/cmd/swarm/run_test.go new file mode 100644 index 000000000..2d32a51c8 --- /dev/null +++ b/cmd/swarm/run_test.go @@ -0,0 +1,255 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "fmt" + "io/ioutil" + "net" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/ethereum/go-ethereum/internal/cmdtest" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm" +) + +func init() { + // Run the app if we've been exec'd as "swarm-test" in runSwarm. + reexec.Register("swarm-test", func() { + if err := app.Run(os.Args); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(0) + }) +} + +func TestMain(m *testing.M) { + // check if we have been reexec'd + if reexec.Init() { + return + } + os.Exit(m.Run()) +} + +func runSwarm(t *testing.T, args ...string) *cmdtest.TestCmd { + tt := cmdtest.NewTestCmd(t, nil) + + // Boot "swarm". This actually runs the test binary but the TestMain + // function will prevent any tests from running. + tt.Run("swarm-test", args...) + + return tt +} + +type testCluster struct { + Nodes []*testNode + TmpDir string +} + +// newTestCluster starts a test swarm cluster of the given size. +// +// A temporary directory is created and each node gets a data directory inside +// it. +// +// Each node listens on 127.0.0.1 with random ports for both the HTTP and p2p +// ports (assigned by first listening on 127.0.0.1:0 and then passing the ports +// as flags). +// +// When starting more than one node, they are connected together using the +// admin SetPeer RPC method. +func newTestCluster(t *testing.T, size int) *testCluster { + cluster := &testCluster{} + defer func() { + if t.Failed() { + cluster.Shutdown() + } + }() + + tmpdir, err := ioutil.TempDir("", "swarm-test") + if err != nil { + t.Fatal(err) + } + cluster.TmpDir = tmpdir + + // start the nodes + cluster.Nodes = make([]*testNode, 0, size) + for i := 0; i < size; i++ { + dir := filepath.Join(cluster.TmpDir, fmt.Sprintf("swarm%02d", i)) + if err := os.Mkdir(dir, 0700); err != nil { + t.Fatal(err) + } + + node := newTestNode(t, dir) + node.Name = fmt.Sprintf("swarm%02d", i) + + cluster.Nodes = append(cluster.Nodes, node) + } + + if size == 1 { + return cluster + } + + // connect the nodes together + for _, node := range cluster.Nodes { + if err := node.Client.Call(nil, "admin_addPeer", cluster.Nodes[0].Enode); err != nil { + t.Fatal(err) + } + } + + // wait until all nodes have the correct number of peers +outer: + for _, node := range cluster.Nodes { + var peers []*p2p.PeerInfo + for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(50 * time.Millisecond) { + if err := node.Client.Call(&peers, "admin_peers"); err != nil { + t.Fatal(err) + } + if len(peers) == len(cluster.Nodes)-1 { + continue outer + } + } + t.Fatalf("%s only has %d / %d peers", node.Name, len(peers), len(cluster.Nodes)-1) + } + + return cluster +} + +func (c *testCluster) Shutdown() { + for _, node := range c.Nodes { + node.Shutdown() + } + os.RemoveAll(c.TmpDir) +} + +type testNode struct { + Name string + Addr string + URL string + Enode string + Dir string + Client *rpc.Client + Cmd *cmdtest.TestCmd +} + +const testPassphrase = "swarm-test-passphrase" + +func newTestNode(t *testing.T, dir string) *testNode { + // create key + conf := &node.Config{ + DataDir: dir, + IPCPath: "bzzd.ipc", + } + n, err := node.New(conf) + if err != nil { + t.Fatal(err) + } + account, err := n.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore).NewAccount(testPassphrase) + if err != nil { + t.Fatal(err) + } + + node := &testNode{Dir: dir} + + // use a unique IPCPath when running tests on Windows + if runtime.GOOS == "windows" { + conf.IPCPath = fmt.Sprintf("bzzd-%s.ipc", account.Address.String()) + } + + // assign ports + httpPort, err := assignTCPPort() + if err != nil { + t.Fatal(err) + } + p2pPort, err := assignTCPPort() + if err != nil { + t.Fatal(err) + } + + // start the node + node.Cmd = runSwarm(t, + "--port", p2pPort, + "--nodiscover", + "--datadir", dir, + "--ipcpath", conf.IPCPath, + "--ethapi", "", + "--bzzaccount", account.Address.String(), + "--bzznetworkid", "321", + "--bzzport", httpPort, + "--verbosity", "6", + ) + node.Cmd.InputLine(testPassphrase) + defer func() { + if t.Failed() { + node.Shutdown() + } + }() + + // wait for the node to start + for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) { + node.Client, err = rpc.Dial(conf.IPCEndpoint()) + if err == nil { + break + } + } + if node.Client == nil { + t.Fatal(err) + } + + // load info + var info swarm.Info + if err := node.Client.Call(&info, "bzz_info"); err != nil { + t.Fatal(err) + } + node.Addr = net.JoinHostPort("127.0.0.1", info.Port) + node.URL = "http://" + node.Addr + + var nodeInfo p2p.NodeInfo + if err := node.Client.Call(&nodeInfo, "admin_nodeInfo"); err != nil { + t.Fatal(err) + } + node.Enode = fmt.Sprintf("enode://%s@127.0.0.1:%s", nodeInfo.ID, p2pPort) + + return node +} + +func (n *testNode) Shutdown() { + if n.Cmd != nil { + n.Cmd.Kill() + } +} + +func assignTCPPort() (string, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", err + } + l.Close() + _, port, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return "", err + } + return port, nil +} diff --git a/cmd/swarm/upload_test.go b/cmd/swarm/upload_test.go new file mode 100644 index 000000000..5656186e1 --- /dev/null +++ b/cmd/swarm/upload_test.go @@ -0,0 +1,76 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "io" + "io/ioutil" + "net/http" + "os" + "testing" +) + +// TestCLISwarmUp tests that running 'swarm up' makes the resulting file +// available from all nodes via the HTTP API +func TestCLISwarmUp(t *testing.T) { + // start 3 node cluster + t.Log("starting 3 node cluster") + cluster := newTestCluster(t, 3) + defer cluster.Shutdown() + + // create a tmp file + tmp, err := ioutil.TempFile("", "swarm-test") + assertNil(t, err) + defer tmp.Close() + defer os.Remove(tmp.Name()) + _, err = io.WriteString(tmp, "data") + assertNil(t, err) + + // upload the file with 'swarm up' and expect a hash + t.Log("uploading file with 'swarm up'") + up := runSwarm(t, "--bzzapi", cluster.Nodes[0].URL, "up", tmp.Name()) + _, matches := up.ExpectRegexp(`[a-f\d]{64}`) + up.ExpectExit() + hash := matches[0] + t.Logf("file uploaded with hash %s", hash) + + // get the file from the HTTP API of each node + for _, node := range cluster.Nodes { + t.Logf("getting file from %s", node.Name) + res, err := http.Get(node.URL + "/bzz:/" + hash) + assertNil(t, err) + assertHTTPResponse(t, res, http.StatusOK, "data") + } +} + +func assertNil(t *testing.T, err error) { + if err != nil { + t.Fatal(err) + } +} + +func assertHTTPResponse(t *testing.T, res *http.Response, expectedStatus int, expectedBody string) { + defer res.Body.Close() + if res.StatusCode != expectedStatus { + t.Fatalf("expected HTTP status %d, got %s", expectedStatus, res.Status) + } + data, err := ioutil.ReadAll(res.Body) + assertNil(t, err) + if string(data) != expectedBody { + t.Fatalf("expected HTTP body %q, got %q", expectedBody, data) + } +} diff --git a/core/vm/gen_structlog.go b/core/vm/gen_structlog.go index 1c86b2256..88df942dc 100644 --- a/core/vm/gen_structlog.go +++ b/core/vm/gen_structlog.go @@ -18,12 +18,12 @@ func (s StructLog) MarshalJSON() ([]byte, error) { Gas math.HexOrDecimal64 `json:"gas"` GasCost math.HexOrDecimal64 `json:"gasCost"` Memory hexutil.Bytes `json:"memory"` + MemorySize int `json:"memSize"` Stack []*math.HexOrDecimal256 `json:"stack"` Storage map[common.Hash]common.Hash `json:"-"` Depth int `json:"depth"` Err error `json:"error"` OpName string `json:"opName"` - MemorySize int `json:"memSize"` } var enc StructLog enc.Pc = s.Pc @@ -31,6 +31,7 @@ func (s StructLog) MarshalJSON() ([]byte, error) { enc.Gas = math.HexOrDecimal64(s.Gas) enc.GasCost = math.HexOrDecimal64(s.GasCost) enc.Memory = s.Memory + enc.MemorySize = s.MemorySize if s.Stack != nil { enc.Stack = make([]*math.HexOrDecimal256, len(s.Stack)) for k, v := range s.Stack { @@ -41,21 +42,21 @@ func (s StructLog) MarshalJSON() ([]byte, error) { enc.Depth = s.Depth enc.Err = s.Err enc.OpName = s.OpName() - enc.MemorySize = s.MemorySize() return json.Marshal(&enc) } func (s *StructLog) UnmarshalJSON(input []byte) error { type StructLog struct { - Pc *uint64 `json:"pc"` - Op *OpCode `json:"op"` - Gas *math.HexOrDecimal64 `json:"gas"` - GasCost *math.HexOrDecimal64 `json:"gasCost"` - Memory hexutil.Bytes `json:"memory"` - Stack []*math.HexOrDecimal256 `json:"stack"` - Storage map[common.Hash]common.Hash `json:"-"` - Depth *int `json:"depth"` - Err *error `json:"error"` + Pc *uint64 `json:"pc"` + Op *OpCode `json:"op"` + Gas *math.HexOrDecimal64 `json:"gas"` + GasCost *math.HexOrDecimal64 `json:"gasCost"` + Memory hexutil.Bytes `json:"memory"` + MemorySize *int `json:"memSize"` + Stack []*math.HexOrDecimal256 `json:"stack"` + Storage map[common.Hash]common.Hash `json:"-"` + Depth *int `json:"depth"` + Err *error `json:"error"` } var dec StructLog if err := json.Unmarshal(input, &dec); err != nil { @@ -76,6 +77,9 @@ func (s *StructLog) UnmarshalJSON(input []byte) error { if dec.Memory != nil { s.Memory = dec.Memory } + if dec.MemorySize != nil { + s.MemorySize = *dec.MemorySize + } if dec.Stack != nil { s.Stack = make([]*big.Int, len(dec.Stack)) for k, v := range dec.Stack { diff --git a/core/vm/logger.go b/core/vm/logger.go index 405ab169c..17a9c9ec3 100644 --- a/core/vm/logger.go +++ b/core/vm/logger.go @@ -54,35 +54,31 @@ type LogConfig struct { // StructLog is emitted to the EVM each cycle and lists information about the current internal state // prior to the execution of the statement. type StructLog struct { - Pc uint64 `json:"pc"` - Op OpCode `json:"op"` - Gas uint64 `json:"gas"` - GasCost uint64 `json:"gasCost"` - Memory []byte `json:"memory"` - Stack []*big.Int `json:"stack"` - Storage map[common.Hash]common.Hash `json:"-"` - Depth int `json:"depth"` - Err error `json:"error"` + Pc uint64 `json:"pc"` + Op OpCode `json:"op"` + Gas uint64 `json:"gas"` + GasCost uint64 `json:"gasCost"` + Memory []byte `json:"memory"` + MemorySize int `json:"memSize"` + Stack []*big.Int `json:"stack"` + Storage map[common.Hash]common.Hash `json:"-"` + Depth int `json:"depth"` + Err error `json:"error"` } // overrides for gencodec type structLogMarshaling struct { - Stack []*math.HexOrDecimal256 - Gas math.HexOrDecimal64 - GasCost math.HexOrDecimal64 - Memory hexutil.Bytes - OpName string `json:"opName"` - MemorySize int `json:"memSize"` + Stack []*math.HexOrDecimal256 + Gas math.HexOrDecimal64 + GasCost math.HexOrDecimal64 + Memory hexutil.Bytes + OpName string `json:"opName"` } func (s *StructLog) OpName() string { return s.Op.String() } -func (s *StructLog) MemorySize() int { - return len(s.Memory) -} - // Tracer is used to collect execution traces from an EVM transaction // execution. CaptureState is called for each step of the VM with the // current VM state. @@ -181,7 +177,7 @@ func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost ui } } // create a new snaptshot of the EVM. - log := StructLog{pc, op, gas, cost, mem, stck, storage, env.depth, err} + log := StructLog{pc, op, gas, cost, mem, memory.Len(), stck, storage, depth, err} l.logs = append(l.logs, log) return nil diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 59f60d659..45bb87322 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -167,11 +167,11 @@ func (ec *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx * } else if _, r, _ := tx.RawSignatureValues(); r == nil { return nil, false, fmt.Errorf("server returned transaction without signature") } - var block struct{ BlockHash *common.Hash } + var block struct{ BlockNumber *string } if err := json.Unmarshal(raw, &block); err != nil { return nil, false, err } - return tx, block.BlockHash == nil, nil + return tx, block.BlockNumber == nil, nil } // TransactionCount returns the total number of transactions in the given block. diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go index 65c487934..a2ee2f2cc 100644 --- a/ethdb/memory_database.go +++ b/ethdb/memory_database.go @@ -45,13 +45,6 @@ func (db *MemDatabase) Put(key []byte, value []byte) error { return nil } -func (db *MemDatabase) Set(key []byte, value []byte) { - db.lock.Lock() - defer db.lock.Unlock() - - db.Put(key, value) -} - func (db *MemDatabase) Get(key []byte) ([]byte, error) { db.lock.RLock() defer db.lock.RUnlock() diff --git a/internal/cmdtest/test_cmd.go b/internal/cmdtest/test_cmd.go new file mode 100644 index 000000000..541e51c4c --- /dev/null +++ b/internal/cmdtest/test_cmd.go @@ -0,0 +1,270 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package cmdtest + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "regexp" + "sync" + "testing" + "text/template" + "time" + + "github.com/docker/docker/pkg/reexec" +) + +func NewTestCmd(t *testing.T, data interface{}) *TestCmd { + return &TestCmd{T: t, Data: data} +} + +type TestCmd struct { + // For total convenience, all testing methods are available. + *testing.T + + Func template.FuncMap + Data interface{} + Cleanup func() + + cmd *exec.Cmd + stdout *bufio.Reader + stdin io.WriteCloser + stderr *testlogger +} + +// Run exec's the current binary using name as argv[0] which will trigger the +// reexec init function for that name (e.g. "geth-test" in cmd/geth/run_test.go) +func (tt *TestCmd) Run(name string, args ...string) { + tt.stderr = &testlogger{t: tt.T} + tt.cmd = &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{name}, args...), + Stderr: tt.stderr, + } + stdout, err := tt.cmd.StdoutPipe() + if err != nil { + tt.Fatal(err) + } + tt.stdout = bufio.NewReader(stdout) + if tt.stdin, err = tt.cmd.StdinPipe(); err != nil { + tt.Fatal(err) + } + if err := tt.cmd.Start(); err != nil { + tt.Fatal(err) + } +} + +// InputLine writes the given text to the childs stdin. +// This method can also be called from an expect template, e.g.: +// +// geth.expect(`Passphrase: {{.InputLine "password"}}`) +func (tt *TestCmd) InputLine(s string) string { + io.WriteString(tt.stdin, s+"\n") + return "" +} + +func (tt *TestCmd) SetTemplateFunc(name string, fn interface{}) { + if tt.Func == nil { + tt.Func = make(map[string]interface{}) + } + tt.Func[name] = fn +} + +// Expect runs its argument as a template, then expects the +// child process to output the result of the template within 5s. +// +// If the template starts with a newline, the newline is removed +// before matching. +func (tt *TestCmd) Expect(tplsource string) { + // Generate the expected output by running the template. + tpl := template.Must(template.New("").Funcs(tt.Func).Parse(tplsource)) + wantbuf := new(bytes.Buffer) + if err := tpl.Execute(wantbuf, tt.Data); err != nil { + panic(err) + } + // Trim exactly one newline at the beginning. This makes tests look + // much nicer because all expect strings are at column 0. + want := bytes.TrimPrefix(wantbuf.Bytes(), []byte("\n")) + if err := tt.matchExactOutput(want); err != nil { + tt.Fatal(err) + } + tt.Logf("Matched stdout text:\n%s", want) +} + +func (tt *TestCmd) matchExactOutput(want []byte) error { + buf := make([]byte, len(want)) + n := 0 + tt.withKillTimeout(func() { n, _ = io.ReadFull(tt.stdout, buf) }) + buf = buf[:n] + if n < len(want) || !bytes.Equal(buf, want) { + // Grab any additional buffered output in case of mismatch + // because it might help with debugging. + buf = append(buf, make([]byte, tt.stdout.Buffered())...) + tt.stdout.Read(buf[n:]) + // Find the mismatch position. + for i := 0; i < n; i++ { + if want[i] != buf[i] { + return fmt.Errorf("Output mismatch at â—Š:\n---------------- (stdout text)\n%sâ—Š%s\n---------------- (expected text)\n%s", + buf[:i], buf[i:n], want) + } + } + if n < len(want) { + return fmt.Errorf("Not enough output, got until â—Š:\n---------------- (stdout text)\n%s\n---------------- (expected text)\n%sâ—Š%s", + buf, want[:n], want[n:]) + } + } + return nil +} + +// ExpectRegexp expects the child process to output text matching the +// given regular expression within 5s. +// +// Note that an arbitrary amount of output may be consumed by the +// regular expression. This usually means that expect cannot be used +// after ExpectRegexp. +func (tt *TestCmd) ExpectRegexp(resource string) (*regexp.Regexp, []string) { + var ( + re = regexp.MustCompile(resource) + rtee = &runeTee{in: tt.stdout} + matches []int + ) + tt.withKillTimeout(func() { matches = re.FindReaderSubmatchIndex(rtee) }) + output := rtee.buf.Bytes() + if matches == nil { + tt.Fatalf("Output did not match:\n---------------- (stdout text)\n%s\n---------------- (regular expression)\n%s", + output, resource) + return re, nil + } + tt.Logf("Matched stdout text:\n%s", output) + var submatches []string + for i := 0; i < len(matches); i += 2 { + submatch := string(output[matches[i]:matches[i+1]]) + submatches = append(submatches, submatch) + } + return re, submatches +} + +// ExpectExit expects the child process to exit within 5s without +// printing any additional text on stdout. +func (tt *TestCmd) ExpectExit() { + var output []byte + tt.withKillTimeout(func() { + output, _ = ioutil.ReadAll(tt.stdout) + }) + tt.WaitExit() + if tt.Cleanup != nil { + tt.Cleanup() + } + if len(output) > 0 { + tt.Errorf("Unmatched stdout text:\n%s", output) + } +} + +func (tt *TestCmd) WaitExit() { + tt.cmd.Wait() +} + +func (tt *TestCmd) Interrupt() { + tt.cmd.Process.Signal(os.Interrupt) +} + +// StderrText returns any stderr output written so far. +// The returned text holds all log lines after ExpectExit has +// returned. +func (tt *TestCmd) StderrText() string { + tt.stderr.mu.Lock() + defer tt.stderr.mu.Unlock() + return tt.stderr.buf.String() +} + +func (tt *TestCmd) CloseStdin() { + tt.stdin.Close() +} + +func (tt *TestCmd) Kill() { + tt.cmd.Process.Kill() + if tt.Cleanup != nil { + tt.Cleanup() + } +} + +func (tt *TestCmd) withKillTimeout(fn func()) { + timeout := time.AfterFunc(5*time.Second, func() { + tt.Log("killing the child process (timeout)") + tt.Kill() + }) + defer timeout.Stop() + fn() +} + +// testlogger logs all written lines via t.Log and also +// collects them for later inspection. +type testlogger struct { + t *testing.T + mu sync.Mutex + buf bytes.Buffer +} + +func (tl *testlogger) Write(b []byte) (n int, err error) { + lines := bytes.Split(b, []byte("\n")) + for _, line := range lines { + if len(line) > 0 { + tl.t.Logf("(stderr) %s", line) + } + } + tl.mu.Lock() + tl.buf.Write(b) + tl.mu.Unlock() + return len(b), err +} + +// runeTee collects text read through it into buf. +type runeTee struct { + in interface { + io.Reader + io.ByteReader + io.RuneReader + } + buf bytes.Buffer +} + +func (rtee *runeTee) Read(b []byte) (n int, err error) { + n, err = rtee.in.Read(b) + rtee.buf.Write(b[:n]) + return n, err +} + +func (rtee *runeTee) ReadRune() (r rune, size int, err error) { + r, size, err = rtee.in.ReadRune() + if err == nil { + rtee.buf.WriteRune(r) + } + return r, size, err +} + +func (rtee *runeTee) ReadByte() (b byte, err error) { + b, err = rtee.in.ReadByte() + if err == nil { + rtee.buf.WriteByte(b) + } + return b, err +} diff --git a/les/backend.go b/les/backend.go index 646c81a7b..658c73c6e 100644 --- a/les/backend.go +++ b/les/backend.go @@ -19,6 +19,7 @@ package les import ( "fmt" + "sync" "time" "github.com/ethereum/go-ethereum/accounts" @@ -38,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/params" rpc "github.com/ethereum/go-ethereum/rpc" ) @@ -49,9 +51,13 @@ type LightEthereum struct { // Channel for shutting down the service shutdownChan chan bool // Handlers + peers *peerSet txPool *light.TxPool blockchain *light.LightChain protocolManager *ProtocolManager + serverPool *serverPool + reqDist *requestDistributor + retriever *retrieveManager // DB interfaces chainDb ethdb.Database // Block chain database @@ -63,6 +69,9 @@ type LightEthereum struct { networkId uint64 netRPCService *ethapi.PublicNetAPI + + quitSync chan struct{} + wg sync.WaitGroup } func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { @@ -76,20 +85,26 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } log.Info("Initialised chain configuration", "config", chainConfig) - odr := NewLesOdr(chainDb) - relay := NewLesTxRelay() + peers := newPeerSet() + quitSync := make(chan struct{}) + eth := &LightEthereum{ - odr: odr, - relay: relay, - chainDb: chainDb, chainConfig: chainConfig, + chainDb: chainDb, eventMux: ctx.EventMux, + peers: peers, + reqDist: newRequestDistributor(peers, quitSync), accountManager: ctx.AccountManager, engine: eth.CreateConsensusEngine(ctx, config, chainConfig, chainDb), shutdownChan: make(chan bool), networkId: config.NetworkId, } - if eth.blockchain, err = light.NewLightChain(odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil { + + eth.relay = NewLesTxRelay(peers, eth.reqDist) + eth.serverPool = newServerPool(chainDb, quitSync, ð.wg) + eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool) + eth.odr = NewLesOdr(chainDb, eth.retriever) + if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. @@ -100,13 +115,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay) - lightSync := config.SyncMode == downloader.LightSync - if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, lightSync, config.NetworkId, eth.eventMux, eth.engine, eth.blockchain, nil, chainDb, odr, relay); err != nil { + if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, ð.wg); err != nil { return nil, err } - relay.ps = eth.protocolManager.peers - relay.reqDist = eth.protocolManager.reqDist - eth.ApiBackend = &LesApiBackend{eth, nil} gpoParams := config.GPO if gpoParams.Default == nil { @@ -116,6 +127,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { return eth, nil } +func lesTopic(genesisHash common.Hash) discv5.Topic { + return discv5.Topic("LES@" + common.Bytes2Hex(genesisHash.Bytes()[0:8])) +} + type LightDummyAPI struct{} // Etherbase is the address that mining rewards will be send to @@ -188,7 +203,8 @@ func (s *LightEthereum) Protocols() []p2p.Protocol { func (s *LightEthereum) Start(srvr *p2p.Server) error { log.Warn("Light client mode is an experimental feature") s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId) - s.protocolManager.Start(srvr) + s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash())) + s.protocolManager.Start() return nil } diff --git a/les/distributor.go b/les/distributor.go index 71afe2b73..e8ef5b02e 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available") type requestDistributor struct { reqQueue *list.List lastReqOrder uint64 + peers map[distPeer]struct{} + peerLock sync.RWMutex stopChn, loopChn chan struct{} loopNextSent bool lock sync.Mutex - - getAllPeers func() map[distPeer]struct{} } // distPeer is an LES server peer interface for the request distributor. @@ -71,15 +71,39 @@ type distReq struct { } // newRequestDistributor creates a new request distributor -func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor { - r := &requestDistributor{ - reqQueue: list.New(), - loopChn: make(chan struct{}, 2), - stopChn: stopChn, - getAllPeers: getAllPeers, +func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor { + d := &requestDistributor{ + reqQueue: list.New(), + loopChn: make(chan struct{}, 2), + stopChn: stopChn, + peers: make(map[distPeer]struct{}), + } + if peers != nil { + peers.notify(d) } - go r.loop() - return r + go d.loop() + return d +} + +// registerPeer implements peerSetNotify +func (d *requestDistributor) registerPeer(p *peer) { + d.peerLock.Lock() + d.peers[p] = struct{}{} + d.peerLock.Unlock() +} + +// unregisterPeer implements peerSetNotify +func (d *requestDistributor) unregisterPeer(p *peer) { + d.peerLock.Lock() + delete(d.peers, p) + d.peerLock.Unlock() +} + +// registerTestPeer adds a new test peer +func (d *requestDistributor) registerTestPeer(p distPeer) { + d.peerLock.Lock() + d.peers[p] = struct{}{} + d.peerLock.Unlock() } // distMaxWait is the maximum waiting time after which further necessary waiting @@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 { // nextRequest returns the next possible request from any peer, along with the // associated peer and necessary waiting time func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { - peers := d.getAllPeers() - + checkedPeers := make(map[distPeer]struct{}) elem := d.reqQueue.Front() var ( bestPeer distPeer @@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { sel *weightedRandomSelect ) - for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { + d.peerLock.RLock() + defer d.peerLock.RUnlock() + + for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { req := elem.Value.(*distReq) canSend := false - for peer, _ := range peers { - if peer.canQueue() && req.canSend(peer) { + for peer, _ := range d.peers { + if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { canSend = true cost := req.getCost(peer) wait, bufRemain := peer.waitBefore(cost) @@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { bestWait = wait } } - delete(peers, peer) + checkedPeers[peer] = struct{}{} } } next := elem.Next() diff --git a/les/distributor_test.go b/les/distributor_test.go index ae184b21b..4e7f8bd29 100644 --- a/les/distributor_test.go +++ b/les/distributor_test.go @@ -122,20 +122,14 @@ func testRequestDistributor(t *testing.T, resend bool) { stop := make(chan struct{}) defer close(stop) + dist := newRequestDistributor(nil, stop) var peers [testDistPeerCount]*testDistPeer for i, _ := range peers { peers[i] = &testDistPeer{} go peers[i].worker(t, !resend, stop) + dist.registerTestPeer(peers[i]) } - dist := newRequestDistributor(func() map[distPeer]struct{} { - m := make(map[distPeer]struct{}) - for _, peer := range peers { - m[peer] = struct{}{} - } - return m - }, stop) - var wg sync.WaitGroup for i := 1; i <= testDistReqCount; i++ { diff --git a/les/fetcher.go b/les/fetcher.go index a294d00d5..4fc142f0f 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -116,6 +116,7 @@ func newLightFetcher(pm *ProtocolManager) *lightFetcher { syncDone: make(chan *peer), maxConfirmedTd: big.NewInt(0), } + pm.peers.notify(f) go f.syncLoop() return f } @@ -209,8 +210,8 @@ func (f *lightFetcher) syncLoop() { } } -// addPeer adds a new peer to the fetcher's peer set -func (f *lightFetcher) addPeer(p *peer) { +// registerPeer adds a new peer to the fetcher's peer set +func (f *lightFetcher) registerPeer(p *peer) { p.lock.Lock() p.hasBlock = func(hash common.Hash, number uint64) bool { return f.peerHasBlock(p, hash, number) @@ -223,8 +224,8 @@ func (f *lightFetcher) addPeer(p *peer) { f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} } -// removePeer removes a new peer from the fetcher's peer set -func (f *lightFetcher) removePeer(p *peer) { +// unregisterPeer removes a new peer from the fetcher's peer set +func (f *lightFetcher) unregisterPeer(p *peer) { p.lock.Lock() p.hasBlock = nil p.lock.Unlock() @@ -416,7 +417,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) { f.syncing = bestSyncing var rq *distReq - reqID := getNextReqID() + reqID := genReqID() if f.syncing { rq = &distReq{ getCost: func(dp distPeer) uint64 { diff --git a/les/handler.go b/les/handler.go index 64023af0f..77bc077a2 100644 --- a/les/handler.go +++ b/les/handler.go @@ -102,7 +102,9 @@ type ProtocolManager struct { odr *LesOdr server *LesServer serverPool *serverPool + lesTopic discv5.Topic reqDist *requestDistributor + retriever *retrieveManager downloader *downloader.Downloader fetcher *lightFetcher @@ -123,12 +125,12 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing - wg sync.WaitGroup + wg *sync.WaitGroup } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, @@ -136,15 +138,20 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network blockchain: blockchain, chainConfig: chainConfig, chainDb: chainDb, + odr: odr, networkId: networkId, txpool: txpool, txrelay: txrelay, - odr: odr, - peers: newPeerSet(), + peers: peers, newPeerCh: make(chan *peer), - quitSync: make(chan struct{}), + quitSync: quitSync, + wg: wg, noMorePeers: make(chan struct{}), } + if odr != nil { + manager.retriever = odr.retriever + manager.reqDist = odr.retriever.dist + } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { @@ -202,84 +209,22 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) + manager.peers.notify((*downloaderPeerNotify)(manager)) + manager.fetcher = newLightFetcher(manager) } - manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} { - m := make(map[distPeer]struct{}) - peers := manager.peers.AllPeers() - for _, peer := range peers { - m[peer] = struct{}{} - } - return m - }, manager.quitSync) - if odr != nil { - odr.removePeer = removePeer - odr.reqDist = manager.reqDist - } - - /*validator := func(block *types.Block, parent *types.Block) error { - return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) - } - heighter := func() uint64 { - return chainman.LastBlockNumberU64() - } - manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer) - */ return manager, nil } +// removePeer initiates disconnection from a peer by removing it from the peer set func (pm *ProtocolManager) removePeer(id string) { - // Short circuit if the peer was already removed - peer := pm.peers.Peer(id) - if peer == nil { - return - } - log.Debug("Removing light Ethereum peer", "peer", id) - if err := pm.peers.Unregister(id); err != nil { - if err == errNotRegistered { - return - } - } - // Unregister the peer from the downloader and Ethereum peer set - if pm.lightSync { - pm.downloader.UnregisterPeer(id) - if pm.txrelay != nil { - pm.txrelay.removePeer(id) - } - if pm.fetcher != nil { - pm.fetcher.removePeer(peer) - } - } - // Hard disconnect at the networking layer - if peer != nil { - peer.Peer.Disconnect(p2p.DiscUselessPeer) - } + pm.peers.Unregister(id) } -func (pm *ProtocolManager) Start(srvr *p2p.Server) { - var topicDisc *discv5.Network - if srvr != nil { - topicDisc = srvr.DiscV5 - } - lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) +func (pm *ProtocolManager) Start() { if pm.lightSync { - // start sync handler - if srvr != nil { // srvr is nil during testing - pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) - pm.odr.serverPool = pm.serverPool - pm.fetcher = newLightFetcher(pm) - } go pm.syncer() } else { - if topicDisc != nil { - go func() { - logger := log.New("topic", lesTopic) - logger.Info("Starting topic registration") - defer logger.Info("Terminated topic registration") - - topicDisc.RegisterTopic(lesTopic, pm.quitSync) - }() - } go func() { for range pm.newPeerCh { } @@ -342,65 +287,10 @@ func (pm *ProtocolManager) handle(p *peer) error { }() // Register the peer in the downloader. If the downloader considers it banned, we disconnect if pm.lightSync { - requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := getNextReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == p - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pm.reqDist.queue(rq) - if !ok { - return ErrNoPeers - } - return nil - } - requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { - reqID := getNextReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == p - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pm.reqDist.queue(rq) - if !ok { - return ErrNoPeers - } - return nil - } - if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, - requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { - return err - } - if pm.txrelay != nil { - pm.txrelay.addPeer(p) - } - p.lock.Lock() head := p.headInfo p.lock.Unlock() if pm.fetcher != nil { - pm.fetcher.addPeer(p) pm.fetcher.announce(p, head) } @@ -926,7 +816,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } if deliverMsg != nil { - err := pm.odr.Deliver(p, deliverMsg) + err := pm.retriever.deliver(p, deliverMsg) if err != nil { p.responseErrors++ if p.responseErrors > maxResponseErrors { @@ -946,3 +836,64 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo { Head: self.blockchain.LastBlockHash(), } } + +// downloaderPeerNotify implements peerSetNotify +type downloaderPeerNotify ProtocolManager + +func (d *downloaderPeerNotify) registerPeer(p *peer) { + pm := (*ProtocolManager)(d) + + requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { + reqID := genReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil + } + requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { + reqID := genReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil + } + + pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil) +} + +func (d *downloaderPeerNotify) unregisterPeer(p *peer) { + pm := (*ProtocolManager)(d) + pm.downloader.UnregisterPeer(p.id) +} diff --git a/les/handler_test.go b/les/handler_test.go index 0b94d0d30..5df1d3463 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -42,7 +43,8 @@ func expectResponse(r p2p.MsgReader, msgcode, reqID, bv uint64, data interface{} func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) } func testGetBlockHeaders(t *testing.T, protocol int) { - pm, _, _ := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -170,7 +172,8 @@ func testGetBlockHeaders(t *testing.T, protocol int) { func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) } func testGetBlockBodies(t *testing.T, protocol int) { - pm, _, _ := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -246,7 +249,8 @@ func TestGetCodeLes1(t *testing.T) { testGetCode(t, 1) } func testGetCode(t *testing.T, protocol int) { // Assemble the test environment - pm, _, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -278,7 +282,8 @@ func TestGetReceiptLes1(t *testing.T) { testGetReceipt(t, 1) } func testGetReceipt(t *testing.T, protocol int) { // Assemble the test environment - pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() @@ -304,7 +309,8 @@ func TestGetProofsLes1(t *testing.T) { testGetReceipt(t, 1) } func testGetProofs(t *testing.T, protocol int) { // Assemble the test environment - pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) + db, _ := ethdb.NewMemDatabase() + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) bc := pm.blockchain.(*core.BlockChain) peer, _ := newTestPeer(t, "peer", protocol, pm, true) defer peer.close() diff --git a/les/helper_test.go b/les/helper_test.go index 7e442c131..52fddd117 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -25,7 +25,6 @@ import ( "math/big" "sync" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -132,22 +131,22 @@ func testRCL() RequestCostList { // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. -func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen)) (*ProtocolManager, ethdb.Database, *LesOdr, error) { +func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) (*ProtocolManager, error) { var ( evmux = new(event.TypeMux) engine = ethash.NewFaker() - db, _ = ethdb.NewMemDatabase() gspec = core.Genesis{ Config: params.TestChainConfig, Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, } genesis = gspec.MustCommit(db) - odr *LesOdr - chain BlockChain + chain BlockChain ) + if peers == nil { + peers = newPeerSet() + } if lightSync { - odr = NewLesOdr(db) chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux) } else { blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) @@ -158,9 +157,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor chain = blockchain } - pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, chain, nil, db, odr, nil) + pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, make(chan struct{}), new(sync.WaitGroup)) if err != nil { - return nil, nil, nil, err + return nil, err } if !lightSync { srv := &LesServer{protocolManager: pm} @@ -174,20 +173,20 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor srv.fcManager = flowcontrol.NewClientManager(50, 10, 1000000000) srv.fcCostStats = newCostStats(nil) } - pm.Start(nil) - return pm, db, odr, nil + pm.Start() + return pm, nil } // newTestProtocolManagerMust creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. In case of an error, the constructor force- // fails the test. -func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen)) (*ProtocolManager, ethdb.Database, *LesOdr) { - pm, db, odr, err := newTestProtocolManager(lightSync, blocks, generator) +func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) *ProtocolManager { + pm, err := newTestProtocolManager(lightSync, blocks, generator, peers, odr, db) if err != nil { t.Fatalf("Failed to create protocol manager: %v", err) } - return pm, db, odr + return pm } // testTxPool is a fake, helper transaction pool for testing purposes @@ -342,30 +341,3 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu func (p *testPeer) close() { p.app.Close() } - -type testServerPool struct { - peer *peer - lock sync.RWMutex -} - -func (p *testServerPool) setPeer(peer *peer) { - p.lock.Lock() - defer p.lock.Unlock() - - p.peer = peer -} - -func (p *testServerPool) getAllPeers() map[distPeer]struct{} { - p.lock.RLock() - defer p.lock.RUnlock() - - m := make(map[distPeer]struct{}) - if p.peer != nil { - m[p.peer] = struct{}{} - } - return m -} - -func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) { - -} diff --git a/les/odr.go b/les/odr.go index 684f36c76..3f7584b48 100644 --- a/les/odr.go +++ b/les/odr.go @@ -18,45 +18,24 @@ package les import ( "context" - "crypto/rand" - "encoding/binary" - "sync" - "time" - "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" ) -var ( - softRequestTimeout = time.Millisecond * 500 - hardRequestTimeout = time.Second * 10 -) - -// peerDropFn is a callback type for dropping a peer detected as malicious. -type peerDropFn func(id string) - -type odrPeerSelector interface { - adjustResponseTime(*poolEntry, time.Duration, bool) -} - +// LesOdr implements light.OdrBackend type LesOdr struct { - light.OdrBackend - db ethdb.Database - stop chan struct{} - removePeer peerDropFn - mlock, clock sync.Mutex - sentReqs map[uint64]*sentReq - serverPool odrPeerSelector - reqDist *requestDistributor + db ethdb.Database + stop chan struct{} + retriever *retrieveManager } -func NewLesOdr(db ethdb.Database) *LesOdr { +func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr { return &LesOdr{ - db: db, - stop: make(chan struct{}), - sentReqs: make(map[uint64]*sentReq), + db: db, + retriever: retriever, + stop: make(chan struct{}), } } @@ -68,17 +47,6 @@ func (odr *LesOdr) Database() ethdb.Database { return odr.db } -// validatorFunc is a function that processes a message. -type validatorFunc func(ethdb.Database, *Msg) error - -// sentReq is a request waiting for an answer that satisfies its valFunc -type sentReq struct { - valFunc validatorFunc - sentTo map[*peer]chan struct{} - lock sync.RWMutex // protects acces to sentTo - answered chan struct{} // closed and set to nil when any peer answers it -} - const ( MsgBlockBodies = iota MsgCode @@ -94,156 +62,29 @@ type Msg struct { Obj interface{} } -// Deliver is called by the LES protocol manager to deliver ODR reply messages to waiting requests -func (self *LesOdr) Deliver(peer *peer, msg *Msg) error { - var delivered chan struct{} - self.mlock.Lock() - req, ok := self.sentReqs[msg.ReqID] - self.mlock.Unlock() - if ok { - req.lock.Lock() - delivered, ok = req.sentTo[peer] - req.lock.Unlock() - } - - if !ok { - return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) - } - - if err := req.valFunc(self.db, msg); err != nil { - peer.Log().Warn("Invalid odr response", "err", err) - return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID) - } - close(delivered) - req.lock.Lock() - delete(req.sentTo, peer) - if req.answered != nil { - close(req.answered) - req.answered = nil - } - req.lock.Unlock() - return nil -} - -func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout chan struct{}, reqWg *sync.WaitGroup) { - stime := mclock.Now() - defer func() { - req.lock.Lock() - delete(req.sentTo, peer) - req.lock.Unlock() - reqWg.Done() - }() - - select { - case <-delivered: - if self.serverPool != nil { - self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false) - } - return - case <-time.After(softRequestTimeout): - close(timeout) - case <-self.stop: - return - } - - select { - case <-delivered: - case <-time.After(hardRequestTimeout): - peer.Log().Debug("Request timed out hard") - go self.removePeer(peer.id) - case <-self.stop: - return - } - if self.serverPool != nil { - self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true) - } -} - -// networkRequest sends a request to known peers until an answer is received -// or the context is cancelled -func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error { - answered := make(chan struct{}) - req := &sentReq{ - valFunc: lreq.Validate, - sentTo: make(map[*peer]chan struct{}), - answered: answered, // reply delivered by any peer - } - - exclude := make(map[*peer]struct{}) - - reqWg := new(sync.WaitGroup) - reqWg.Add(1) - defer reqWg.Done() +// Retrieve tries to fetch an object from the LES network. +// If the network retrieval was successful, it stores the object in local db. +func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { + lreq := LesRequest(req) - var timeout chan struct{} - reqID := getNextReqID() + reqID := genReqID() rq := &distReq{ getCost: func(dp distPeer) uint64 { return lreq.GetCost(dp.(*peer)) }, canSend: func(dp distPeer) bool { p := dp.(*peer) - _, ok := exclude[p] - return !ok && lreq.CanSend(p) + return lreq.CanSend(p) }, request: func(dp distPeer) func() { p := dp.(*peer) - exclude[p] = struct{}{} - delivered := make(chan struct{}) - timeout = make(chan struct{}) - req.lock.Lock() - req.sentTo[p] = delivered - req.lock.Unlock() - reqWg.Add(1) cost := lreq.GetCost(p) p.fcServer.QueueRequest(reqID, cost) - go self.requestPeer(req, p, delivered, timeout, reqWg) return func() { lreq.Request(reqID, p) } }, } - self.mlock.Lock() - self.sentReqs[reqID] = req - self.mlock.Unlock() - - go func() { - reqWg.Wait() - self.mlock.Lock() - delete(self.sentReqs, reqID) - self.mlock.Unlock() - }() - - for { - peerChn := self.reqDist.queue(rq) - select { - case <-ctx.Done(): - self.reqDist.cancel(rq) - return ctx.Err() - case <-answered: - self.reqDist.cancel(rq) - return nil - case _, ok := <-peerChn: - if !ok { - return ErrNoPeers - } - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-answered: - return nil - case <-timeout: - } - } -} - -// Retrieve tries to fetch an object from the LES network. -// If the network retrieval was successful, it stores the object in local db. -func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { - lreq := LesRequest(req) - err = self.networkRequest(ctx, lreq) - if err == nil { + if err = self.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(self.db, msg) }); err == nil { // retrieved from network, store in db req.StoreResult(self.db) } else { @@ -251,9 +92,3 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err } return } - -func getNextReqID() uint64 { - var rnd [8]byte - rand.Read(rnd[:]) - return binary.BigEndian.Uint64(rnd[:]) -} diff --git a/les/odr_test.go b/les/odr_test.go index 532de4d80..7b34996ce 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -158,15 +158,15 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { // Assemble the test environment - pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen) - lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) + peers := newPeerSet() + dist := newRequestDistributor(peers, make(chan struct{})) + rm := newRetrieveManager(peers, dist, nil) + db, _ := ethdb.NewMemDatabase() + ldb, _ := ethdb.NewMemDatabase() + odr := NewLesOdr(ldb, rm) + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) + lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - pool := &testServerPool{} - lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync) - odr.reqDist = lpm.reqDist - pool.setPeer(lpeer) - odr.serverPool = pool - lpeer.hasBlock = func(common.Hash, uint64) bool { return true } select { case <-time.After(time.Millisecond * 100): case err := <-err1: @@ -198,13 +198,19 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { } // temporarily remove peer to test odr fails - pool.setPeer(nil) // expect retrievals to fail (except genesis block) without a les peer + peers.Unregister(lpeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed test(expFail) - pool.setPeer(lpeer) // expect all retrievals to pass + peers.Register(lpeer) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed + lpeer.lock.Lock() + lpeer.hasBlock = func(common.Hash, uint64) bool { return true } + lpeer.lock.Unlock() test(5) - pool.setPeer(nil) // still expect all retrievals to pass, now data should be cached locally + peers.Unregister(lpeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed test(5) } diff --git a/les/peer.go b/les/peer.go index ab55bafe3..791d0da24 100644 --- a/les/peer.go +++ b/les/peer.go @@ -166,9 +166,9 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { // HasBlock checks if the peer has a given block func (p *peer) HasBlock(hash common.Hash, number uint64) bool { p.lock.RLock() - hashBlock := p.hasBlock + hasBlock := p.hasBlock p.lock.RUnlock() - return hashBlock != nil && hashBlock(hash, number) + return hasBlock != nil && hasBlock(hash, number) } // SendAnnounce announces the availability of a number of blocks through @@ -433,12 +433,20 @@ func (p *peer) String() string { ) } +// peerSetNotify is a callback interface to notify services about added or +// removed peers +type peerSetNotify interface { + registerPeer(*peer) + unregisterPeer(*peer) +} + // peerSet represents the collection of active peers currently participating in // the Light Ethereum sub-protocol. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex - closed bool + peers map[string]*peer + lock sync.RWMutex + notifyList []peerSetNotify + closed bool } // newPeerSet creates a new peer set to track the active participants. @@ -448,6 +456,17 @@ func newPeerSet() *peerSet { } } +// notify adds a service to be notified about added or removed peers +func (ps *peerSet) notify(n peerSetNotify) { + ps.lock.Lock() + defer ps.lock.Unlock() + + ps.notifyList = append(ps.notifyList, n) + for _, p := range ps.peers { + go n.registerPeer(p) + } +} + // Register injects a new peer into the working set, or returns an error if the // peer is already known. func (ps *peerSet) Register(p *peer) error { @@ -462,11 +481,14 @@ func (ps *peerSet) Register(p *peer) error { } ps.peers[p.id] = p p.sendQueue = newExecQueue(100) + for _, n := range ps.notifyList { + go n.registerPeer(p) + } return nil } // Unregister removes a remote peer from the active set, disabling any further -// actions to/from that particular entity. +// actions to/from that particular entity. It also initiates disconnection at the networking layer. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -474,7 +496,11 @@ func (ps *peerSet) Unregister(id string) error { if p, ok := ps.peers[id]; !ok { return errNotRegistered } else { + for _, n := range ps.notifyList { + go n.unregisterPeer(p) + } p.sendQueue.quit() + p.Peer.Disconnect(p2p.DiscUselessPeer) } delete(ps.peers, id) return nil diff --git a/les/request_test.go b/les/request_test.go index ba1fc15bd..3add5f20d 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -68,15 +68,16 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, number uint64) light.Odr func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment - pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) - lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) + peers := newPeerSet() + dist := newRequestDistributor(peers, make(chan struct{})) + rm := newRetrieveManager(peers, dist, nil) + db, _ := ethdb.NewMemDatabase() + ldb, _ := ethdb.NewMemDatabase() + odr := NewLesOdr(ldb, rm) + + pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) + lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - pool := &testServerPool{} - lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync) - odr.reqDist = lpm.reqDist - pool.setPeer(lpeer) - odr.serverPool = pool - lpeer.hasBlock = func(common.Hash, uint64) bool { return true } select { case <-time.After(time.Millisecond * 100): case err := <-err1: @@ -108,10 +109,16 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { } // temporarily remove peer to test odr fails - pool.setPeer(nil) + peers.Unregister(lpeer.id) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed // expect retrievals to fail (except genesis block) without a les peer test(0) - pool.setPeer(lpeer) + + peers.Register(lpeer) + time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed + lpeer.lock.Lock() + lpeer.hasBlock = func(common.Hash, uint64) bool { return true } + lpeer.lock.Unlock() // expect all retrievals to pass test(5) } diff --git a/les/retrieve.go b/les/retrieve.go new file mode 100644 index 000000000..b060e0b0d --- /dev/null +++ b/les/retrieve.go @@ -0,0 +1,395 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package light implements on-demand retrieval capable state and chain objects +// for the Ethereum Light Client. +package les + +import ( + "context" + "crypto/rand" + "encoding/binary" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +var ( + retryQueue = time.Millisecond * 100 + softRequestTimeout = time.Millisecond * 500 + hardRequestTimeout = time.Second * 10 +) + +// retrieveManager is a layer on top of requestDistributor which takes care of +// matching replies by request ID and handles timeouts and resends if necessary. +type retrieveManager struct { + dist *requestDistributor + peers *peerSet + serverPool peerSelector + + lock sync.RWMutex + sentReqs map[uint64]*sentReq +} + +// validatorFunc is a function that processes a reply message +type validatorFunc func(distPeer, *Msg) error + +// peerSelector receives feedback info about response times and timeouts +type peerSelector interface { + adjustResponseTime(*poolEntry, time.Duration, bool) +} + +// sentReq represents a request sent and tracked by retrieveManager +type sentReq struct { + rm *retrieveManager + req *distReq + id uint64 + validate validatorFunc + + eventsCh chan reqPeerEvent + stopCh chan struct{} + stopped bool + err error + + lock sync.RWMutex // protect access to sentTo map + sentTo map[distPeer]sentReqToPeer + + reqQueued bool // a request has been queued but not sent + reqSent bool // a request has been sent but not timed out + reqSrtoCount int // number of requests that reached soft (but not hard) timeout +} + +// sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response +// delivered by the given peer. Only one delivery is allowed per request per peer, +// after which delivered is set to true, the validity of the response is sent on the +// valid channel and no more responses are accepted. +type sentReqToPeer struct { + delivered bool + valid chan bool +} + +// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the +// request state machine (retrieveLoop) through the eventsCh channel. +type reqPeerEvent struct { + event int + peer distPeer +} + +const ( + rpSent = iota // if peer == nil, not sent (no suitable peers) + rpSoftTimeout + rpHardTimeout + rpDeliveredValid + rpDeliveredInvalid +) + +// newRetrieveManager creates the retrieve manager +func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager { + return &retrieveManager{ + peers: peers, + dist: dist, + serverPool: serverPool, + sentReqs: make(map[uint64]*sentReq), + } +} + +// retrieve sends a request (to multiple peers if necessary) and waits for an answer +// that is delivered through the deliver function and successfully validated by the +// validator callback. It returns when a valid answer is delivered or the context is +// cancelled. +func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc) error { + sentReq := rm.sendReq(reqID, req, val) + select { + case <-sentReq.stopCh: + case <-ctx.Done(): + sentReq.stop(ctx.Err()) + } + return sentReq.getError() +} + +// sendReq starts a process that keeps trying to retrieve a valid answer for a +// request from any suitable peers until stopped or succeeded. +func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq { + r := &sentReq{ + rm: rm, + req: req, + id: reqID, + sentTo: make(map[distPeer]sentReqToPeer), + stopCh: make(chan struct{}), + eventsCh: make(chan reqPeerEvent, 10), + validate: val, + } + + canSend := req.canSend + req.canSend = func(p distPeer) bool { + // add an extra check to canSend: the request has not been sent to the same peer before + r.lock.RLock() + _, sent := r.sentTo[p] + r.lock.RUnlock() + return !sent && canSend(p) + } + + request := req.request + req.request = func(p distPeer) func() { + // before actually sending the request, put an entry into the sentTo map + r.lock.Lock() + r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)} + r.lock.Unlock() + return request(p) + } + rm.lock.Lock() + rm.sentReqs[reqID] = r + rm.lock.Unlock() + + go r.retrieveLoop() + return r +} + +// deliver is called by the LES protocol manager to deliver reply messages to waiting requests +func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error { + rm.lock.RLock() + req, ok := rm.sentReqs[msg.ReqID] + rm.lock.RUnlock() + + if ok { + return req.deliver(peer, msg) + } + return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) +} + +// reqStateFn represents a state of the retrieve loop state machine +type reqStateFn func() reqStateFn + +// retrieveLoop is the retrieval state machine event loop +func (r *sentReq) retrieveLoop() { + go r.tryRequest() + r.reqQueued = true + state := r.stateRequesting + + for state != nil { + state = state() + } + + r.rm.lock.Lock() + delete(r.rm.sentReqs, r.id) + r.rm.lock.Unlock() +} + +// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout, +// a new request is sent to a new peer +func (r *sentReq) stateRequesting() reqStateFn { + select { + case ev := <-r.eventsCh: + r.update(ev) + switch ev.event { + case rpSent: + if ev.peer == nil { + // request send failed, no more suitable peers + if r.waiting() { + // we are already waiting for sent requests which may succeed so keep waiting + return r.stateNoMorePeers + } + // nothing to wait for, no more peers to ask, return with error + r.stop(ErrNoPeers) + // no need to go to stopped state because waiting() already returned false + return nil + } + case rpSoftTimeout: + // last request timed out, try asking a new peer + go r.tryRequest() + r.reqQueued = true + return r.stateRequesting + case rpDeliveredValid: + r.stop(nil) + return r.stateStopped + } + return r.stateRequesting + case <-r.stopCh: + return r.stateStopped + } +} + +// stateNoMorePeers: could not send more requests because no suitable peers are available. +// Peers may become suitable for a certain request later or new peers may appear so we +// keep trying. +func (r *sentReq) stateNoMorePeers() reqStateFn { + select { + case <-time.After(retryQueue): + go r.tryRequest() + r.reqQueued = true + return r.stateRequesting + case ev := <-r.eventsCh: + r.update(ev) + if ev.event == rpDeliveredValid { + r.stop(nil) + return r.stateStopped + } + return r.stateNoMorePeers + case <-r.stopCh: + return r.stateStopped + } +} + +// stateStopped: request succeeded or cancelled, just waiting for some peers +// to either answer or time out hard +func (r *sentReq) stateStopped() reqStateFn { + for r.waiting() { + r.update(<-r.eventsCh) + } + return nil +} + +// update updates the queued/sent flags and timed out peers counter according to the event +func (r *sentReq) update(ev reqPeerEvent) { + switch ev.event { + case rpSent: + r.reqQueued = false + if ev.peer != nil { + r.reqSent = true + } + case rpSoftTimeout: + r.reqSent = false + r.reqSrtoCount++ + case rpHardTimeout, rpDeliveredValid, rpDeliveredInvalid: + r.reqSrtoCount-- + } +} + +// waiting returns true if the retrieval mechanism is waiting for an answer from +// any peer +func (r *sentReq) waiting() bool { + return r.reqQueued || r.reqSent || r.reqSrtoCount > 0 +} + +// tryRequest tries to send the request to a new peer and waits for it to either +// succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent +// messages to the request's event channel. +func (r *sentReq) tryRequest() { + sent := r.rm.dist.queue(r.req) + var p distPeer + select { + case p = <-sent: + case <-r.stopCh: + if r.rm.dist.cancel(r.req) { + p = nil + } else { + p = <-sent + } + } + + r.eventsCh <- reqPeerEvent{rpSent, p} + if p == nil { + return + } + + reqSent := mclock.Now() + srto, hrto := false, false + + r.lock.RLock() + s, ok := r.sentTo[p] + r.lock.RUnlock() + if !ok { + panic(nil) + } + + defer func() { + // send feedback to server pool and remove peer if hard timeout happened + pp, ok := p.(*peer) + if ok && r.rm.serverPool != nil { + respTime := time.Duration(mclock.Now() - reqSent) + r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto) + } + if hrto { + pp.Log().Debug("Request timed out hard") + if r.rm.peers != nil { + r.rm.peers.Unregister(pp.id) + } + } + + r.lock.Lock() + delete(r.sentTo, p) + r.lock.Unlock() + }() + + select { + case ok := <-s.valid: + if ok { + r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} + } else { + r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} + } + return + case <-time.After(softRequestTimeout): + srto = true + r.eventsCh <- reqPeerEvent{rpSoftTimeout, p} + } + + select { + case ok := <-s.valid: + if ok { + r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} + } else { + r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} + } + case <-time.After(hardRequestTimeout): + hrto = true + r.eventsCh <- reqPeerEvent{rpHardTimeout, p} + } +} + +// deliver a reply belonging to this request +func (r *sentReq) deliver(peer distPeer, msg *Msg) error { + r.lock.Lock() + defer r.lock.Unlock() + + s, ok := r.sentTo[peer] + if !ok || s.delivered { + return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) + } + valid := r.validate(peer, msg) == nil + r.sentTo[peer] = sentReqToPeer{true, s.valid} + s.valid <- valid + if !valid { + return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID) + } + return nil +} + +// stop stops the retrieval process and sets an error code that will be returned +// by getError +func (r *sentReq) stop(err error) { + r.lock.Lock() + if !r.stopped { + r.stopped = true + r.err = err + close(r.stopCh) + } + r.lock.Unlock() +} + +// getError returns any retrieval error (either internally generated or set by the +// stop function) after stopCh has been closed +func (r *sentReq) getError() error { + return r.err +} + +// genReqID generates a new random request ID +func genReqID() uint64 { + var rnd [8]byte + rand.Read(rnd[:]) + return binary.BigEndian.Uint64(rnd[:]) +} diff --git a/les/server.go b/les/server.go index 22fe59b7a..2ff715ea8 100644 --- a/les/server.go +++ b/les/server.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -41,17 +42,24 @@ type LesServer struct { fcManager *flowcontrol.ClientManager // nil if our node is client only fcCostStats *requestCostStats defParams *flowcontrol.ServerParams + lesTopic discv5.Topic + quitSync chan struct{} stopped bool } func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { - pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil) + quitSync := make(chan struct{}) + pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, quitSync, new(sync.WaitGroup)) if err != nil { return nil, err } pm.blockLoop() - srv := &LesServer{protocolManager: pm} + srv := &LesServer{ + protocolManager: pm, + quitSync: quitSync, + lesTopic: lesTopic(eth.BlockChain().Genesis().Hash()), + } pm.server = srv srv.defParams = &flowcontrol.ServerParams{ @@ -69,7 +77,14 @@ func (s *LesServer) Protocols() []p2p.Protocol { // Start starts the LES server func (s *LesServer) Start(srvr *p2p.Server) { - s.protocolManager.Start(srvr) + s.protocolManager.Start() + go func() { + logger := log.New("topic", s.lesTopic) + logger.Info("Starting topic registration") + defer logger.Info("Terminated topic registration") + + srvr.DiscV5.RegisterTopic(s.lesTopic, s.quitSync) + }() } // Stop stops the LES service diff --git a/les/serverpool.go b/les/serverpool.go index 64fe991c6..f4e4df2fb 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -102,6 +102,8 @@ type serverPool struct { wg *sync.WaitGroup connWg sync.WaitGroup + topic discv5.Topic + discSetPeriod chan time.Duration discNodes chan *discv5.Node discLookups chan bool @@ -118,11 +120,9 @@ type serverPool struct { } // newServerPool creates a new serverPool instance -func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool { +func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool { pool := &serverPool{ db: db, - dbKey: append(dbPrefix, []byte(topic)...), - server: server, quit: quit, wg: wg, entries: make(map[discover.NodeID]*poolEntry), @@ -135,19 +135,25 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic } pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) - wg.Add(1) + return pool +} + +func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { + pool.server = server + pool.topic = topic + pool.dbKey = append([]byte("serverPool/"), []byte(topic)...) + pool.wg.Add(1) pool.loadNodes() - pool.checkDial() + go pool.eventLoop() + + pool.checkDial() if pool.server.DiscV5 != nil { pool.discSetPeriod = make(chan time.Duration, 1) pool.discNodes = make(chan *discv5.Node, 100) pool.discLookups = make(chan bool, 100) - go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) } - - go pool.eventLoop() - return pool } // connect should be called upon any incoming connection. If the connection has been @@ -485,7 +491,7 @@ func (pool *serverPool) checkDial() { // dial initiates a new connection func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { - if entry.state != psNotConnected { + if pool.server == nil || entry.state != psNotConnected { return } entry.state = psDialed diff --git a/les/txrelay.go b/les/txrelay.go index 1ca3467e4..7a02cc837 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -39,26 +39,28 @@ type LesTxRelay struct { reqDist *requestDistributor } -func NewLesTxRelay() *LesTxRelay { - return &LesTxRelay{ +func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay { + r := &LesTxRelay{ txSent: make(map[common.Hash]*ltrInfo), txPending: make(map[common.Hash]struct{}), + ps: ps, + reqDist: reqDist, } + ps.notify(r) + return r } -func (self *LesTxRelay) addPeer(p *peer) { +func (self *LesTxRelay) registerPeer(p *peer) { self.lock.Lock() defer self.lock.Unlock() - self.ps.Register(p) self.peerList = self.ps.AllPeers() } -func (self *LesTxRelay) removePeer(id string) { +func (self *LesTxRelay) unregisterPeer(p *peer) { self.lock.Lock() defer self.lock.Unlock() - self.ps.Unregister(id) self.peerList = self.ps.AllPeers() } @@ -112,7 +114,7 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { pp := p ll := list - reqID := getNextReqID() + reqID := genReqID() rq := &distReq{ getCost: func(dp distPeer) uint64 { peer := dp.(*peer) diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go index f307b38ea..69f3cc615 100644 --- a/swarm/fuse/swarmfs_test.go +++ b/swarm/fuse/swarmfs_test.go @@ -21,13 +21,14 @@ package fuse import ( "bytes" "crypto/rand" - "github.com/ethereum/go-ethereum/swarm/api" - "github.com/ethereum/go-ethereum/swarm/storage" "io" "io/ioutil" "os" "path/filepath" "testing" + + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/storage" ) type fileInfo struct { @@ -37,26 +38,7 @@ type fileInfo struct { contents []byte } -func testFuseFileSystem(t *testing.T, f func(*api.Api)) { - - datadir, err := ioutil.TempDir("", "fuse") - if err != nil { - t.Fatalf("unable to create temp dir: %v", err) - } - os.RemoveAll(datadir) - - dpa, err := storage.NewLocalDPA(datadir) - if err != nil { - return - } - api := api.NewApi(dpa, nil) - dpa.Start() - f(api) - dpa.Stop() -} - func createTestFilesAndUploadToSwarm(t *testing.T, api *api.Api, files map[string]fileInfo, uploadDir string) string { - os.RemoveAll(uploadDir) for fname, finfo := range files { @@ -89,8 +71,6 @@ func createTestFilesAndUploadToSwarm(t *testing.T, api *api.Api, files map[strin } func mountDir(t *testing.T, api *api.Api, files map[string]fileInfo, bzzHash string, mountDir string) *SwarmFS { - - // Test Mount os.RemoveAll(mountDir) os.MkdirAll(mountDir, 0777) swarmfs := NewSwarmFS(api) @@ -123,11 +103,9 @@ func mountDir(t *testing.T, api *api.Api, files map[string]fileInfo, bzzHash str compareGeneratedFileWithFileInMount(t, files, mountDir) return swarmfs - } func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo, mountDir string) { - err := filepath.Walk(mountDir, func(path string, f os.FileInfo, err error) error { if f.IsDir() { return nil @@ -143,7 +121,6 @@ func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo } for fname, finfo := range files { - destinationFile := filepath.Join(mountDir, fname) dfinfo, err := os.Stat(destinationFile) @@ -163,18 +140,15 @@ func compareGeneratedFileWithFileInMount(t *testing.T, files map[string]fileInfo if err != nil { t.Fatalf("Could not readfile %v : %v", fname, err) } - if bytes.Compare(fileContents, finfo.contents) != 0 { t.Fatalf("File %v contents mismatch: %v , %v", fname, fileContents, finfo.contents) } - // TODO: check uid and gid } } func checkFile(t *testing.T, testMountDir, fname string, contents []byte) { - destinationFile := filepath.Join(testMountDir, fname) dfinfo, err1 := os.Stat(destinationFile) if err1 != nil { @@ -201,10 +175,9 @@ func getRandomBtes(size int) []byte { contents := make([]byte, size) rand.Read(contents) return contents - } -func IsDirEmpty(name string) bool { +func isDirEmpty(name string) bool { f, err := os.Open(name) if err != nil { return false @@ -218,8 +191,11 @@ func IsDirEmpty(name string) bool { return false } -func testMountListAndUnmount(api *api.Api, t *testing.T) { +type testAPI struct { + api *api.Api +} +func (ta *testAPI) mountListAndUnmount(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "fuse-source") testMountDir, _ := ioutil.TempDir(os.TempDir(), "fuse-dest") @@ -240,9 +216,9 @@ func testMountListAndUnmount(api *api.Api, t *testing.T) { files["twice/2.txt"] = fileInfo{0777, 444, 333, getRandomBtes(200)} files["one/two/three/four/five/six/seven/eight/nine/10.txt"] = fileInfo{0777, 333, 444, getRandomBtes(10240)} files["one/two/three/four/five/six/six"] = fileInfo{0777, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() // Check unmount @@ -250,53 +226,52 @@ func testMountListAndUnmount(api *api.Api, t *testing.T) { if err != nil { t.Fatalf("could not unmount %v", bzzHash) } - if !IsDirEmpty(testMountDir) { + if !isDirEmpty(testMountDir) { t.Fatalf("unmount didnt work for %v", testMountDir) } } -func testMaxMounts(api *api.Api, t *testing.T) { - +func (ta *testAPI) maxMounts(t *testing.T) { files := make(map[string]fileInfo) files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir1, _ := ioutil.TempDir(os.TempDir(), "max-upload1") - bzzHash1 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir1) + bzzHash1 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir1) mount1, _ := ioutil.TempDir(os.TempDir(), "max-mount1") - swarmfs1 := mountDir(t, api, files, bzzHash1, mount1) + swarmfs1 := mountDir(t, ta.api, files, bzzHash1, mount1) defer swarmfs1.Stop() files["2.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir2, _ := ioutil.TempDir(os.TempDir(), "max-upload2") - bzzHash2 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir2) + bzzHash2 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir2) mount2, _ := ioutil.TempDir(os.TempDir(), "max-mount2") - swarmfs2 := mountDir(t, api, files, bzzHash2, mount2) + swarmfs2 := mountDir(t, ta.api, files, bzzHash2, mount2) defer swarmfs2.Stop() files["3.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir3, _ := ioutil.TempDir(os.TempDir(), "max-upload3") - bzzHash3 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir3) + bzzHash3 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir3) mount3, _ := ioutil.TempDir(os.TempDir(), "max-mount3") - swarmfs3 := mountDir(t, api, files, bzzHash3, mount3) + swarmfs3 := mountDir(t, ta.api, files, bzzHash3, mount3) defer swarmfs3.Stop() files["4.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir4, _ := ioutil.TempDir(os.TempDir(), "max-upload4") - bzzHash4 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir4) + bzzHash4 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir4) mount4, _ := ioutil.TempDir(os.TempDir(), "max-mount4") - swarmfs4 := mountDir(t, api, files, bzzHash4, mount4) + swarmfs4 := mountDir(t, ta.api, files, bzzHash4, mount4) defer swarmfs4.Stop() files["5.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir5, _ := ioutil.TempDir(os.TempDir(), "max-upload5") - bzzHash5 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir5) + bzzHash5 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir5) mount5, _ := ioutil.TempDir(os.TempDir(), "max-mount5") - swarmfs5 := mountDir(t, api, files, bzzHash5, mount5) + swarmfs5 := mountDir(t, ta.api, files, bzzHash5, mount5) defer swarmfs5.Stop() files["6.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir6, _ := ioutil.TempDir(os.TempDir(), "max-upload6") - bzzHash6 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir6) + bzzHash6 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir6) mount6, _ := ioutil.TempDir(os.TempDir(), "max-mount6") os.RemoveAll(mount6) @@ -308,18 +283,17 @@ func testMaxMounts(api *api.Api, t *testing.T) { } -func testReMounts(api *api.Api, t *testing.T) { - +func (ta *testAPI) remount(t *testing.T) { files := make(map[string]fileInfo) files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} uploadDir1, _ := ioutil.TempDir(os.TempDir(), "re-upload1") - bzzHash1 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir1) + bzzHash1 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir1) testMountDir1, _ := ioutil.TempDir(os.TempDir(), "re-mount1") - swarmfs := mountDir(t, api, files, bzzHash1, testMountDir1) + swarmfs := mountDir(t, ta.api, files, bzzHash1, testMountDir1) defer swarmfs.Stop() uploadDir2, _ := ioutil.TempDir(os.TempDir(), "re-upload2") - bzzHash2 := createTestFilesAndUploadToSwarm(t, api, files, uploadDir2) + bzzHash2 := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir2) testMountDir2, _ := ioutil.TempDir(os.TempDir(), "re-mount2") // try mounting the same hash second time @@ -341,19 +315,17 @@ func testReMounts(api *api.Api, t *testing.T) { if err == nil { t.Fatalf("Error mounting hash %v", bzzHash2) } - } -func testUnmount(api *api.Api, t *testing.T) { - +func (ta *testAPI) unmount(t *testing.T) { files := make(map[string]fileInfo) uploadDir, _ := ioutil.TempDir(os.TempDir(), "ex-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "ex-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, uploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, uploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() swarmfs.Unmount(testMountDir) @@ -364,19 +336,17 @@ func testUnmount(api *api.Api, t *testing.T) { t.Fatalf("mount state not cleaned up in unmount case %v", testMountDir) } } - } -func testUnmountWhenResourceBusy(api *api.Api, t *testing.T) { - +func (ta *testAPI) unmountWhenResourceBusy(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "ex-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "ex-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() actualPath := filepath.Join(testMountDir, "2.txt") @@ -395,18 +365,17 @@ func testUnmountWhenResourceBusy(api *api.Api, t *testing.T) { t.Fatalf("mount state not cleaned up in unmount case %v", testMountDir) } } - } -func testSeekInMultiChunkFile(api *api.Api, t *testing.T) { +func (ta *testAPI) seekInMultiChunkFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "seek-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "seek-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10240)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs.Stop() // Create a new file seek the second chunk @@ -423,11 +392,9 @@ func testSeekInMultiChunkFile(api *api.Api, t *testing.T) { t.Fatalf("File seek contents mismatch") } d.Close() - } -func testCreateNewFile(api *api.Api, t *testing.T) { - +func (ta *testAPI) createNewFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "create-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "create-mount") @@ -435,9 +402,9 @@ func testCreateNewFile(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file in the root dir and check @@ -458,23 +425,21 @@ func testCreateNewFile(api *api.Api, t *testing.T) { // mount again and see if things are okay files["2.txt"] = fileInfo{0700, 333, 444, contents} - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "2.txt", contents) - } -func testCreateNewFileInsideDirectory(api *api.Api, t *testing.T) { - +func (ta *testAPI) createNewFileInsideDirectory(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "createinsidedir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "createinsidedir-mount") files["one/1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file inside a existing dir and check @@ -496,23 +461,21 @@ func testCreateNewFileInsideDirectory(api *api.Api, t *testing.T) { // mount again and see if things are okay files["one/2.txt"] = fileInfo{0700, 333, 444, contents} - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "one/2.txt", contents) - } -func testCreateNewFileInsideNewDirectory(api *api.Api, t *testing.T) { - +func (ta *testAPI) createNewFileInsideNewDirectory(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "createinsidenewdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "createinsidenewdir-mount") files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file inside a existing dir and check @@ -535,15 +498,13 @@ func testCreateNewFileInsideNewDirectory(api *api.Api, t *testing.T) { // mount again and see if things are okay files["one/2.txt"] = fileInfo{0700, 333, 444, contents} - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "one/2.txt", contents) - } -func testRemoveExistingFile(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeExistingFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "remove-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "remove-mount") @@ -551,9 +512,9 @@ func testRemoveExistingFile(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Remove a file in the root dir and check @@ -567,13 +528,11 @@ func testRemoveExistingFile(api *api.Api, t *testing.T) { // mount again and see if things are okay delete(files, "five.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeExistingFileInsideDir(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "remove-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "remove-mount") @@ -581,9 +540,9 @@ func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["one/five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["one/six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Remove a file in the root dir and check @@ -597,12 +556,11 @@ func testRemoveExistingFileInsideADir(api *api.Api, t *testing.T) { // mount again and see if things are okay delete(files, "one/five.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) { +func (ta *testAPI) removeNewlyAddedFile(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "removenew-upload") @@ -611,9 +569,9 @@ func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Adda a new file and remove it @@ -639,17 +597,15 @@ func testRemoveNewlyAddedFile(api *api.Api, t *testing.T) { } // mount again and see if things are okay - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() if bzzHash != mi.LatestManifest { t.Fatalf("same contents different hash orig(%v): new(%v)", bzzHash, mi.LatestManifest) } - } -func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { - +func (ta *testAPI) addNewFileAndModifyContents(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "modifyfile-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "modifyfile-mount") @@ -657,9 +613,9 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() // Create a new file in the root dir and check @@ -680,7 +636,7 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { // mount again and see if things are okay files["2.txt"] = fileInfo{0700, 333, 444, line1} - swarmfs2 := mountDir(t, api, files, mi1.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi1.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "2.txt", line1) @@ -691,7 +647,7 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { } // mount again and modify - swarmfs3 := mountDir(t, api, files, mi2.LatestManifest, testMountDir) + swarmfs3 := mountDir(t, ta.api, files, mi2.LatestManifest, testMountDir) defer swarmfs3.Stop() fd, err4 := os.OpenFile(actualPath, os.O_RDWR|os.O_APPEND, os.FileMode(0665)) @@ -713,14 +669,13 @@ func testAddNewFileAndModifyContents(api *api.Api, t *testing.T) { b := [][]byte{line1, line2} line1and2 := bytes.Join(b, []byte("")) files["2.txt"] = fileInfo{0700, 333, 444, line1and2} - swarmfs4 := mountDir(t, api, files, mi3.LatestManifest, testMountDir) + swarmfs4 := mountDir(t, ta.api, files, mi3.LatestManifest, testMountDir) defer swarmfs4.Stop() checkFile(t, testMountDir, "2.txt", line1and2) - } -func testRemoveEmptyDir(api *api.Api, t *testing.T) { +func (ta *testAPI) removeEmptyDir(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-mount") @@ -728,9 +683,9 @@ func testRemoveEmptyDir(api *api.Api, t *testing.T) { files["1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() os.MkdirAll(filepath.Join(testMountDir, "newdir"), 0777) @@ -739,15 +694,12 @@ func testRemoveEmptyDir(api *api.Api, t *testing.T) { if err3 != nil { t.Fatalf("Could not unmount %v", err3) } - if bzzHash != mi.LatestManifest { t.Fatalf("same contents different hash orig(%v): new(%v)", bzzHash, mi.LatestManifest) } - } -func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeDirWhichHasFiles(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmdir-mount") @@ -755,9 +707,9 @@ func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) { files["one/1.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["two/five.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["two/six.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() dirPath := filepath.Join(testMountDir, "two") @@ -772,13 +724,11 @@ func testRemoveDirWhichHasFiles(api *api.Api, t *testing.T) { delete(files, "two/five.txt") delete(files, "two/six.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) { - +func (ta *testAPI) removeDirWhichHasSubDirs(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "rmsubdir-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "rmsubdir-mount") @@ -790,9 +740,9 @@ func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) { files["two/four/6.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} files["two/four/six/7.txt"] = fileInfo{0700, 333, 444, getRandomBtes(10)} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() dirPath := filepath.Join(testMountDir, "two") @@ -810,13 +760,11 @@ func testRemoveDirWhichHasSubDirs(api *api.Api, t *testing.T) { delete(files, "two/four/6.txt") delete(files, "two/four/six/7.txt") - swarmfs2 := mountDir(t, api, files, mi.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi.LatestManifest, testMountDir) defer swarmfs2.Stop() - } -func testAppendFileContentsToEnd(api *api.Api, t *testing.T) { - +func (ta *testAPI) appendFileContentsToEnd(t *testing.T) { files := make(map[string]fileInfo) testUploadDir, _ := ioutil.TempDir(os.TempDir(), "appendlargefile-upload") testMountDir, _ := ioutil.TempDir(os.TempDir(), "appendlargefile-mount") @@ -824,9 +772,9 @@ func testAppendFileContentsToEnd(api *api.Api, t *testing.T) { line1 := make([]byte, 10) rand.Read(line1) files["1.txt"] = fileInfo{0700, 333, 444, line1} - bzzHash := createTestFilesAndUploadToSwarm(t, api, files, testUploadDir) + bzzHash := createTestFilesAndUploadToSwarm(t, ta.api, files, testUploadDir) - swarmfs1 := mountDir(t, api, files, bzzHash, testMountDir) + swarmfs1 := mountDir(t, ta.api, files, bzzHash, testMountDir) defer swarmfs1.Stop() actualPath := filepath.Join(testMountDir, "1.txt") @@ -849,49 +797,42 @@ func testAppendFileContentsToEnd(api *api.Api, t *testing.T) { b := [][]byte{line1, line2} line1and2 := bytes.Join(b, []byte("")) files["1.txt"] = fileInfo{0700, 333, 444, line1and2} - swarmfs2 := mountDir(t, api, files, mi1.LatestManifest, testMountDir) + swarmfs2 := mountDir(t, ta.api, files, mi1.LatestManifest, testMountDir) defer swarmfs2.Stop() checkFile(t, testMountDir, "1.txt", line1and2) - } -func TestSwarmFileSystem(t *testing.T) { - testFuseFileSystem(t, func(api *api.Api) { - - testMountListAndUnmount(api, t) - - testMaxMounts(api, t) - - testReMounts(api, t) - - testUnmount(api, t) - - testUnmountWhenResourceBusy(api, t) - - testSeekInMultiChunkFile(api, t) - - testCreateNewFile(api, t) - - testCreateNewFileInsideDirectory(api, t) - - testCreateNewFileInsideNewDirectory(api, t) - - testRemoveExistingFile(api, t) - - testRemoveExistingFileInsideADir(api, t) - - testRemoveNewlyAddedFile(api, t) - - testAddNewFileAndModifyContents(api, t) - - testRemoveEmptyDir(api, t) - - testRemoveDirWhichHasFiles(api, t) - - testRemoveDirWhichHasSubDirs(api, t) - - testAppendFileContentsToEnd(api, t) +func TestFUSE(t *testing.T) { + datadir, err := ioutil.TempDir("", "fuse") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + os.RemoveAll(datadir) - }) + dpa, err := storage.NewLocalDPA(datadir) + if err != nil { + t.Fatal(err) + } + ta := &testAPI{api: api.NewApi(dpa, nil)} + dpa.Start() + defer dpa.Stop() + + t.Run("mountListAndUmount", ta.mountListAndUnmount) + t.Run("maxMounts", ta.maxMounts) + t.Run("remount", ta.remount) + t.Run("unmount", ta.unmount) + t.Run("unmountWhenResourceBusy", ta.unmountWhenResourceBusy) + t.Run("seekInMultiChunkFile", ta.seekInMultiChunkFile) + t.Run("createNewFile", ta.createNewFile) + t.Run("createNewFileInsideDirectory", ta.createNewFileInsideDirectory) + t.Run("createNewFileInsideNewDirectory", ta.createNewFileInsideNewDirectory) + t.Run("removeExistingFile", ta.removeExistingFile) + t.Run("removeExistingFileInsideDir", ta.removeExistingFileInsideDir) + t.Run("removeNewlyAddedFile", ta.removeNewlyAddedFile) + t.Run("addNewFileAndModifyContents", ta.addNewFileAndModifyContents) + t.Run("removeEmptyDir", ta.removeEmptyDir) + t.Run("removeDirWhichHasFiles", ta.removeDirWhichHasFiles) + t.Run("removeDirWhichHasSubDirs", ta.removeDirWhichHasSubDirs) + t.Run("appendFileContentsToEnd", ta.appendFileContentsToEnd) } diff --git a/swarm/fuse/swarmfs_unix.go b/swarm/fuse/swarmfs_unix.go index f4eecef24..1a8390a4b 100644 --- a/swarm/fuse/swarmfs_unix.go +++ b/swarm/fuse/swarmfs_unix.go @@ -19,18 +19,19 @@ package fuse import ( - "bazil.org/fuse" - "bazil.org/fuse/fs" "errors" "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/swarm/api" "os" "path/filepath" "strings" "sync" "time" + + "bazil.org/fuse" + "bazil.org/fuse/fs" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/api" ) var ( @@ -203,7 +204,7 @@ func (self *SwarmFS) Unmount(mountpoint string) (*MountInfo, error) { } err = fuse.Unmount(cleanedMountPoint) if err != nil { - err1 := externalUnMount(cleanedMountPoint) + err1 := externalUnmount(cleanedMountPoint) if err1 != nil { errStr := fmt.Sprintf("UnMount error: %v", err) log.Warn(errStr) diff --git a/swarm/fuse/swarmfs_util.go b/swarm/fuse/swarmfs_util.go index d20ab258e..d39966c0e 100644 --- a/swarm/fuse/swarmfs_util.go +++ b/swarm/fuse/swarmfs_util.go @@ -19,47 +19,31 @@ package fuse import ( + "context" "fmt" - "github.com/ethereum/go-ethereum/log" "os/exec" "runtime" - "time" -) -func externalUnMount(mountPoint string) error { + "github.com/ethereum/go-ethereum/log" +) - var cmd *exec.Cmd +func externalUnmount(mountPoint string) error { + ctx, cancel := context.WithTimeout(context.Background(), unmountTimeout) + defer cancel() + // Try generic umount. + if err := exec.CommandContext(ctx, "umount", mountPoint).Run(); err == nil { + return nil + } + // Try FUSE-specific commands if umount didn't work. switch runtime.GOOS { - case "darwin": - cmd = exec.Command("/usr/bin/diskutil", "umount", "force", mountPoint) - + return exec.CommandContext(ctx, "diskutil", "umount", "force", mountPoint).Run() case "linux": - cmd = exec.Command("fusermount", "-u", mountPoint) - + return exec.CommandContext(ctx, "fusermount", "-u", mountPoint).Run() default: return fmt.Errorf("unmount: unimplemented") } - - errc := make(chan error, 1) - go func() { - defer close(errc) - - if err := exec.Command("umount", mountPoint).Run(); err == nil { - return - } - errc <- cmd.Run() - }() - - select { - - case <-time.After(unmountTimeout): - return fmt.Errorf("umount timeout") - - case err := <-errc: - return err - } } func addFileToSwarm(sf *SwarmFile, content []byte, size int) error { diff --git a/trie/errors.go b/trie/errors.go index e23f9d563..567b80078 100644 --- a/trie/errors.go +++ b/trie/errors.go @@ -23,24 +23,13 @@ import ( ) // MissingNodeError is returned by the trie functions (TryGet, TryUpdate, TryDelete) -// in the case where a trie node is not present in the local database. Contains -// information necessary for retrieving the missing node through an ODR service. -// -// NodeHash is the hash of the missing node -// -// RootHash is the original root of the trie that contains the node -// -// PrefixLen is the nibble length of the key prefix that leads from the root to -// the missing node -// -// SuffixLen is the nibble length of the remaining part of the key that hints on -// which further nodes should also be retrieved (can be zero when there are no -// such hints in the error message) +// in the case where a trie node is not present in the local database. It contains +// information necessary for retrieving the missing node. type MissingNodeError struct { - RootHash, NodeHash common.Hash - PrefixLen, SuffixLen int + NodeHash common.Hash // hash of the missing node + Path []byte // hex-encoded path to the missing node } func (err *MissingNodeError) Error() string { - return fmt.Sprintf("Missing trie node %064x", err.NodeHash) + return fmt.Sprintf("missing trie node %x (path %x)", err.NodeHash, err.Path) } diff --git a/trie/iterator.go b/trie/iterator.go index 26ae1d5ad..76146c0d6 100644 --- a/trie/iterator.go +++ b/trie/iterator.go @@ -24,14 +24,13 @@ import ( "github.com/ethereum/go-ethereum/common" ) -var iteratorEnd = errors.New("end of iteration") - // Iterator is a key-value trie iterator that traverses a Trie. type Iterator struct { nodeIt NodeIterator Key []byte // Current data key on which the iterator is positioned on Value []byte // Current data value on which the iterator is positioned on + Err error } // NewIterator creates a new key-value iterator from a node iterator @@ -45,35 +44,42 @@ func NewIterator(it NodeIterator) *Iterator { func (it *Iterator) Next() bool { for it.nodeIt.Next(true) { if it.nodeIt.Leaf() { - it.Key = hexToKeybytes(it.nodeIt.Path()) + it.Key = it.nodeIt.LeafKey() it.Value = it.nodeIt.LeafBlob() return true } } it.Key = nil it.Value = nil + it.Err = it.nodeIt.Error() return false } // NodeIterator is an iterator to traverse the trie pre-order. type NodeIterator interface { - // Hash returns the hash of the current node - Hash() common.Hash - // Parent returns the hash of the parent of the current node - Parent() common.Hash - // Leaf returns true iff the current node is a leaf node. - Leaf() bool - // LeafBlob returns the contents of the node, if it is a leaf. - // Callers must not retain references to the return value after calling Next() - LeafBlob() []byte - // Path returns the hex-encoded path to the current node. - // Callers must not retain references to the return value after calling Next() - Path() []byte // Next moves the iterator to the next node. If the parameter is false, any child // nodes will be skipped. Next(bool) bool // Error returns the error status of the iterator. Error() error + + // Hash returns the hash of the current node. + Hash() common.Hash + // Parent returns the hash of the parent of the current node. The hash may be the one + // grandparent if the immediate parent is an internal node with no hash. + Parent() common.Hash + // Path returns the hex-encoded path to the current node. + // Callers must not retain references to the return value after calling Next. + // For leaf nodes, the last element of the path is the 'terminator symbol' 0x10. + Path() []byte + + // Leaf returns true iff the current node is a leaf node. + // LeafBlob, LeafKey return the contents and key of the leaf node. These + // method panic if the iterator is not positioned at a leaf. + // Callers must not retain references to their return value after calling Next + Leaf() bool + LeafBlob() []byte + LeafKey() []byte } // nodeIteratorState represents the iteration state at one particular node of the @@ -89,8 +95,21 @@ type nodeIteratorState struct { type nodeIterator struct { trie *Trie // Trie being iterated stack []*nodeIteratorState // Hierarchy of trie nodes persisting the iteration state - err error // Failure set in case of an internal error in the iterator path []byte // Path to the current node + err error // Failure set in case of an internal error in the iterator +} + +// iteratorEnd is stored in nodeIterator.err when iteration is done. +var iteratorEnd = errors.New("end of iteration") + +// seekError is stored in nodeIterator.err if the initial seek has failed. +type seekError struct { + key []byte + err error +} + +func (e seekError) Error() string { + return "seek error: " + e.err.Error() } func newNodeIterator(trie *Trie, start []byte) NodeIterator { @@ -98,60 +117,57 @@ func newNodeIterator(trie *Trie, start []byte) NodeIterator { return new(nodeIterator) } it := &nodeIterator{trie: trie} - it.seek(start) + it.err = it.seek(start) return it } -// Hash returns the hash of the current node func (it *nodeIterator) Hash() common.Hash { if len(it.stack) == 0 { return common.Hash{} } - return it.stack[len(it.stack)-1].hash } -// Parent returns the hash of the parent node func (it *nodeIterator) Parent() common.Hash { if len(it.stack) == 0 { return common.Hash{} } - return it.stack[len(it.stack)-1].parent } -// Leaf returns true if the current node is a leaf func (it *nodeIterator) Leaf() bool { - if len(it.stack) == 0 { - return false - } - - _, ok := it.stack[len(it.stack)-1].node.(valueNode) - return ok + return hasTerm(it.path) } -// LeafBlob returns the data for the current node, if it is a leaf func (it *nodeIterator) LeafBlob() []byte { - if len(it.stack) == 0 { - return nil + if len(it.stack) > 0 { + if node, ok := it.stack[len(it.stack)-1].node.(valueNode); ok { + return []byte(node) + } } + panic("not at leaf") +} - if node, ok := it.stack[len(it.stack)-1].node.(valueNode); ok { - return []byte(node) +func (it *nodeIterator) LeafKey() []byte { + if len(it.stack) > 0 { + if _, ok := it.stack[len(it.stack)-1].node.(valueNode); ok { + return hexToKeybytes(it.path) + } } - return nil + panic("not at leaf") } -// Path returns the hex-encoded path to the current node func (it *nodeIterator) Path() []byte { return it.path } -// Error returns the error set in case of an internal error in the iterator func (it *nodeIterator) Error() error { if it.err == iteratorEnd { return nil } + if seek, ok := it.err.(seekError); ok { + return seek.err + } return it.err } @@ -160,29 +176,37 @@ func (it *nodeIterator) Error() error { // sets the Error field to the encountered failure. If `descend` is false, // skips iterating over any subnodes of the current node. func (it *nodeIterator) Next(descend bool) bool { - if it.err != nil { + if it.err == iteratorEnd { return false } - // Otherwise step forward with the iterator and report any errors + if seek, ok := it.err.(seekError); ok { + if it.err = it.seek(seek.key); it.err != nil { + return false + } + } + // Otherwise step forward with the iterator and report any errors. state, parentIndex, path, err := it.peek(descend) - if err != nil { - it.err = err + it.err = err + if it.err != nil { return false } it.push(state, parentIndex, path) return true } -func (it *nodeIterator) seek(prefix []byte) { +func (it *nodeIterator) seek(prefix []byte) error { // The path we're looking for is the hex encoded key without terminator. key := keybytesToHex(prefix) key = key[:len(key)-1] // Move forward until we're just before the closest match to key. for { state, parentIndex, path, err := it.peek(bytes.HasPrefix(key, it.path)) - if err != nil || bytes.Compare(path, key) >= 0 { - it.err = err - return + if err == iteratorEnd { + return iteratorEnd + } else if err != nil { + return seekError{prefix, err} + } else if bytes.Compare(path, key) >= 0 { + return nil } it.push(state, parentIndex, path) } @@ -197,7 +221,8 @@ func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, er if root != emptyRoot { state.hash = root } - return state, nil, nil, nil + err := state.resolve(it.trie, nil) + return state, nil, nil, err } if !descend { // If we're skipping children, pop the current node first @@ -205,72 +230,73 @@ func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, er } // Continue iteration to the next child - for { - if len(it.stack) == 0 { - return nil, nil, nil, iteratorEnd - } + for len(it.stack) > 0 { parent := it.stack[len(it.stack)-1] ancestor := parent.hash if (ancestor == common.Hash{}) { ancestor = parent.parent } - if node, ok := parent.node.(*fullNode); ok { - // Full node, move to the first non-nil child. - for i := parent.index + 1; i < len(node.Children); i++ { - child := node.Children[i] - if child != nil { - hash, _ := child.cache() - state := &nodeIteratorState{ - hash: common.BytesToHash(hash), - node: child, - parent: ancestor, - index: -1, - pathlen: len(it.path), - } - path := append(it.path, byte(i)) - parent.index = i - 1 - return state, &parent.index, path, nil - } + state, path, ok := it.nextChild(parent, ancestor) + if ok { + if err := state.resolve(it.trie, path); err != nil { + return parent, &parent.index, path, err } - } else if node, ok := parent.node.(*shortNode); ok { - // Short node, return the pointer singleton child - if parent.index < 0 { - hash, _ := node.Val.cache() + return state, &parent.index, path, nil + } + // No more child nodes, move back up. + it.pop() + } + return nil, nil, nil, iteratorEnd +} + +func (st *nodeIteratorState) resolve(tr *Trie, path []byte) error { + if hash, ok := st.node.(hashNode); ok { + resolved, err := tr.resolveHash(hash, path) + if err != nil { + return err + } + st.node = resolved + st.hash = common.BytesToHash(hash) + } + return nil +} + +func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool) { + switch node := parent.node.(type) { + case *fullNode: + // Full node, move to the first non-nil child. + for i := parent.index + 1; i < len(node.Children); i++ { + child := node.Children[i] + if child != nil { + hash, _ := child.cache() state := &nodeIteratorState{ hash: common.BytesToHash(hash), - node: node.Val, + node: child, parent: ancestor, index: -1, pathlen: len(it.path), } - var path []byte - if hasTerm(node.Key) { - path = append(it.path, node.Key[:len(node.Key)-1]...) - } else { - path = append(it.path, node.Key...) - } - return state, &parent.index, path, nil + path := append(it.path, byte(i)) + parent.index = i - 1 + return state, path, true } - } else if hash, ok := parent.node.(hashNode); ok { - // Hash node, resolve the hash child from the database - if parent.index < 0 { - node, err := it.trie.resolveHash(hash, nil, nil) - if err != nil { - return it.stack[len(it.stack)-1], &parent.index, it.path, err - } - state := &nodeIteratorState{ - hash: common.BytesToHash(hash), - node: node, - parent: ancestor, - index: -1, - pathlen: len(it.path), - } - return state, &parent.index, it.path, nil + } + case *shortNode: + // Short node, return the pointer singleton child + if parent.index < 0 { + hash, _ := node.Val.cache() + state := &nodeIteratorState{ + hash: common.BytesToHash(hash), + node: node.Val, + parent: ancestor, + index: -1, + pathlen: len(it.path), } + path := append(it.path, node.Key...) + return state, path, true } - // No more child nodes, move back up. - it.pop() } + return parent, it.path, false } func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte) { @@ -288,23 +314,21 @@ func (it *nodeIterator) pop() { } func compareNodes(a, b NodeIterator) int { - cmp := bytes.Compare(a.Path(), b.Path()) - if cmp != 0 { + if cmp := bytes.Compare(a.Path(), b.Path()); cmp != 0 { return cmp } - if a.Leaf() && !b.Leaf() { return -1 } else if b.Leaf() && !a.Leaf() { return 1 } - - cmp = bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()) - if cmp != 0 { + if cmp := bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()); cmp != 0 { return cmp } - - return bytes.Compare(a.LeafBlob(), b.LeafBlob()) + if a.Leaf() && b.Leaf() { + return bytes.Compare(a.LeafBlob(), b.LeafBlob()) + } + return 0 } type differenceIterator struct { @@ -341,6 +365,10 @@ func (it *differenceIterator) LeafBlob() []byte { return it.b.LeafBlob() } +func (it *differenceIterator) LeafKey() []byte { + return it.b.LeafKey() +} + func (it *differenceIterator) Path() []byte { return it.b.Path() } @@ -410,7 +438,6 @@ func (h *nodeIteratorHeap) Pop() interface{} { type unionIterator struct { items *nodeIteratorHeap // Nodes returned are the union of the ones in these iterators count int // Number of nodes scanned across all tries - err error // The error, if one has been encountered } // NewUnionIterator constructs a NodeIterator that iterates over elements in the union @@ -421,9 +448,7 @@ func NewUnionIterator(iters []NodeIterator) (NodeIterator, *int) { copy(h, iters) heap.Init(&h) - ui := &unionIterator{ - items: &h, - } + ui := &unionIterator{items: &h} return ui, &ui.count } @@ -443,6 +468,10 @@ func (it *unionIterator) LeafBlob() []byte { return (*it.items)[0].LeafBlob() } +func (it *unionIterator) LeafKey() []byte { + return (*it.items)[0].LeafKey() +} + func (it *unionIterator) Path() []byte { return (*it.items)[0].Path() } diff --git a/trie/iterator_test.go b/trie/iterator_test.go index f161fd99d..4808d8b0c 100644 --- a/trie/iterator_test.go +++ b/trie/iterator_test.go @@ -19,6 +19,7 @@ package trie import ( "bytes" "fmt" + "math/rand" "testing" "github.com/ethereum/go-ethereum/common" @@ -239,8 +240,8 @@ func TestUnionIterator(t *testing.T) { all := []struct{ k, v string }{ {"aardvark", "c"}, - {"barb", "bd"}, {"barb", "ba"}, + {"barb", "bd"}, {"bard", "bc"}, {"bars", "bb"}, {"bars", "be"}, @@ -267,3 +268,107 @@ func TestUnionIterator(t *testing.T) { t.Errorf("Iterator returned extra values.") } } + +func TestIteratorNoDups(t *testing.T) { + var tr Trie + for _, val := range testdata1 { + tr.Update([]byte(val.k), []byte(val.v)) + } + checkIteratorNoDups(t, tr.NodeIterator(nil), nil) +} + +// This test checks that nodeIterator.Next can be retried after inserting missing trie nodes. +func TestIteratorContinueAfterError(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + tr, _ := New(common.Hash{}, db) + for _, val := range testdata1 { + tr.Update([]byte(val.k), []byte(val.v)) + } + tr.Commit() + wantNodeCount := checkIteratorNoDups(t, tr.NodeIterator(nil), nil) + keys := db.Keys() + t.Log("node count", wantNodeCount) + + for i := 0; i < 20; i++ { + // Create trie that will load all nodes from DB. + tr, _ := New(tr.Hash(), db) + + // Remove a random node from the database. It can't be the root node + // because that one is already loaded. + var rkey []byte + for { + if rkey = keys[rand.Intn(len(keys))]; !bytes.Equal(rkey, tr.Hash().Bytes()) { + break + } + } + rval, _ := db.Get(rkey) + db.Delete(rkey) + + // Iterate until the error is hit. + seen := make(map[string]bool) + it := tr.NodeIterator(nil) + checkIteratorNoDups(t, it, seen) + missing, ok := it.Error().(*MissingNodeError) + if !ok || !bytes.Equal(missing.NodeHash[:], rkey) { + t.Fatal("didn't hit missing node, got", it.Error()) + } + + // Add the node back and continue iteration. + db.Put(rkey, rval) + checkIteratorNoDups(t, it, seen) + if it.Error() != nil { + t.Fatal("unexpected error", it.Error()) + } + if len(seen) != wantNodeCount { + t.Fatal("wrong node iteration count, got", len(seen), "want", wantNodeCount) + } + } +} + +// Similar to the test above, this one checks that failure to create nodeIterator at a +// certain key prefix behaves correctly when Next is called. The expectation is that Next +// should retry seeking before returning true for the first time. +func TestIteratorContinueAfterSeekError(t *testing.T) { + // Commit test trie to db, then remove the node containing "bars". + db, _ := ethdb.NewMemDatabase() + ctr, _ := New(common.Hash{}, db) + for _, val := range testdata1 { + ctr.Update([]byte(val.k), []byte(val.v)) + } + root, _ := ctr.Commit() + barNodeHash := common.HexToHash("05041990364eb72fcb1127652ce40d8bab765f2bfe53225b1170d276cc101c2e") + barNode, _ := db.Get(barNodeHash[:]) + db.Delete(barNodeHash[:]) + + // Create a new iterator that seeks to "bars". Seeking can't proceed because + // the node is missing. + tr, _ := New(root, db) + it := tr.NodeIterator([]byte("bars")) + missing, ok := it.Error().(*MissingNodeError) + if !ok { + t.Fatal("want MissingNodeError, got", it.Error()) + } else if missing.NodeHash != barNodeHash { + t.Fatal("wrong node missing") + } + + // Reinsert the missing node. + db.Put(barNodeHash[:], barNode[:]) + + // Check that iteration produces the right set of values. + if err := checkIteratorOrder(testdata1[2:], NewIterator(it)); err != nil { + t.Fatal(err) + } +} + +func checkIteratorNoDups(t *testing.T, it NodeIterator, seen map[string]bool) int { + if seen == nil { + seen = make(map[string]bool) + } + for it.Next(true) { + if seen[string(it.Path())] { + t.Fatalf("iterator visited node path %x twice", it.Path()) + } + seen[string(it.Path())] = true + } + return len(seen) +} diff --git a/trie/proof.go b/trie/proof.go index fb7734b86..1f8f76b1b 100644 --- a/trie/proof.go +++ b/trie/proof.go @@ -58,7 +58,7 @@ func (t *Trie) Prove(key []byte) []rlp.RawValue { nodes = append(nodes, n) case hashNode: var err error - tn, err = t.resolveHash(n, nil, nil) + tn, err = t.resolveHash(n, nil) if err != nil { log.Error(fmt.Sprintf("Unhandled trie error: %v", err)) return nil diff --git a/trie/trie.go b/trie/trie.go index cbe496574..a3151b1ce 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -116,7 +116,7 @@ func New(root common.Hash, db Database) (*Trie, error) { if db == nil { panic("trie.New: cannot use existing root without a database") } - rootnode, err := trie.resolveHash(root[:], nil, nil) + rootnode, err := trie.resolveHash(root[:], nil) if err != nil { return nil, err } @@ -180,7 +180,7 @@ func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode } return value, n, didResolve, err case hashNode: - child, err := t.resolveHash(n, key[:pos], key[pos:]) + child, err := t.resolveHash(n, key[:pos]) if err != nil { return nil, n, true, err } @@ -283,7 +283,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error // We've hit a part of the trie that isn't loaded yet. Load // the node and insert into it. This leaves all child nodes on // the path to the value in the trie. - rn, err := t.resolveHash(n, prefix, key) + rn, err := t.resolveHash(n, prefix) if err != nil { return false, nil, err } @@ -388,7 +388,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { // shortNode{..., shortNode{...}}. Since the entry // might not be loaded yet, resolve it just for this // check. - cnode, err := t.resolve(n.Children[pos], prefix, []byte{byte(pos)}) + cnode, err := t.resolve(n.Children[pos], prefix) if err != nil { return false, nil, err } @@ -414,7 +414,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { // We've hit a part of the trie that isn't loaded yet. Load // the node and delete from it. This leaves all child nodes on // the path to the value in the trie. - rn, err := t.resolveHash(n, prefix, key) + rn, err := t.resolveHash(n, prefix) if err != nil { return false, nil, err } @@ -436,24 +436,19 @@ func concat(s1 []byte, s2 ...byte) []byte { return r } -func (t *Trie) resolve(n node, prefix, suffix []byte) (node, error) { +func (t *Trie) resolve(n node, prefix []byte) (node, error) { if n, ok := n.(hashNode); ok { - return t.resolveHash(n, prefix, suffix) + return t.resolveHash(n, prefix) } return n, nil } -func (t *Trie) resolveHash(n hashNode, prefix, suffix []byte) (node, error) { +func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) { cacheMissCounter.Inc(1) enc, err := t.db.Get(n) if err != nil || enc == nil { - return nil, &MissingNodeError{ - RootHash: t.originalRoot, - NodeHash: common.BytesToHash(n), - PrefixLen: len(prefix), - SuffixLen: len(suffix), - } + return nil, &MissingNodeError{NodeHash: common.BytesToHash(n), Path: prefix} } dec := mustDecodeNode(n, enc, t.cachegen) return dec, nil diff --git a/trie/trie_test.go b/trie/trie_test.go index 61adbba0c..1c9095070 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -19,6 +19,7 @@ package trie import ( "bytes" "encoding/binary" + "errors" "fmt" "io/ioutil" "math/rand" @@ -34,7 +35,7 @@ import ( func init() { spew.Config.Indent = " " - spew.Config.DisableMethods = true + spew.Config.DisableMethods = false } // Used for testing @@ -357,6 +358,7 @@ type randTestStep struct { op int key []byte // for opUpdate, opDelete, opGet value []byte // for opUpdate + err error // for debugging } const ( @@ -406,7 +408,7 @@ func runRandTest(rt randTest) bool { tr, _ := New(common.Hash{}, db) values := make(map[string]string) // tracks content of the trie - for _, step := range rt { + for i, step := range rt { switch step.op { case opUpdate: tr.Update(step.key, step.value) @@ -418,23 +420,22 @@ func runRandTest(rt randTest) bool { v := tr.Get(step.key) want := values[string(step.key)] if string(v) != want { - fmt.Printf("mismatch for key 0x%x, got 0x%x want 0x%x", step.key, v, want) - return false + rt[i].err = fmt.Errorf("mismatch for key 0x%x, got 0x%x want 0x%x", step.key, v, want) } case opCommit: - if _, err := tr.Commit(); err != nil { - panic(err) - } + _, rt[i].err = tr.Commit() case opHash: tr.Hash() case opReset: hash, err := tr.Commit() if err != nil { - panic(err) + rt[i].err = err + return false } newtr, err := New(hash, db) if err != nil { - panic(err) + rt[i].err = err + return false } tr = newtr case opItercheckhash: @@ -444,17 +445,20 @@ func runRandTest(rt randTest) bool { checktr.Update(it.Key, it.Value) } if tr.Hash() != checktr.Hash() { - fmt.Println("hashes not equal") - return false + rt[i].err = fmt.Errorf("hash mismatch in opItercheckhash") } case opCheckCacheInvariant: - return checkCacheInvariant(tr.root, nil, tr.cachegen, false, 0) + rt[i].err = checkCacheInvariant(tr.root, nil, tr.cachegen, false, 0) + } + // Abort the test on error. + if rt[i].err != nil { + return false } } return true } -func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool, depth int) bool { +func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool, depth int) error { var children []node var flag nodeFlag switch n := n.(type) { @@ -465,33 +469,34 @@ func checkCacheInvariant(n, parent node, parentCachegen uint16, parentDirty bool flag = n.flags children = n.Children[:] default: - return true + return nil } - showerror := func() { - fmt.Printf("at depth %d node %s", depth, spew.Sdump(n)) - fmt.Printf("parent: %s", spew.Sdump(parent)) + errorf := func(format string, args ...interface{}) error { + msg := fmt.Sprintf(format, args...) + msg += fmt.Sprintf("\nat depth %d node %s", depth, spew.Sdump(n)) + msg += fmt.Sprintf("parent: %s", spew.Sdump(parent)) + return errors.New(msg) } if flag.gen > parentCachegen { - fmt.Printf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen) - showerror() - return false + return errorf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen) } if depth > 0 && !parentDirty && flag.dirty { - fmt.Printf("cache invariant violation: child is dirty but parent isn't\n") - showerror() - return false + return errorf("cache invariant violation: %d > %d\n", flag.gen, parentCachegen) } for _, child := range children { - if !checkCacheInvariant(child, n, flag.gen, flag.dirty, depth+1) { - return false + if err := checkCacheInvariant(child, n, flag.gen, flag.dirty, depth+1); err != nil { + return err } } - return true + return nil } func TestRandom(t *testing.T) { if err := quick.Check(runRandTest, nil); err != nil { + if cerr, ok := err.(*quick.CheckError); ok { + t.Fatalf("random test iteration %d failed: %s", cerr.Count, spew.Sdump(cerr.In)) + } t.Fatal(err) } } diff --git a/vendor/github.com/docker/docker/LICENSE b/vendor/github.com/docker/docker/LICENSE new file mode 100644 index 000000000..9c8e20ab8 --- /dev/null +++ b/vendor/github.com/docker/docker/LICENSE @@ -0,0 +1,191 @@ + + Apache License + Version 2.0, January 2004 + https://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2013-2017 Docker, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/docker/docker/NOTICE b/vendor/github.com/docker/docker/NOTICE new file mode 100644 index 000000000..0c74e15b0 --- /dev/null +++ b/vendor/github.com/docker/docker/NOTICE @@ -0,0 +1,19 @@ +Docker +Copyright 2012-2017 Docker, Inc. + +This product includes software developed at Docker, Inc. (https://www.docker.com). + +This product contains software (https://github.com/kr/pty) developed +by Keith Rarick, licensed under the MIT License. + +The following is courtesy of our legal counsel: + + +Use and transfer of Docker may be subject to certain restrictions by the +United States and other governments. +It is your responsibility to ensure that your use and/or transfer does not +violate applicable laws. + +For more information, please see https://www.bis.doc.gov + +See also https://www.apache.org/dev/crypto.html and/or seek legal counsel. diff --git a/vendor/github.com/docker/docker/pkg/reexec/README.md b/vendor/github.com/docker/docker/pkg/reexec/README.md new file mode 100644 index 000000000..6658f69b6 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/README.md @@ -0,0 +1,5 @@ +# reexec + +The `reexec` package facilitates the busybox style reexec of the docker binary that we require because +of the forking limitations of using Go. Handlers can be registered with a name and the argv 0 of +the exec of the binary will be used to find and execute custom init paths. diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_linux.go b/vendor/github.com/docker/docker/pkg/reexec/command_linux.go new file mode 100644 index 000000000..34ae2a9dc --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_linux.go @@ -0,0 +1,28 @@ +// +build linux + +package reexec + +import ( + "os/exec" + "syscall" +) + +// Self returns the path to the current process's binary. +// Returns "/proc/self/exe". +func Self() string { + return "/proc/self/exe" +} + +// Command returns *exec.Cmd which has Path as current binary. Also it setting +// SysProcAttr.Pdeathsig to SIGTERM. +// This will use the in-memory version (/proc/self/exe) of the current binary, +// it is thus safe to delete or replace the on-disk binary (os.Args[0]). +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + SysProcAttr: &syscall.SysProcAttr{ + Pdeathsig: syscall.SIGTERM, + }, + } +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_unix.go b/vendor/github.com/docker/docker/pkg/reexec/command_unix.go new file mode 100644 index 000000000..778a720e3 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_unix.go @@ -0,0 +1,23 @@ +// +build freebsd solaris darwin + +package reexec + +import ( + "os/exec" +) + +// Self returns the path to the current process's binary. +// Uses os.Args[0]. +func Self() string { + return naiveSelf() +} + +// Command returns *exec.Cmd which has Path as current binary. +// For example if current binary is "docker" at "/usr/bin/", then cmd.Path will +// be set to "/usr/bin/docker". +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + } +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go b/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go new file mode 100644 index 000000000..76edd8242 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_unsupported.go @@ -0,0 +1,12 @@ +// +build !linux,!windows,!freebsd,!solaris,!darwin + +package reexec + +import ( + "os/exec" +) + +// Command is unsupported on operating systems apart from Linux, Windows, Solaris and Darwin. +func Command(args ...string) *exec.Cmd { + return nil +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/command_windows.go b/vendor/github.com/docker/docker/pkg/reexec/command_windows.go new file mode 100644 index 000000000..ca871c422 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/command_windows.go @@ -0,0 +1,23 @@ +// +build windows + +package reexec + +import ( + "os/exec" +) + +// Self returns the path to the current process's binary. +// Uses os.Args[0]. +func Self() string { + return naiveSelf() +} + +// Command returns *exec.Cmd which has Path as current binary. +// For example if current binary is "docker.exe" at "C:\", then cmd.Path will +// be set to "C:\docker.exe". +func Command(args ...string) *exec.Cmd { + return &exec.Cmd{ + Path: Self(), + Args: args, + } +} diff --git a/vendor/github.com/docker/docker/pkg/reexec/reexec.go b/vendor/github.com/docker/docker/pkg/reexec/reexec.go new file mode 100644 index 000000000..c56671d91 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/reexec/reexec.go @@ -0,0 +1,47 @@ +package reexec + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" +) + +var registeredInitializers = make(map[string]func()) + +// Register adds an initialization func under the specified name +func Register(name string, initializer func()) { + if _, exists := registeredInitializers[name]; exists { + panic(fmt.Sprintf("reexec func already registered under name %q", name)) + } + + registeredInitializers[name] = initializer +} + +// Init is called as the first part of the exec process and returns true if an +// initialization function was called. +func Init() bool { + initializer, exists := registeredInitializers[os.Args[0]] + if exists { + initializer() + + return true + } + return false +} + +func naiveSelf() string { + name := os.Args[0] + if filepath.Base(name) == name { + if lp, err := exec.LookPath(name); err == nil { + return lp + } + } + // handle conversion of relative paths to absolute + if absName, err := filepath.Abs(name); err == nil { + return absName + } + // if we couldn't get absolute name, return original + // (NOTE: Go only errors on Abs() if os.Getwd fails) + return name +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 607f193a3..1c2c58500 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -75,6 +75,12 @@ "revisionTime": "2017-02-01T22:58:49Z" }, { + "checksumSHA1": "lutCa+IVM60R1OYBm9RtDAW50Ys=", + "path": "github.com/docker/docker/pkg/reexec", + "revision": "83ee902ecc3790c33c1e2d87334074436056bb49", + "revisionTime": "2017-04-22T21:51:12Z" + }, + { "checksumSHA1": "zYnPsNAVm1/ViwCkN++dX2JQhBo=", "path": "github.com/edsrzf/mmap-go", "revision": "935e0e8a636ca4ba70b713f3e38a19e1b77739e8", |