diff options
-rw-r--r-- | accounts/abi/abi.go | 51 | ||||
-rw-r--r-- | accounts/abi/abi_test.go | 106 | ||||
-rw-r--r-- | cmd/geth/js_test.go | 48 | ||||
-rw-r--r-- | cmd/geth/main.go | 24 | ||||
-rw-r--r-- | cmd/geth/monitorcmd.go | 2 | ||||
-rw-r--r-- | cmd/gethrpctest/main.go | 40 | ||||
-rw-r--r-- | cmd/utils/client.go | 121 | ||||
-rw-r--r-- | cmd/utils/flags.go | 94 | ||||
-rw-r--r-- | common/defaults.go | 48 | ||||
-rw-r--r-- | common/path.go | 22 | ||||
-rw-r--r-- | core/blockchain.go | 2 | ||||
-rw-r--r-- | core/blockchain_test.go | 4 | ||||
-rw-r--r-- | core/events.go | 7 | ||||
-rw-r--r-- | eth/filters/api.go | 29 | ||||
-rw-r--r-- | eth/filters/filter.go | 3 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 101 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 26 | ||||
-rw-r--r-- | miner/worker.go | 23 | ||||
-rw-r--r-- | node/api.go | 95 | ||||
-rw-r--r-- | node/config.go | 101 | ||||
-rw-r--r-- | node/config_test.go | 6 | ||||
-rw-r--r-- | node/node.go | 320 | ||||
-rw-r--r-- | node/node_test.go | 12 | ||||
-rw-r--r-- | rpc/http.go | 63 | ||||
-rw-r--r-- | rpc/inproc.go | 111 | ||||
-rw-r--r-- | rpc/ipc.go | 2 | ||||
-rw-r--r-- | rpc/server.go | 4 | ||||
-rw-r--r-- | rpc/utils.go | 3 | ||||
-rw-r--r-- | rpc/websocket.go | 78 |
29 files changed, 911 insertions, 635 deletions
diff --git a/accounts/abi/abi.go b/accounts/abi/abi.go index 2dc8039f5..324d3c76f 100644 --- a/accounts/abi/abi.go +++ b/accounts/abi/abi.go @@ -165,7 +165,14 @@ func (abi ABI) Call(executer Executer, v interface{}, name string, args ...inter return abi.unmarshal(v, name, executer(callData)) } -var interSlice = reflect.TypeOf([]interface{}{}) +// these variable are used to determine certain types during type assertion for +// assignment. +var ( + r_interSlice = reflect.TypeOf([]interface{}{}) + r_hash = reflect.TypeOf(common.Hash{}) + r_bytes = reflect.TypeOf([]byte{}) + r_byte = reflect.TypeOf(byte(0)) +) // unmarshal output in v according to the abi specification func (abi ABI) unmarshal(v interface{}, name string, output []byte) error { @@ -194,17 +201,14 @@ func (abi ABI) unmarshal(v interface{}, name string, output []byte) error { field := typ.Field(j) // TODO read tags: `abi:"fieldName"` if field.Name == strings.ToUpper(method.Outputs[i].Name[:1])+method.Outputs[i].Name[1:] { - if field.Type.AssignableTo(reflectValue.Type()) { - value.Field(j).Set(reflectValue) - break - } else { - return fmt.Errorf("abi: cannot unmarshal %v in to %v", field.Type, reflectValue.Type()) + if err := set(value.Field(j), reflectValue, method.Outputs[i]); err != nil { + return err } } } } case reflect.Slice: - if !value.Type().AssignableTo(interSlice) { + if !value.Type().AssignableTo(r_interSlice) { return fmt.Errorf("abi: cannot marshal tuple in to slice %T (only []interface{} is supported)", v) } @@ -228,17 +232,40 @@ func (abi ABI) unmarshal(v interface{}, name string, output []byte) error { if err != nil { return err } - reflectValue := reflect.ValueOf(marshalledValue) - if typ.AssignableTo(reflectValue.Type()) { - value.Set(reflectValue) - } else { - return fmt.Errorf("abi: cannot unmarshal %v in to %v", reflectValue.Type(), value.Type()) + if err := set(value, reflect.ValueOf(marshalledValue), method.Outputs[0]); err != nil { + return err } } return nil } +// set attempts to assign src to dst by either setting, copying or otherwise. +// +// set is a bit more lenient when it comes to assignment and doesn't force an as +// strict ruleset as bare `reflect` does. +func set(dst, src reflect.Value, output Argument) error { + dstType := dst.Type() + srcType := src.Type() + + switch { + case dstType.AssignableTo(src.Type()): + dst.Set(src) + case dstType.Kind() == reflect.Array && srcType.Kind() == reflect.Slice: + if !dstType.Elem().AssignableTo(r_byte) { + return fmt.Errorf("abi: cannot unmarshal %v in to array of elem %v", src.Type(), dstType.Elem()) + } + + if dst.Len() < output.Type.Size { + return fmt.Errorf("abi: cannot unmarshal src (len=%d) in to dst (len=%d)", output.Type.Size, dst.Len()) + } + reflect.Copy(dst, src) + default: + return fmt.Errorf("abi: cannot unmarshal %v in to %v", src.Type(), dst.Type()) + } + return nil +} + func (abi *ABI) UnmarshalJSON(data []byte) error { var fields []struct { Type string diff --git a/accounts/abi/abi_test.go b/accounts/abi/abi_test.go index bb0143d21..c6a8705cd 100644 --- a/accounts/abi/abi_test.go +++ b/accounts/abi/abi_test.go @@ -394,37 +394,6 @@ func TestBytes(t *testing.T) { } } -/* -func TestReturn(t *testing.T) { - const definition = `[ - { "type" : "function", "name" : "balance", "const" : true, "inputs" : [], "outputs" : [ { "name": "", "type": "hash" } ] }, - { "type" : "function", "name" : "name", "const" : true, "inputs" : [], "outputs" : [ { "name": "", "type": "address" } ] }]` - - abi, err := JSON(strings.NewReader(definition)) - if err != nil { - t.Fatal(err) - } - - r := abi.Call(func([]byte) []byte { - t := make([]byte, 32) - t[0] = 1 - return t - }, "balance") - if _, ok := r.(common.Hash); !ok { - t.Errorf("expected type common.Hash, got %T", r) - } - - r = abi.Call(func([]byte) []byte { - t := make([]byte, 32) - t[0] = 1 - return t - }, "name") - if _, ok := r.(common.Address); !ok { - t.Errorf("expected type common.Address, got %T", r) - } -} -*/ - func TestDefaultFunctionParsing(t *testing.T) { const definition = `[{ "name" : "balance" }]` @@ -550,11 +519,71 @@ func TestMultiReturnWithSlice(t *testing.T) { } } +func TestMarshalArrays(t *testing.T) { + const definition = `[ + { "name" : "bytes32", "const" : false, "outputs": [ { "type": "bytes32" } ] }, + { "name" : "bytes10", "const" : false, "outputs": [ { "type": "bytes10" } ] } + ]` + + abi, err := JSON(strings.NewReader(definition)) + if err != nil { + t.Fatal(err) + } + + output := common.LeftPadBytes([]byte{1}, 32) + + var bytes10 [10]byte + err = abi.unmarshal(&bytes10, "bytes32", output) + if err == nil || err.Error() != "abi: cannot unmarshal src (len=32) in to dst (len=10)" { + t.Error("expected error or bytes32 not be assignable to bytes10:", err) + } + + var bytes32 [32]byte + err = abi.unmarshal(&bytes32, "bytes32", output) + if err != nil { + t.Error("didn't expect error:", err) + } + if !bytes.Equal(bytes32[:], output) { + t.Error("expected bytes32[31] to be 1 got", bytes32[31]) + } + + type ( + B10 [10]byte + B32 [32]byte + ) + + var b10 B10 + err = abi.unmarshal(&b10, "bytes32", output) + if err == nil || err.Error() != "abi: cannot unmarshal src (len=32) in to dst (len=10)" { + t.Error("expected error or bytes32 not be assignable to bytes10:", err) + } + + var b32 B32 + err = abi.unmarshal(&b32, "bytes32", output) + if err != nil { + t.Error("didn't expect error:", err) + } + if !bytes.Equal(b32[:], output) { + t.Error("expected bytes32[31] to be 1 got", bytes32[31]) + } + + output[10] = 1 + var shortAssignLong [32]byte + err = abi.unmarshal(&shortAssignLong, "bytes10", output) + if err != nil { + t.Error("didn't expect error:", err) + } + if !bytes.Equal(output, shortAssignLong[:]) { + t.Errorf("expected %x to be %x", shortAssignLong, output) + } +} + func TestUnmarshal(t *testing.T) { const definition = `[ { "name" : "int", "const" : false, "outputs": [ { "type": "uint256" } ] }, { "name" : "bool", "const" : false, "outputs": [ { "type": "bool" } ] }, { "name" : "bytes", "const" : false, "outputs": [ { "type": "bytes" } ] }, + { "name" : "fixed", "const" : false, "outputs": [ { "type": "bytes32" } ] }, { "name" : "multi", "const" : false, "outputs": [ { "type": "bytes" }, { "type": "bytes" } ] }, { "name" : "mixedBytes", "const" : true, "outputs": [ { "name": "a", "type": "bytes" }, { "name": "b", "type": "bytes32" } ] }]` @@ -655,6 +684,21 @@ func TestUnmarshal(t *testing.T) { t.Errorf("expected %x got %x", bytesOut, Bytes) } + // marshal dynamic bytes length 5 + buff.Reset() + buff.Write(common.RightPadBytes([]byte("hello"), 32)) + + var hash common.Hash + err = abi.unmarshal(&hash, "fixed", buff.Bytes()) + if err != nil { + t.Error(err) + } + + helloHash := common.BytesToHash(common.RightPadBytes([]byte("hello"), 32)) + if hash != helloHash { + t.Errorf("Expected %x to equal %x", hash, helloHash) + } + // marshal error buff.Reset() buff.Write(common.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000000020")) diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index 19583c5ef..4330b484c 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -37,22 +37,21 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/cmd/utils" ) const ( testSolcPath = "" - solcVersion = "0.9.23" + solcVersion = "0.9.23" - testKey = "e6fab74a43941f82d89cb7faa408e227cdad3153c4720e540e855c19b15e6674" + testKey = "e6fab74a43941f82d89cb7faa408e227cdad3153c4720e540e855c19b15e6674" testAddress = "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182" testBalance = "10000000000000000000" -// of empty string + // of empty string testHash = "0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470" ) var ( - versionRE = regexp.MustCompile(strconv.Quote(`"compilerVersion":"` + solcVersion + `"`)) + versionRE = regexp.MustCompile(strconv.Quote(`"compilerVersion":"` + solcVersion + `"`)) testNodeKey = crypto.ToECDSA(common.Hex2Bytes("4b50fa71f5c3eeb8fdc452224b2395af2fcc3d125e06c32c82e048c0559db03f")) testGenesis = `{"` + testAddress[2:] + `": {"balance": "` + testBalance + `"}}` ) @@ -141,8 +140,10 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod stack.Service(ðereum) assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext") - //client := comms.NewInProcClient(codec.JSON) - client := utils.NewInProcRPCClient(stack) + client, err := stack.Attach() + if err != nil { + t.Fatalf("failed to attach to node: %v", err) + } tf := &testjethre{client: ethereum.HTTPClient()} repl := newJSRE(stack, assetPath, "", client, false) tf.jsre = repl @@ -152,9 +153,6 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod func TestNodeInfo(t *testing.T) { t.Skip("broken after p2p update") tmp, repl, ethereum := testJEthRE(t) - if err := ethereum.Start(); err != nil { - t.Fatalf("error starting ethereum: %v", err) - } defer ethereum.Stop() defer os.RemoveAll(tmp) @@ -167,8 +165,8 @@ func TestAccounts(t *testing.T) { defer node.Stop() defer os.RemoveAll(tmp) - checkEvalJSON(t, repl, `eth.accounts`, `["` + testAddress + `"]`) - checkEvalJSON(t, repl, `eth.coinbase`, `"` + testAddress + `"`) + checkEvalJSON(t, repl, `eth.accounts`, `["`+testAddress+`"]`) + checkEvalJSON(t, repl, `eth.coinbase`, `"`+testAddress+`"`) val, err := repl.re.Run(`jeth.newAccount("password")`) if err != nil { t.Errorf("expected no error, got %v", err) @@ -178,7 +176,7 @@ func TestAccounts(t *testing.T) { t.Errorf("address not hex: %q", addr) } - checkEvalJSON(t, repl, `eth.accounts`, `["` + testAddress + `","` + addr + `"]`) + checkEvalJSON(t, repl, `eth.accounts`, `["`+testAddress+`","`+addr+`"]`) } @@ -206,13 +204,13 @@ func TestBlockChain(t *testing.T) { node.Service(ðereum) ethereum.BlockChain().Reset() - checkEvalJSON(t, repl, `admin.exportChain(` + tmpfileq + `)`, `true`) + checkEvalJSON(t, repl, `admin.exportChain(`+tmpfileq+`)`, `true`) if _, err := os.Stat(tmpfile); err != nil { t.Fatal(err) } // check import, verify that dumpBlock gives the same result. - checkEvalJSON(t, repl, `admin.importChain(` + tmpfileq + `)`, `true`) + checkEvalJSON(t, repl, `admin.importChain(`+tmpfileq+`)`, `true`) checkEvalJSON(t, repl, `debug.dumpBlock(eth.blockNumber)`, beforeExport) } @@ -240,7 +238,7 @@ func TestCheckTestAccountBalance(t *testing.T) { defer os.RemoveAll(tmp) repl.re.Run(`primary = "` + testAddress + `"`) - checkEvalJSON(t, repl, `eth.getBalance(primary)`, `"` + testBalance + `"`) + checkEvalJSON(t, repl, `eth.getBalance(primary)`, `"`+testBalance+`"`) } func TestSignature(t *testing.T) { @@ -301,11 +299,11 @@ func TestContract(t *testing.T) { */ source := `contract test {\n` + - " /// @notice Will multiply `a` by 7." + `\n` + - ` function multiply(uint a) returns(uint d) {\n` + - ` return a * 7;\n` + - ` }\n` + - `}\n` + " /// @notice Will multiply `a` by 7." + `\n` + + ` function multiply(uint a) returns(uint d) {\n` + + ` return a * 7;\n` + + ` }\n` + + `}\n` if checkEvalJSON(t, repl, `admin.stopNatSpec()`, `true`) != nil { return @@ -315,10 +313,10 @@ func TestContract(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if checkEvalJSON(t, repl, `primary = eth.accounts[0]`, `"` + testAddress + `"`) != nil { + if checkEvalJSON(t, repl, `primary = eth.accounts[0]`, `"`+testAddress+`"`) != nil { return } - if checkEvalJSON(t, repl, `source = "` + source + `"`, `"` + source + `"`) != nil { + if checkEvalJSON(t, repl, `source = "`+source+`"`, `"`+source+`"`) != nil { return } @@ -396,7 +394,7 @@ multiply7 = Multiply7.at(contractaddress); var contentHash = `"0x86d2b7cf1e72e9a7a3f8d96601f0151742a2f780f1526414304fbe413dc7f9bd"` if sol != nil && solcVersion != sol.Version() { - modContractInfo := versionRE.ReplaceAll(contractInfo, []byte(`"compilerVersion":"` + sol.Version() + `"`)) + modContractInfo := versionRE.ReplaceAll(contractInfo, []byte(`"compilerVersion":"`+sol.Version()+`"`)) fmt.Printf("modified contractinfo:\n%s\n", modContractInfo) contentHash = `"` + common.ToHex(crypto.Sha3([]byte(modContractInfo))) + `"` } @@ -481,7 +479,7 @@ func processTxs(repl *testjethre, t *testing.T, expTxc int) bool { repl.wait <- height select { case <-timer.C: - // if times out make sure the xeth loop does not block + // if times out make sure the xeth loop does not block go func() { select { case repl.wait <- nil: diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fa456a7ac..8594d18c5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -399,7 +399,7 @@ func attach(ctx *cli.Context) { // attach to a running geth instance client, err := utils.NewRemoteRPCClient(ctx) if err != nil { - utils.Fatalf("Unable to attach to geth - %v", err) + utils.Fatalf("Unable to attach to geth: %v", err) } repl := newLightweightJSRE( @@ -425,8 +425,10 @@ func console(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node, and either execute script or become interactive - client := utils.NewInProcRPCClient(node) - + client, err := node.Attach() + if err != nil { + utils.Fatalf("Failed to attach to the inproc geth: %v", err) + } repl := newJSRE(node, ctx.GlobalString(utils.JSpathFlag.Name), ctx.GlobalString(utils.RPCCORSDomainFlag.Name), @@ -449,8 +451,10 @@ func execScripts(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node and execute the given scripts - client := utils.NewInProcRPCClient(node) - + client, err := node.Attach() + if err != nil { + utils.Fatalf("Failed to attach to the inproc geth: %v", err) + } repl := newJSRE(node, ctx.GlobalString(utils.JSpathFlag.Name), ctx.GlobalString(utils.RPCCORSDomainFlag.Name), @@ -503,16 +507,6 @@ func startNode(ctx *cli.Context, stack *node.Node) { } } // Start auxiliary services if enabled - if ctx.GlobalBool(utils.RPCEnabledFlag.Name) { - if err := utils.StartRPC(stack, ctx); err != nil { - utils.Fatalf("Failed to start RPC: %v", err) - } - } - if ctx.GlobalBool(utils.WSEnabledFlag.Name) { - if err := utils.StartWS(stack, ctx); err != nil { - utils.Fatalf("Failed to start WS: %v", err) - } - } if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { if err := ethereum.StartMining(ctx.GlobalInt(utils.MinerThreadsFlag.Name), ctx.GlobalString(utils.MiningGPUFlag.Name)); err != nil { utils.Fatalf("Failed to start mining: %v", err) diff --git a/cmd/geth/monitorcmd.go b/cmd/geth/monitorcmd.go index 120f6b9f4..5d839b5a3 100644 --- a/cmd/geth/monitorcmd.go +++ b/cmd/geth/monitorcmd.go @@ -36,7 +36,7 @@ import ( var ( monitorCommandAttachFlag = cli.StringFlag{ Name: "attach", - Value: "ipc:" + node.DefaultIpcEndpoint(), + Value: "ipc:" + node.DefaultIPCEndpoint(), Usage: "API endpoint to attach to", } monitorCommandRowsFlag = cli.IntFlag{ diff --git a/cmd/gethrpctest/main.go b/cmd/gethrpctest/main.go index becd09f5a..38016fb35 100644 --- a/cmd/gethrpctest/main.go +++ b/cmd/gethrpctest/main.go @@ -18,7 +18,6 @@ package main import ( - "errors" "flag" "io/ioutil" "log" @@ -26,10 +25,10 @@ import ( "os/signal" "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/tests" @@ -84,12 +83,6 @@ func main() { } log.Println("Initial test suite passed...") - // Start the RPC interface and wait until terminated - if err := StartRPC(stack); err != nil { - log.Fatalf("Failed to start RPC interface: %v", err) - } - log.Println("RPC Interface started, accepting requests...") - quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) <-quit @@ -99,7 +92,16 @@ func main() { // keystore path and initial pre-state. func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node.Node, error) { // Create a networkless protocol stack - stack, err := node.New(&node.Config{IpcPath: node.DefaultIpcEndpoint(), NoDiscovery: true}) + stack, err := node.New(&node.Config{ + IPCPath: node.DefaultIPCEndpoint(), + HTTPHost: common.DefaultHTTPHost, + HTTPPort: common.DefaultHTTPPort, + HTTPModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, + WSHost: common.DefaultWSHost, + WSPort: common.DefaultWSPort, + WSModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, + NoDiscovery: true, + }) if err != nil { return nil, err } @@ -164,23 +166,3 @@ func RunTest(stack *node.Node, test *tests.BlockTest) error { } return nil } - -// StartRPC initializes an RPC interface to the given protocol stack. -func StartRPC(stack *node.Node) error { - /* - web3 := NewPublicWeb3API(stack) - server.RegisterName("web3", web3) - net := NewPublicNetAPI(stack.Server(), ethereum.NetVersion()) - server.RegisterName("net", net) - */ - - for _, api := range stack.APIs() { - if adminApi, ok := api.Service.(*node.PrivateAdminAPI); ok { - _, err := adminApi.StartRPC("127.0.0.1", 8545, "", "admin,db,eth,debug,miner,net,shh,txpool,personal,web3") - return err - } - } - - glog.V(logger.Error).Infof("Unable to start RPC-HTTP interface, could not find admin API") - return errors.New("Unable to start RPC-HTTP interface") -} diff --git a/cmd/utils/client.go b/cmd/utils/client.go index 40ebcd729..3913d007b 100644 --- a/cmd/utils/client.go +++ b/cmd/utils/client.go @@ -17,132 +17,14 @@ package utils import ( - "encoding/json" "fmt" - "strings" "github.com/codegangsta/cli" - "github.com/ethereum/go-ethereum/eth" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" ) -// NewInProcRPCClient will start a new RPC server for the given node and returns a client to interact with it. -func NewInProcRPCClient(stack *node.Node) *inProcClient { - server := rpc.NewServer() - - offered := stack.APIs() - for _, api := range offered { - server.RegisterName(api.Namespace, api.Service) - } - - web3 := node.NewPublicWeb3API(stack) - server.RegisterName("web3", web3) - - var ethereum *eth.Ethereum - if err := stack.Service(ðereum); err == nil { - net := eth.NewPublicNetAPI(stack.Server(), ethereum.NetVersion()) - server.RegisterName("net", net) - } else { - glog.V(logger.Warn).Infof("%v\n", err) - } - - buf := &buf{ - requests: make(chan []byte), - responses: make(chan []byte), - } - client := &inProcClient{ - server: server, - buf: buf, - } - - go func() { - server.ServeCodec(rpc.NewJSONCodec(client.buf)) - }() - - return client -} - -// buf represents the connection between the RPC server and console -type buf struct { - readBuf []byte // store remaining request bytes after a partial read - requests chan []byte // list with raw serialized requests - responses chan []byte // list with raw serialized responses -} - -// will read the next request in json format -func (b *buf) Read(p []byte) (int, error) { - // last read didn't read entire request, return remaining bytes - if len(b.readBuf) > 0 { - n := copy(p, b.readBuf) - if n < len(b.readBuf) { - b.readBuf = b.readBuf[:n] - } else { - b.readBuf = b.readBuf[:0] - } - return n, nil - } - - // read next request - req := <-b.requests - n := copy(p, req) - if n < len(req) { - // buf too small, store remaining chunk for next read - b.readBuf = req[n:] - } - - return n, nil -} - -// Write send the given buffer to the backend -func (b *buf) Write(p []byte) (n int, err error) { - b.responses <- p - return len(p), nil -} - -// Close cleans up obtained resources. -func (b *buf) Close() error { - close(b.requests) - close(b.responses) - - return nil -} - -// inProcClient starts a RPC server and uses buf to communicate with it. -type inProcClient struct { - server *rpc.Server - buf *buf -} - -// Close will stop the RPC server -func (c *inProcClient) Close() { - c.server.Stop() -} - -// Send a msg to the endpoint -func (c *inProcClient) Send(msg interface{}) error { - d, err := json.Marshal(msg) - if err != nil { - return err - } - c.buf.requests <- d - return nil -} - -// Recv reads a message and tries to parse it into the given msg -func (c *inProcClient) Recv(msg interface{}) error { - data := <-c.buf.responses - return json.Unmarshal(data, &msg) -} - -// Returns the collection of modules the RPC server offers. -func (c *inProcClient) SupportedModules() (map[string]string, error) { - return rpc.SupportedModules(c) -} - // NewRemoteRPCClient returns a RPC client which connects to a running geth instance. // Depending on the given context this can either be a IPC or a HTTP client. func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) { @@ -151,7 +33,7 @@ func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) { return NewRemoteRPCClientFromString(endpoint) } // use IPC by default - return rpc.NewIPCClient(node.DefaultIpcEndpoint()) + return rpc.NewIPCClient(node.DefaultIPCEndpoint()) } // NewRemoteRPCClientFromString returns a RPC client which connects to the given @@ -169,6 +51,5 @@ func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) { if strings.HasPrefix(endpoint, "ws:") { return rpc.NewWSClient(endpoint) } - return nil, fmt.Errorf("invalid endpoint") } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ca9fd74ba..8e89b9fb1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -18,7 +18,6 @@ package utils import ( "crypto/ecdsa" - "errors" "fmt" "io/ioutil" "math" @@ -233,12 +232,12 @@ var ( RPCListenAddrFlag = cli.StringFlag{ Name: "rpcaddr", Usage: "HTTP-RPC server listening interface", - Value: "127.0.0.1", + Value: common.DefaultHTTPHost, } RPCPortFlag = cli.IntFlag{ Name: "rpcport", Usage: "HTTP-RPC server listening port", - Value: 8545, + Value: common.DefaultHTTPPort, } RPCCORSDomainFlag = cli.StringFlag{ Name: "rpccorsdomain", @@ -248,7 +247,7 @@ var ( RPCApiFlag = cli.StringFlag{ Name: "rpcapi", Usage: "API's offered over the HTTP-RPC interface", - Value: rpc.DefaultHttpRpcApis, + Value: rpc.DefaultHTTPApis, } IPCDisabledFlag = cli.BoolFlag{ Name: "ipcdisable", @@ -257,12 +256,12 @@ var ( IPCApiFlag = cli.StringFlag{ Name: "ipcapi", Usage: "API's offered over the IPC-RPC interface", - Value: rpc.DefaultIpcApis, + Value: rpc.DefaultIPCApis, } IPCPathFlag = DirectoryFlag{ Name: "ipcpath", Usage: "Filename for IPC socket/pipe within the datadir (explicit paths escape it)", - Value: DirectoryString{common.DefaultIpcSocket()}, + Value: DirectoryString{common.DefaultIPCSocket}, } WSEnabledFlag = cli.BoolFlag{ Name: "ws", @@ -271,21 +270,21 @@ var ( WSListenAddrFlag = cli.StringFlag{ Name: "wsaddr", Usage: "WS-RPC server listening interface", - Value: "127.0.0.1", + Value: common.DefaultWSHost, } WSPortFlag = cli.IntFlag{ Name: "wsport", Usage: "WS-RPC server listening port", - Value: 8546, + Value: common.DefaultWSPort, } WSApiFlag = cli.StringFlag{ Name: "wsapi", Usage: "API's offered over the WS-RPC interface", - Value: rpc.DefaultHttpRpcApis, + Value: rpc.DefaultHTTPApis, } WSAllowedDomainsFlag = cli.StringFlag{ Name: "wsdomains", - Usage: "Domains from which to accept websockets requests", + Usage: "Domains from which to accept websockets requests (can be spoofed)", Value: "", } ExecFlag = cli.StringFlag{ @@ -394,9 +393,9 @@ func MustMakeDataDir(ctx *cli.Context) string { return "" } -// MakeIpcPath creates an IPC path configuration from the set command line flags, +// MakeIPCPath creates an IPC path configuration from the set command line flags, // returning an empty string if IPC was explicitly disabled, or the set path. -func MakeIpcPath(ctx *cli.Context) string { +func MakeIPCPath(ctx *cli.Context) string { if ctx.GlobalBool(IPCDisabledFlag.Name) { return "" } @@ -482,6 +481,24 @@ func MakeNAT(ctx *cli.Context) nat.Interface { return natif } +// MakeHTTPRpcHost creates the HTTP RPC listener interface string from the set +// command line flags, returning empty if the HTTP endpoint is disabled. +func MakeHTTPRpcHost(ctx *cli.Context) string { + if !ctx.GlobalBool(RPCEnabledFlag.Name) { + return "" + } + return ctx.GlobalString(RPCListenAddrFlag.Name) +} + +// MakeWSRpcHost creates the WebSocket RPC listener interface string from the set +// command line flags, returning empty if the HTTP endpoint is disabled. +func MakeWSRpcHost(ctx *cli.Context) string { + if !ctx.GlobalBool(WSEnabledFlag.Name) { + return "" + } + return ctx.GlobalString(WSListenAddrFlag.Name) +} + // MakeGenesisBlock loads up a genesis block from an input file specified in the // command line, or returns the empty string if none set. func MakeGenesisBlock(ctx *cli.Context) string { @@ -591,7 +608,6 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. // Configure the node's service container stackConf := &node.Config{ DataDir: MustMakeDataDir(ctx), - IpcPath: MakeIpcPath(ctx), PrivateKey: MakeNodeKey(ctx), Name: MakeNodeName(name, version, ctx), NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name), @@ -600,6 +616,15 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. NAT: MakeNAT(ctx), MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), MaxPendingPeers: ctx.GlobalInt(MaxPendingPeersFlag.Name), + IPCPath: MakeIPCPath(ctx), + HTTPHost: MakeHTTPRpcHost(ctx), + HTTPPort: ctx.GlobalInt(RPCPortFlag.Name), + HTTPCors: ctx.GlobalString(RPCCORSDomainFlag.Name), + HTTPModules: strings.Split(ctx.GlobalString(RPCApiFlag.Name), ","), + WSHost: MakeWSRpcHost(ctx), + WSPort: ctx.GlobalInt(WSPortFlag.Name), + WSDomains: ctx.GlobalString(WSAllowedDomainsFlag.Name), + WSModules: strings.Split(ctx.GlobalString(WSApiFlag.Name), ","), } // Configure the Ethereum service accman := MakeAccountManager(ctx) @@ -740,48 +765,5 @@ func MakeChain(ctx *cli.Context) (chain *core.BlockChain, chainDb ethdb.Database if err != nil { Fatalf("Could not start chainmanager: %v", err) } - return chain, chainDb } - -// StartRPC starts a HTTP JSON-RPC API server. -func StartRPC(stack *node.Node, ctx *cli.Context) error { - for _, api := range stack.APIs() { - if adminApi, ok := api.Service.(*node.PrivateAdminAPI); ok { - address := ctx.GlobalString(RPCListenAddrFlag.Name) - port := ctx.GlobalInt(RPCPortFlag.Name) - cors := ctx.GlobalString(RPCCORSDomainFlag.Name) - apiStr := "" - if ctx.GlobalIsSet(RPCApiFlag.Name) { - apiStr = ctx.GlobalString(RPCApiFlag.Name) - } - - _, err := adminApi.StartRPC(address, port, cors, apiStr) - return err - } - } - - glog.V(logger.Error).Infof("Unable to start RPC-HTTP interface, could not find admin API") - return errors.New("Unable to start RPC-HTTP interface") -} - -// StartWS starts a websocket JSON-RPC API server. -func StartWS(stack *node.Node, ctx *cli.Context) error { - for _, api := range stack.APIs() { - if adminApi, ok := api.Service.(*node.PrivateAdminAPI); ok { - address := ctx.GlobalString(WSListenAddrFlag.Name) - port := ctx.GlobalInt(WSAllowedDomainsFlag.Name) - allowedDomains := ctx.GlobalString(WSAllowedDomainsFlag.Name) - apiStr := "" - if ctx.GlobalIsSet(WSApiFlag.Name) { - apiStr = ctx.GlobalString(WSApiFlag.Name) - } - - _, err := adminApi.StartWS(address, port, allowedDomains, apiStr) - return err - } - } - - glog.V(logger.Error).Infof("Unable to start RPC-WS interface, could not find admin API") - return errors.New("Unable to start RPC-WS interface") -} diff --git a/common/defaults.go b/common/defaults.go new file mode 100644 index 000000000..8a136fa80 --- /dev/null +++ b/common/defaults.go @@ -0,0 +1,48 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package common + +import ( + "path/filepath" + "runtime" +) + +const ( + DefaultIPCSocket = "geth.ipc" // Default (relative) name of the IPC RPC socket + DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server + DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server + DefaultWSHost = "localhost" // Default host interface for the websocket RPC server + DefaultWSPort = 8546 // Default TCP port for the websocket RPC server +) + +// DefaultDataDir is the default data directory to use for the databases and other +// persistence requirements. +func DefaultDataDir() string { + // Try to place the data folder in the user's home dir + home := HomeDir() + if home != "" { + if runtime.GOOS == "darwin" { + return filepath.Join(home, "Library", "Ethereum") + } else if runtime.GOOS == "windows" { + return filepath.Join(home, "AppData", "Roaming", "Ethereum") + } else { + return filepath.Join(home, ".ethereum") + } + } + // As we cannot guess a stable location, return empty and handle later + return "" +} diff --git a/common/path.go b/common/path.go index 38c213a12..75a8c1a3e 100644 --- a/common/path.go +++ b/common/path.go @@ -72,25 +72,3 @@ func HomeDir() string { } return "" } - -func DefaultDataDir() string { - // Try to place the data folder in the user's home dir - home := HomeDir() - if home != "" { - if runtime.GOOS == "darwin" { - return filepath.Join(home, "Library", "Ethereum") - } else if runtime.GOOS == "windows" { - return filepath.Join(home, "AppData", "Roaming", "Ethereum") - } else { - return filepath.Join(home, ".ethereum") - } - } - // As we cannot guess a stable location, return empty and handle later - return "" -} - -// DefaultIpcSocket returns the relative name of the default IPC socket. The path -// resolution is done by a node with other contextual infos. -func DefaultIpcSocket() string { - return "geth.ipc" -} diff --git a/core/blockchain.go b/core/blockchain.go index 95ed06d8d..22dd617ad 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1358,7 +1358,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { go self.eventMux.Post(RemovedTransactionEvent{diff}) } if len(deletedLogs) > 0 { - go self.eventMux.Post(RemovedLogEvent{deletedLogs}) + go self.eventMux.Post(RemovedLogsEvent{deletedLogs}) } return nil diff --git a/core/blockchain_test.go b/core/blockchain_test.go index b4ac1696a..1bb5f646d 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -982,7 +982,7 @@ func TestLogReorgs(t *testing.T) { evmux := &event.TypeMux{} blockchain, _ := NewBlockChain(db, FakePow{}, evmux) - subs := evmux.Subscribe(RemovedLogEvent{}) + subs := evmux.Subscribe(RemovedLogsEvent{}) chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) { if i == 1 { tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1) @@ -1002,7 +1002,7 @@ func TestLogReorgs(t *testing.T) { } ev := <-subs.Chan() - if len(ev.Data.(RemovedLogEvent).Logs) == 0 { + if len(ev.Data.(RemovedLogsEvent).Logs) == 0 { t.Error("expected logs") } } diff --git a/core/events.go b/core/events.go index 1a760c71c..c23206cad 100644 --- a/core/events.go +++ b/core/events.go @@ -30,6 +30,11 @@ type TxPreEvent struct{ Tx *types.Transaction } // TxPostEvent is posted when a transaction has been processed. type TxPostEvent struct{ Tx *types.Transaction } +// PendingLogsEvent is posted pre mining and notifies of pending logs. +type PendingLogsEvent struct { + Logs vm.Logs +} + // NewBlockEvent is posted when a block has been imported. type NewBlockEvent struct{ Block *types.Block } @@ -40,7 +45,7 @@ type NewMinedBlockEvent struct{ Block *types.Block } type RemovedTransactionEvent struct{ Txs types.Transactions } // RemovedLogEvent is posted when a reorg happens -type RemovedLogEvent struct{ Logs vm.Logs } +type RemovedLogsEvent struct{ Logs vm.Logs } // ChainSplit is posted when a new head is detected type ChainSplitEvent struct { diff --git a/eth/filters/api.go b/eth/filters/api.go index 148daa649..6cd184b80 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -142,7 +142,11 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) { s.blockMu.Lock() filter := New(s.chainDb) - id := s.filterManager.Add(filter) + id, err := s.filterManager.Add(filter, ChainFilter) + if err != nil { + return "", err + } + s.blockQueue[id] = &hashQueue{timeout: time.Now()} filter.BlockCallback = func(block *types.Block, logs vm.Logs) { @@ -174,7 +178,11 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { defer s.transactionMu.Unlock() filter := New(s.chainDb) - id := s.filterManager.Add(filter) + id, err := s.filterManager.Add(filter, PendingTxFilter) + if err != nil { + return "", err + } + s.transactionQueue[id] = &hashQueue{timeout: time.Now()} filter.TransactionCallback = func(tx *types.Transaction) { @@ -194,12 +202,16 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { } // newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int { +func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) { s.logMu.Lock() defer s.logMu.Unlock() filter := New(s.chainDb) - id := s.filterManager.Add(filter) + id, err := s.filterManager.Add(filter, LogFilter) + if err != nil { + return 0, err + } + s.logQueue[id] = &logQueue{timeout: time.Now()} filter.SetBeginBlock(earliest) @@ -215,7 +227,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo } } - return id + return id, nil } // NewFilterArgs represents a request to create a new filter. @@ -352,9 +364,12 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { var id int if len(args.Addresses) > 0 { - id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) } else { - id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + } + if err != nil { + return "", err } s.filterMapMu.Lock() diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 2c92d20b1..96af93c4a 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -18,6 +18,7 @@ package filters import ( "math" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -32,6 +33,8 @@ type AccountChange struct { // Filtering interface type Filter struct { + created time.Time + db ethdb.Database begin, end int64 addresses []common.Address diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 04e58a08c..b61a493b6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -19,6 +19,7 @@ package filters import ( + "fmt" "sync" "time" @@ -27,26 +28,47 @@ import ( "github.com/ethereum/go-ethereum/event" ) +// FilterType determines the type of filter and is used to put the filter in to +// the correct bucket when added. +type FilterType byte + +const ( + ChainFilter FilterType = iota // new block events filter + PendingTxFilter // pending transaction filter + LogFilter // new or removed log filter + PendingLogFilter // pending log filter +) + // FilterSystem manages filters that filter specific events such as // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { filterMu sync.RWMutex filterId int - filters map[int]*Filter - created map[int]time.Time - sub event.Subscription + + chainFilters map[int]*Filter + pendingTxFilters map[int]*Filter + logFilters map[int]*Filter + pendingLogFilters map[int]*Filter + + // generic is an ugly hack for Get + generic map[int]*Filter + + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + chainFilters: make(map[int]*Filter), + pendingTxFilters: make(map[int]*Filter), + logFilters: make(map[int]*Filter), + pendingLogFilters: make(map[int]*Filter), + generic: make(map[int]*Filter), } fs.sub = mux.Subscribe( - //core.PendingBlockEvent{}, - core.RemovedLogEvent{}, + core.PendingLogsEvent{}, + core.RemovedLogsEvent{}, core.ChainEvent{}, core.TxPreEvent{}, vm.Logs(nil), @@ -61,15 +83,30 @@ func (fs *FilterSystem) Stop() { } // Add adds a filter to the filter manager -func (fs *FilterSystem) Add(filter *Filter) (id int) { +func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - id = fs.filterId - fs.filters[id] = filter - fs.created[id] = time.Now() + + id := fs.filterId + filter.created = time.Now() + + switch filterType { + case ChainFilter: + fs.chainFilters[id] = filter + case PendingTxFilter: + fs.pendingTxFilters[id] = filter + case LogFilter: + fs.logFilters[id] = filter + case PendingLogFilter: + fs.pendingLogFilters[id] = filter + default: + return 0, fmt.Errorf("unknown filter type %v", filterType) + } + fs.generic[id] = filter + fs.filterId++ - return id + return id, nil } // Remove removes a filter by filter id @@ -77,16 +114,18 @@ func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - delete(fs.filters, id) - delete(fs.created, id) + delete(fs.chainFilters, id) + delete(fs.pendingTxFilters, id) + delete(fs.logFilters, id) + delete(fs.pendingLogFilters, id) + delete(fs.generic, id) } -// Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() - return fs.filters[id] + return fs.generic[id] } // filterLoop waits for specific events from ethereum and fires their handlers @@ -96,17 +135,16 @@ func (fs *FilterSystem) filterLoop() { switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.chainFilters { + if filter.BlockCallback != nil && !filter.created.After(event.Time) { filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() - case core.TxPreEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.pendingTxFilters { + if filter.TransactionCallback != nil && !filter.created.After(event.Time) { filter.TransactionCallback(ev.Tx) } } @@ -114,25 +152,34 @@ func (fs *FilterSystem) filterLoop() { case vm.Logs: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.logFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { for _, log := range filter.FilterLogs(ev) { filter.LogCallback(log, false) } } } fs.filterMu.RUnlock() - - case core.RemovedLogEvent: + case core.RemovedLogsEvent: fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogCallback != nil && !fs.created[id].After(event.Time) { + for _, filter := range fs.logFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { for _, removedLog := range ev.Logs { filter.LogCallback(removedLog, true) } } } fs.filterMu.RUnlock() + case core.PendingLogsEvent: + fs.filterMu.RLock() + for _, filter := range fs.pendingLogFilters { + if filter.LogCallback != nil && !filter.created.After(event.Time) { + for _, pendingLog := range ev.Logs { + filter.LogCallback(pendingLog, false) + } + } + } + fs.filterMu.RUnlock() } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 7ddeb02bc..3ad7dd9cb 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -18,6 +18,7 @@ func TestCallbacks(t *testing.T) { txDone = make(chan struct{}) logDone = make(chan struct{}) removedLogDone = make(chan struct{}) + pendingLogDone = make(chan struct{}) ) blockFilter := &Filter{ @@ -37,7 +38,6 @@ func TestCallbacks(t *testing.T) { } }, } - removedLogFilter := &Filter{ LogCallback: func(l *vm.Log, oob bool) { if oob { @@ -45,16 +45,23 @@ func TestCallbacks(t *testing.T) { } }, } + pendingLogFilter := &Filter{ + LogCallback: func(*vm.Log, bool) { + close(pendingLogDone) + }, + } - fs.Add(blockFilter) - fs.Add(txFilter) - fs.Add(logFilter) - fs.Add(removedLogFilter) + fs.Add(blockFilter, ChainFilter) + fs.Add(txFilter, PendingTxFilter) + fs.Add(logFilter, LogFilter) + fs.Add(removedLogFilter, LogFilter) + fs.Add(pendingLogFilter, PendingLogFilter) mux.Post(core.ChainEvent{}) mux.Post(core.TxPreEvent{}) - mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}}) mux.Post(vm.Logs{&vm.Log{}}) + mux.Post(core.RemovedLogsEvent{vm.Logs{&vm.Log{}}}) + mux.Post(core.PendingLogsEvent{vm.Logs{&vm.Log{}}}) const dura = 5 * time.Second failTimer := time.NewTimer(dura) @@ -84,4 +91,11 @@ func TestCallbacks(t *testing.T) { case <-failTimer.C: t.Error("removed log filter failed to trigger (timeout)") } + + failTimer.Reset(dura) + select { + case <-pendingLogDone: + case <-failTimer.C: + t.Error("pending log filter failed to trigger (timout)") + } } diff --git a/miner/worker.go b/miner/worker.go index 9c29d2250..81f7b16ac 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -243,7 +243,7 @@ func (self *worker) update() { // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.chain) + self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain) self.currentMu.Unlock() } } @@ -529,7 +529,7 @@ func (self *worker) commitNewWork() { transactions := append(singleTxOwner, multiTxOwner...) */ - work.commitTransactions(transactions, self.gasPrice, self.chain) + work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain) self.eth.TxPool().RemoveTransactions(work.lowGasTxs) // compute uncles for the new block. @@ -588,8 +588,10 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error { return nil } -func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { +func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { gp := new(core.GasPool).AddGas(env.header.GasLimit) + + var coalescedLogs vm.Logs for _, tx := range transactions { // We can skip err. It has already been validated in the tx pool from, _ := tx.From() @@ -627,7 +629,7 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b env.state.StartRecord(tx.Hash(), common.Hash{}, 0) - err := env.commitTransaction(tx, bc, gp) + err, logs := env.commitTransaction(tx, bc, gp) switch { case core.IsGasLimitErr(err): // ignore the transactor so no nonce errors will be thrown for this account @@ -643,20 +645,25 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b } default: env.tcount++ + coalescedLogs = append(coalescedLogs, logs...) } } + if len(coalescedLogs) > 0 { + go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs}) + } } -func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) error { +func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) (error, vm.Logs) { snap := env.state.Copy() - receipt, _, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed) + receipt, logs, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed) if err != nil { env.state.Set(snap) - return err + return err, nil } env.txs = append(env.txs, tx) env.receipts = append(env.receipts, receipt) - return nil + + return nil, logs } // TODO: remove or use diff --git a/node/api.go b/node/api.go index bc1795407..48cbd0150 100644 --- a/node/api.go +++ b/node/api.go @@ -25,10 +25,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/rpc" "github.com/rcrowley/go-metrics" - - "gopkg.in/fatih/set.v0" ) // PrivateAdminAPI is the collection of administrative API methods exposed only @@ -61,83 +58,55 @@ func (api *PrivateAdminAPI) AddPeer(url string) (bool, error) { } // StartRPC starts the HTTP RPC API server. -func (api *PrivateAdminAPI) StartRPC(address string, port int, cors string, apis string) (bool, error) { - var offeredAPIs []rpc.API - if len(apis) > 0 { - namespaces := set.New() - for _, a := range strings.Split(apis, ",") { - namespaces.Add(strings.TrimSpace(a)) - } - for _, api := range api.node.APIs() { - if namespaces.Has(api.Namespace) { - offeredAPIs = append(offeredAPIs, api) - } - } - } else { // use by default all public API's - for _, api := range api.node.APIs() { - if api.Public { - offeredAPIs = append(offeredAPIs, api) - } - } - } +func (api *PrivateAdminAPI) StartRPC(host string, port int, cors string, apis string) (bool, error) { + api.node.lock.Lock() + defer api.node.lock.Unlock() - if address == "" { - address = "127.0.0.1" + if api.node.httpHandler != nil { + return false, fmt.Errorf("HTTP RPC already running on %s", api.node.httpEndpoint) } - if port == 0 { - port = 8545 + if err := api.node.startHTTP(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { + return false, err } - - corsDomains := strings.Split(cors, " ") - err := rpc.StartHTTP(address, port, corsDomains, offeredAPIs) - return err == nil, err + return true, nil } // StopRPC terminates an already running HTTP RPC API endpoint. func (api *PrivateAdminAPI) StopRPC() (bool, error) { - err := rpc.StopHTTP() - return err == nil, err + api.node.lock.Lock() + defer api.node.lock.Unlock() + + if api.node.httpHandler == nil { + return false, fmt.Errorf("HTTP RPC not running") + } + api.node.stopHTTP() + return true, nil } // StartWS starts the websocket RPC API server. -func (api *PrivateAdminAPI) StartWS(address string, port int, cors string, apis string) (bool, error) { - var offeredAPIs []rpc.API - if len(apis) > 0 { - namespaces := set.New() - for _, a := range strings.Split(apis, ",") { - namespaces.Add(strings.TrimSpace(a)) - } - for _, api := range api.node.APIs() { - if namespaces.Has(api.Namespace) { - offeredAPIs = append(offeredAPIs, api) - } - } - } else { - // use by default all public API's - for _, api := range api.node.APIs() { - if api.Public { - offeredAPIs = append(offeredAPIs, api) - } - } - } +func (api *PrivateAdminAPI) StartWS(host string, port int, cors string, apis string) (bool, error) { + api.node.lock.Lock() + defer api.node.lock.Unlock() - if address == "" { - address = "127.0.0.1" + if api.node.wsHandler != nil { + return false, fmt.Errorf("WebSocket RPC already running on %s", api.node.wsEndpoint) } - if port == 0 { - port = 8546 + if err := api.node.startWS(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { + return false, err } - - corsDomains := strings.Split(cors, " ") - - err := rpc.StartWS(address, port, corsDomains, offeredAPIs) - return err == nil, err + return true, nil } // StopRPC terminates an already running websocket RPC API endpoint. func (api *PrivateAdminAPI) StopWS() (bool, error) { - err := rpc.StopWS() - return err == nil, err + api.node.lock.Lock() + defer api.node.lock.Unlock() + + if api.node.wsHandler == nil { + return false, fmt.Errorf("WebSocket RPC not running") + } + api.node.stopWS() + return true, nil } // PublicAdminAPI is the collection of administrative API methods exposed over diff --git a/node/config.go b/node/config.go index d3eb1c78b..301ec636e 100644 --- a/node/config.go +++ b/node/config.go @@ -19,6 +19,7 @@ package node import ( "crypto/ecdsa" "encoding/json" + "fmt" "io/ioutil" "net" "os" @@ -52,11 +53,11 @@ type Config struct { // in memory. DataDir string - // IpcPath is the requested location to place the IPC endpoint. If the path is + // IPCPath is the requested location to place the IPC endpoint. If the path is // a simple file name, it is placed inside the data directory (or on the root // pipe path on Windows), whereas if it's a resolvable path name (absolute or // relative), then that specific path is enforced. An empty path disables IPC. - IpcPath string + IPCPath string // This field should be a valid secp256k1 private key that will be used for both // remote peer identification as well as network traffic encryption. If no key @@ -97,37 +98,105 @@ type Config struct { // handshake phase, counted separately for inbound and outbound connections. // Zero defaults to preset values. MaxPendingPeers int + + // HTTPHost is the host interface on which to start the HTTP RPC server. If this + // field is empty, no HTTP API endpoint will be started. + HTTPHost string + + // HTTPPort is the TCP port number on which to start the HTTP RPC server. The + // default zero value is/ valid and will pick a port number randomly (useful + // for ephemeral nodes). + HTTPPort int + + // HTTPCors is the Cross-Origin Resource Sharing header to send to requesting + // clients. Please be aware that CORS is a browser enforced security, it's fully + // useless for custom HTTP clients. + HTTPCors string + + // HTTPModules is a list of API modules to expose via the HTTP RPC interface. + // If the module list is empty, all RPC API endpoints designated public will be + // exposed. + HTTPModules []string + + // WSHost is the host interface on which to start the websocket RPC server. If + // this field is empty, no websocket API endpoint will be started. + WSHost string + + // WSPort is the TCP port number on which to start the websocket RPC server. The + // default zero value is/ valid and will pick a port number randomly (useful for + // ephemeral nodes). + WSPort int + + // WSDomains is the list of domain to accept websocket requests from. Please be + // aware that the server can only act upon the HTTP request the client sends and + // cannot verify the validity of the request header. + WSDomains string + + // WSModules is a list of API modules to expose via the websocket RPC interface. + // If the module list is empty, all RPC API endpoints designated public will be + // exposed. + WSModules []string } -// IpcEndpoint resolves an IPC endpoint based on a configured value, taking into +// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into // account the set data folders as well as the designated platform we're currently // running on. -func (c *Config) IpcEndpoint() string { +func (c *Config) IPCEndpoint() string { // Short circuit if IPC has not been enabled - if c.IpcPath == "" { + if c.IPCPath == "" { return "" } // On windows we can only use plain top-level pipes if runtime.GOOS == "windows" { - if strings.HasPrefix(c.IpcPath, `\\.\pipe\`) { - return c.IpcPath + if strings.HasPrefix(c.IPCPath, `\\.\pipe\`) { + return c.IPCPath } - return `\\.\pipe\` + c.IpcPath + return `\\.\pipe\` + c.IPCPath } // Resolve names into the data directory full paths otherwise - if filepath.Base(c.IpcPath) == c.IpcPath { + if filepath.Base(c.IPCPath) == c.IPCPath { if c.DataDir == "" { - return filepath.Join(os.TempDir(), c.IpcPath) + return filepath.Join(os.TempDir(), c.IPCPath) } - return filepath.Join(c.DataDir, c.IpcPath) + return filepath.Join(c.DataDir, c.IPCPath) + } + return c.IPCPath +} + +// DefaultIPCEndpoint returns the IPC path used by default. +func DefaultIPCEndpoint() string { + config := &Config{DataDir: common.DefaultDataDir(), IPCPath: common.DefaultIPCSocket} + return config.IPCEndpoint() +} + +// HTTPEndpoint resolves an HTTP endpoint based on the configured host interface +// and port parameters. +func (c *Config) HTTPEndpoint() string { + if c.HTTPHost == "" { + return "" + } + return fmt.Sprintf("%s:%d", c.HTTPHost, c.HTTPPort) +} + +// DefaultHTTPEndpoint returns the HTTP endpoint used by default. +func DefaultHTTPEndpoint() string { + config := &Config{HTTPHost: common.DefaultHTTPHost, HTTPPort: common.DefaultHTTPPort} + return config.HTTPEndpoint() +} + +// WSEndpoint resolves an websocket endpoint based on the configured host interface +// and port parameters. +func (c *Config) WSEndpoint() string { + if c.WSHost == "" { + return "" } - return c.IpcPath + return fmt.Sprintf("%s:%d", c.WSHost, c.WSPort) } -// DefaultIpcEndpoint returns the IPC path used by default. -func DefaultIpcEndpoint() string { - config := &Config{DataDir: common.DefaultDataDir(), IpcPath: common.DefaultIpcSocket()} - return config.IpcEndpoint() +// DefaultWSEndpoint returns the websocket endpoint used by default. +func DefaultWSEndpoint() string { + config := &Config{WSHost: common.DefaultWSHost, WSPort: common.DefaultWSPort} + return config.WSEndpoint() } // NodeKey retrieves the currently configured private key of the node, checking diff --git a/node/config_test.go b/node/config_test.go index efb864ce4..45a54d184 100644 --- a/node/config_test.go +++ b/node/config_test.go @@ -63,10 +63,10 @@ func TestDatadirCreation(t *testing.T) { // Tests that IPC paths are correctly resolved to valid endpoints of different // platforms. -func TestIpcPathResolution(t *testing.T) { +func TestIPCPathResolution(t *testing.T) { var tests = []struct { DataDir string - IpcPath string + IPCPath string Windows bool Endpoint string }{ @@ -85,7 +85,7 @@ func TestIpcPathResolution(t *testing.T) { for i, test := range tests { // Only run when platform/test match if (runtime.GOOS == "windows") == test.Windows { - if endpoint := (&Config{DataDir: test.DataDir, IpcPath: test.IpcPath}).IpcEndpoint(); endpoint != test.Endpoint { + if endpoint := (&Config{DataDir: test.DataDir, IPCPath: test.IPCPath}).IPCEndpoint(); endpoint != test.Endpoint { t.Errorf("test %d: IPC endpoint mismatch: have %s, want %s", i, endpoint, test.Endpoint) } } diff --git a/node/node.go b/node/node.go index e3fc03360..7d3a10874 100644 --- a/node/node.go +++ b/node/node.go @@ -55,10 +55,25 @@ type Node struct { serviceFuncs []ServiceConstructor // Service constructors (in dependency order) services map[reflect.Type]Service // Currently running services + rpcAPIs []rpc.API // List of APIs currently provided by the node + inprocHandler *rpc.Server // In-process RPC request handler to process the API requests + ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled) ipcListener net.Listener // IPC RPC listener socket to serve API requests ipcHandler *rpc.Server // IPC RPC request handler to process the API requests + httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled) + httpWhitelist []string // HTTP RPC modules to allow through this endpoint + httpCors string // HTTP RPC Cross-Origin Resource Sharing header + httpListener net.Listener // HTTP RPC listener socket to server API requests + httpHandler *rpc.Server // HTTP RPC request handler to process the API requests + + wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled) + wsWhitelist []string // Websocket RPC modules to allow through this endpoint + wsDomains string // Websocket RPC allowed origin domains + wsListener net.Listener // Websocket RPC listener socket to server API requests + wsHandler *rpc.Server // Websocket RPC request handler to process the API requests + stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex } @@ -93,9 +108,15 @@ func New(conf *Config) (*Node, error) { MaxPeers: conf.MaxPeers, MaxPendingPeers: conf.MaxPendingPeers, }, - serviceFuncs: []ServiceConstructor{}, - ipcEndpoint: conf.IpcEndpoint(), - eventmux: new(event.TypeMux), + serviceFuncs: []ServiceConstructor{}, + ipcEndpoint: conf.IPCEndpoint(), + httpEndpoint: conf.HTTPEndpoint(), + httpWhitelist: conf.HTTPModules, + httpCors: conf.HTTPCors, + wsEndpoint: conf.WSEndpoint(), + wsWhitelist: conf.WSModules, + wsDomains: conf.WSDomains, + eventmux: new(event.TypeMux), }, nil } @@ -188,58 +209,234 @@ func (n *Node) Start() error { return nil } -// startRPC initializes and starts the IPC RPC endpoints. +// startRPC is a helper method to start all the various RPC endpoint during node +// startup. It's not meant to be called at any time afterwards as it makes certain +// assumptions about the state of the node. func (n *Node) startRPC(services map[reflect.Type]Service) error { - // Gather and register all the APIs exposed by the services + // Gather all the possible APIs to surface apis := n.apis() for _, service := range services { apis = append(apis, service.APIs()...) } - ipcHandler := rpc.NewServer() + // Start the various API endpoints, terminating all in case of errors + if err := n.startInProc(apis); err != nil { + return err + } + if err := n.startIPC(apis); err != nil { + n.stopInProc() + return err + } + if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil { + n.stopIPC() + n.stopInProc() + return err + } + if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsDomains); err != nil { + n.stopHTTP() + n.stopIPC() + n.stopInProc() + return err + } + // All API endpoints started successfully + n.rpcAPIs = apis + return nil +} + +// startInProc initializes an in-process RPC endpoint. +func (n *Node) startInProc(apis []rpc.API) error { + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("InProc registered %T under '%s'", api.Service, api.Namespace) + } + n.inprocHandler = handler + return nil +} + +// stopInProc terminates the in-process RPC endpoint. +func (n *Node) stopInProc() { + if n.inprocHandler != nil { + n.inprocHandler.Stop() + n.inprocHandler = nil + } +} + +// startIPC initializes and starts the IPC RPC endpoint. +func (n *Node) startIPC(apis []rpc.API) error { + // Short circuit if the IPC endpoint isn't being exposed + if n.ipcEndpoint == "" { + return nil + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() for _, api := range apis { - if err := ipcHandler.RegisterName(api.Namespace, api.Service); err != nil { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - glog.V(logger.Debug).Infof("Register %T under namespace '%s'", api.Service, api.Namespace) + glog.V(logger.Debug).Infof("IPC registered %T under '%s'", api.Service, api.Namespace) } - // All APIs registered, start the IPC and HTTP listeners + // All APIs registered, start the IPC listener var ( - ipcListener net.Listener - err error + listener net.Listener + err error ) - if n.ipcEndpoint != "" { - if ipcListener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { - return err - } - go func() { - glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint) - defer glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint) - - for { - conn, err := ipcListener.Accept() - if err != nil { - // Terminate if the listener was closed - n.lock.RLock() - closed := n.ipcListener == nil - n.lock.RUnlock() - if closed { - return - } - // Not closed, just some error; report and continue - glog.V(logger.Error).Infof("IPC accept failed: %v", err) - continue + if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { + return err + } + go func() { + glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint) + + for { + conn, err := listener.Accept() + if err != nil { + // Terminate if the listener was closed + n.lock.RLock() + closed := n.ipcListener == nil + n.lock.RUnlock() + if closed { + return } - go ipcHandler.ServeCodec(rpc.NewJSONCodec(conn)) + // Not closed, just some error; report and continue + glog.V(logger.Error).Infof("IPC accept failed: %v", err) + continue + } + go handler.ServeCodec(rpc.NewJSONCodec(conn)) + } + }() + // All listeners booted successfully + n.ipcListener = listener + n.ipcHandler = handler + + return nil +} + +// stopIPC terminates the IPC RPC endpoint. +func (n *Node) stopIPC() { + if n.ipcListener != nil { + n.ipcListener.Close() + n.ipcListener = nil + + glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint) + } + if n.ipcHandler != nil { + n.ipcHandler.Stop() + n.ipcHandler = nil + } +} + +// startHTTP initializes and starts the HTTP RPC endpoint. +func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error { + // Short circuit if the HTTP endpoint isn't being exposed + if endpoint == "" { + return nil + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("HTTP registered %T under '%s'", api.Service, api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return err + } + go rpc.NewHTTPServer(cors, handler).Serve(listener) + glog.V(logger.Info).Infof("HTTP endpoint opened: http://%s", endpoint) + + // All listeners booted successfully + n.httpEndpoint = endpoint + n.httpListener = listener + n.httpHandler = handler + n.httpCors = cors + + return nil +} + +// stopHTTP terminates the HTTP RPC endpoint. +func (n *Node) stopHTTP() { + if n.httpListener != nil { + n.httpListener.Close() + n.httpListener = nil + + glog.V(logger.Info).Infof("HTTP endpoint closed: http://%s", n.httpEndpoint) + } + if n.httpHandler != nil { + n.httpHandler.Stop() + n.httpHandler = nil + } +} + +// startWS initializes and starts the websocket RPC endpoint. +func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, cors string) error { + // Short circuit if the WS endpoint isn't being exposed + if endpoint == "" { + return nil + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err } - }() + glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return err } + go rpc.NewWSServer(cors, handler).Serve(listener) + glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", endpoint) + // All listeners booted successfully - n.ipcListener = ipcListener - n.ipcHandler = ipcHandler + n.wsEndpoint = endpoint + n.wsListener = listener + n.wsHandler = handler + n.wsDomains = cors return nil } +// stopWS terminates the websocket RPC endpoint. +func (n *Node) stopWS() { + if n.wsListener != nil { + n.wsListener.Close() + n.wsListener = nil + + glog.V(logger.Info).Infof("WebSocket endpoint closed: ws://%s", n.wsEndpoint) + } + if n.wsHandler != nil { + n.wsHandler.Stop() + n.wsHandler = nil + } +} + // Stop terminates a running node along with all it's services. In the node was // not started, an error is returned. func (n *Node) Stop() error { @@ -251,14 +448,11 @@ func (n *Node) Stop() error { return ErrNodeStopped } // Otherwise terminate the API, all services and the P2P server too - if n.ipcListener != nil { - n.ipcListener.Close() - n.ipcListener = nil - } - if n.ipcHandler != nil { - n.ipcHandler.Stop() - n.ipcHandler = nil - } + n.stopWS() + n.stopHTTP() + n.stopIPC() + n.rpcAPIs = nil + failure := &StopError{ Services: make(map[reflect.Type]error), } @@ -304,6 +498,19 @@ func (n *Node) Restart() error { return nil } +// Attach creates an RPC client attached to an in-process API handler. +func (n *Node) Attach() (rpc.Client, error) { + n.lock.RLock() + defer n.lock.RUnlock() + + // Short circuit if the node's not running + if n.server == nil { + return nil, ErrNodeStopped + } + // Otherwise attach to the API and return + return rpc.NewInProcRPCClient(n.inprocHandler), nil +} + // Server retrieves the currently running P2P network layer. This method is meant // only to inspect fields of the currently running server, life cycle management // should be left to this Node entity. @@ -337,11 +544,21 @@ func (n *Node) DataDir() string { return n.datadir } -// IpcEndpoint retrieves the current IPC endpoint used by the protocol stack. -func (n *Node) IpcEndpoint() string { +// IPCEndpoint retrieves the current IPC endpoint used by the protocol stack. +func (n *Node) IPCEndpoint() string { return n.ipcEndpoint } +// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack. +func (n *Node) HTTPEndpoint() string { + return n.httpEndpoint +} + +// WSEndpoint retrieves the current WS endpoint used by the protocol stack. +func (n *Node) WSEndpoint() string { + return n.wsEndpoint +} + // EventMux retrieves the event multiplexer used by all the network services in // the current protocol stack. func (n *Node) EventMux() *event.TypeMux { @@ -377,14 +594,3 @@ func (n *Node) apis() []rpc.API { }, } } - -// APIs returns the collection of RPC descriptor this node offers. This method -// is just a quick placeholder passthrough for the RPC update, which in the next -// step will be fully integrated into the node itself. -func (n *Node) APIs() []rpc.API { - apis := n.apis() - for _, api := range n.services { - apis = append(apis, api.APIs()...) - } - return apis -} diff --git a/node/node_test.go b/node/node_test.go index 53dcbcf74..532115d3c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -18,9 +18,7 @@ package node import ( "errors" - "fmt" "io/ioutil" - "math/rand" "os" "reflect" "testing" @@ -37,7 +35,6 @@ var ( func testNodeConfig() *Config { return &Config{ - IpcPath: fmt.Sprintf("test-%d.ipc", rand.Int63()), PrivateKey: testNodeKey, Name: "test node", } @@ -541,10 +538,11 @@ func TestAPIGather(t *testing.T) { defer stack.Stop() // Connect to the RPC server and verify the various registered endpoints - ipcClient, err := rpc.NewIPCClient(stack.IpcEndpoint()) + client, err := stack.Attach() if err != nil { - t.Fatalf("failed to connect to the IPC API server: %v", err) + t.Fatalf("failed to connect to the inproc API server: %v", err) } + defer client.Close() tests := []struct { Method string @@ -556,11 +554,11 @@ func TestAPIGather(t *testing.T) { {"multi.v2.nested_theOneMethod", "multi.v2.nested"}, } for i, test := range tests { - if err := ipcClient.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil { + if err := client.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil { t.Fatalf("test %d: failed to send API request: %v", i, err) } reply := new(rpc.JSONSuccessResponse) - if err := ipcClient.Recv(reply); err != nil { + if err := client.Recv(reply); err != nil { t.Fatalf("test %d: failed to read API reply: %v", i, err) } select { diff --git a/rpc/http.go b/rpc/http.go index c5eb41af1..d9053b003 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -20,7 +20,6 @@ import ( "bufio" "bytes" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -29,7 +28,6 @@ import ( "net/url" "strconv" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/logger" @@ -41,12 +39,6 @@ const ( httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request ) -var ( - httpServerMu sync.Mutex // prevent concurrent access to the httpListener and httpServer - httpListener net.Listener // listener for the http server - httpRPCServer *Server // the node can only start 1 HTTP RPC server instance -) - // httpMessageStream is the glue between a HTTP connection which is message based // and the RPC codecs that expect json requests to be read from a stream. It will // parse HTTP messages and offer the bodies of these requests as a stream through @@ -249,53 +241,14 @@ func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) { go h.rpcServer.ServeCodec(codec) } -// StartHTTP will start the JSONRPC HTTP RPC interface when its not yet running. -func StartHTTP(address string, port int, corsdomains []string, apis []API) error { - httpServerMu.Lock() - defer httpServerMu.Unlock() - - if httpRPCServer != nil { - return fmt.Errorf("HTTP RPC interface already started on %s", httpListener.Addr()) - } - - rpcServer := NewServer() - - for _, api := range apis { - if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - } - - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) - if err != nil { - return err +// NewHTTPServer creates a new HTTP RPC server around an API provider. +func NewHTTPServer(cors string, handler *Server) *http.Server { + return &http.Server{ + Handler: &httpConnHijacker{ + corsdomains: strings.Split(cors, ","), + rpcServer: handler, + }, } - - httpServer := http.Server{Handler: &httpConnHijacker{corsdomains, rpcServer}} - go httpServer.Serve(listener) - - httpListener = listener - httpRPCServer = rpcServer - - return nil -} - -// StopHTTP will stop the running HTTP interface. If it is not running an error will be returned. -func StopHTTP() error { - httpServerMu.Lock() - defer httpServerMu.Unlock() - - if httpRPCServer == nil { - return errors.New("HTTP RPC interface not started") - } - - httpListener.Close() - httpRPCServer.Stop() - - httpRPCServer = nil - httpListener = nil - - return nil } // httpClient connects to a geth RPC server over HTTP. @@ -306,7 +259,7 @@ type httpClient struct { // NewHTTPClient create a new RPC clients that connection to a geth RPC server // over HTTP. -func NewHTTPClient(endpoint string) (*httpClient, error) { +func NewHTTPClient(endpoint string) (Client, error) { url, err := url.Parse(endpoint) if err != nil { return nil, err diff --git a/rpc/inproc.go b/rpc/inproc.go new file mode 100644 index 000000000..e138ba2c3 --- /dev/null +++ b/rpc/inproc.go @@ -0,0 +1,111 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import "encoding/json" + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + buffer := &inprocBuffer{ + requests: make(chan []byte, 16), + responses: make(chan []byte, 16), + } + client := &inProcClient{ + server: handler, + buffer: buffer, + } + go handler.ServeCodec(NewJSONCodec(client.buffer)) + return client +} + +// inProcClient is an in-process buffer stream attached to an RPC server. +type inProcClient struct { + server *Server + buffer *inprocBuffer +} + +// Close tears down the request channel of the in-proc client. +func (c *inProcClient) Close() { + c.buffer.Close() +} + +// Send marshals a message into a json format and injects in into the client +// request channel. +func (c *inProcClient) Send(msg interface{}) error { + d, err := json.Marshal(msg) + if err != nil { + return err + } + c.buffer.requests <- d + return nil +} + +// Recv reads a message from the response channel and tries to parse it into the +// given msg interface. +func (c *inProcClient) Recv(msg interface{}) error { + data := <-c.buffer.responses + return json.Unmarshal(data, &msg) +} + +// Returns the collection of modules the RPC server offers. +func (c *inProcClient) SupportedModules() (map[string]string, error) { + return SupportedModules(c) +} + +// inprocBuffer represents the connection between the RPC server and console +type inprocBuffer struct { + readBuf []byte // store remaining request bytes after a partial read + requests chan []byte // list with raw serialized requests + responses chan []byte // list with raw serialized responses +} + +// Read will read the next request in json format. +func (b *inprocBuffer) Read(p []byte) (int, error) { + // last read didn't read entire request, return remaining bytes + if len(b.readBuf) > 0 { + n := copy(p, b.readBuf) + if n < len(b.readBuf) { + b.readBuf = b.readBuf[:n] + } else { + b.readBuf = b.readBuf[:0] + } + return n, nil + } + // read next request + req := <-b.requests + n := copy(p, req) + if n < len(req) { + // inprocBuffer too small, store remaining chunk for next read + b.readBuf = req[n:] + } + return n, nil +} + +// Write sends the given buffer to the backend. +func (b *inprocBuffer) Write(p []byte) (n int, err error) { + b.responses <- p + return len(p), nil +} + +// Close cleans up obtained resources. +func (b *inprocBuffer) Close() error { + close(b.requests) + close(b.responses) + + return nil +} diff --git a/rpc/ipc.go b/rpc/ipc.go index b87bfcbd7..05d8909ca 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -38,7 +38,7 @@ type ipcClient struct { // NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. // On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a // named pipe. -func NewIPCClient(endpoint string) (*ipcClient, error) { +func NewIPCClient(endpoint string) (Client, error) { conn, err := newIPCConnection(endpoint) if err != nil { return nil, err diff --git a/rpc/server.go b/rpc/server.go index 5b88d843a..f42ee2d37 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -33,8 +33,8 @@ import ( const ( stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - DefaultIpcApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" - DefaultHttpRpcApis = "eth,net,web3" + DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" + DefaultHTTPApis = "eth,net,web3" ) // NewServer will create a new server instance with no registered handlers. diff --git a/rpc/utils.go b/rpc/utils.go index 39acf8196..fa114284d 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -20,13 +20,12 @@ import ( "crypto/rand" "encoding/hex" "errors" + "fmt" "math/big" "reflect" "unicode" "unicode/utf8" - "fmt" - "golang.org/x/net/context" ) diff --git a/rpc/websocket.go b/rpc/websocket.go index b5bcbf4f6..92615494e 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -17,13 +17,11 @@ package rpc import ( - "errors" "fmt" - "net" "net/http" - "sync" - "os" + "strings" + "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -31,12 +29,6 @@ import ( "gopkg.in/fatih/set.v0" ) -var ( - wsServerMu sync.Mutex - wsRPCServer *Server - wsListener net.Listener -) - // wsReaderWriterCloser reads and write payloads from and to a websocket connection. type wsReaderWriterCloser struct { c *websocket.Conn @@ -57,14 +49,6 @@ func (rw *wsReaderWriterCloser) Close() error { return rw.c.Close() } -// wsHandler accepts a websocket connection and handles incoming RPC requests. -// Will return when the websocket connection is closed, either by the client or -// server. -func wsHandler(conn *websocket.Conn) { - rwc := &wsReaderWriterCloser{conn} - wsRPCServer.ServeCodec(NewJSONCodec(rwc)) -} - // wsHandshakeValidator returns a handler that verifies the origin during the // websocket upgrade process. When a '*' is specified as an allowed origins all // connections are accepted. @@ -103,54 +87,16 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http return f } -// StartWS will start a websocket RPC server on the given address and port. -func StartWS(address string, port int, corsdomains []string, apis []API) error { - wsServerMu.Lock() - defer wsServerMu.Unlock() - - if wsRPCServer != nil { - return fmt.Errorf("WS RPC interface already started on %s", wsListener.Addr()) - } - - rpcServer := NewServer() - for _, api := range apis { - if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return err - } +// NewWSServer creates a new websocket RPC server around an API provider. +func NewWSServer(cors string, handler *Server) *http.Server { + return &http.Server{ + Handler: websocket.Server{ + Handshake: wsHandshakeValidator(strings.Split(cors, ",")), + Handler: func(conn *websocket.Conn) { + handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn})) + }, + }, } - - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) - if err != nil { - return err - } - - wsServer := websocket.Server{Handshake: wsHandshakeValidator(corsdomains), Handler: wsHandler} - wsHTTPServer := http.Server{Handler: wsServer} - - go wsHTTPServer.Serve(listener) - - wsListener = listener - wsRPCServer = rpcServer - - return nil -} - -// StopWS stops the running websocket RPC server. -func StopWS() error { - wsServerMu.Lock() - defer wsServerMu.Unlock() - - if wsRPCServer == nil { - return errors.New("HTTP RPC interface not started") - } - - wsListener.Close() - wsRPCServer.Stop() - - wsRPCServer = nil - wsListener = nil - - return nil } // wsClient represents a RPC client that communicates over websockets with a @@ -163,7 +109,7 @@ type wsClient struct { // NewWSClientj creates a new RPC client that communicates with a RPC server // that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (*wsClient, error) { +func NewWSClient(endpoint string) (Client, error) { return &wsClient{endpoint: endpoint}, nil } |