diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-10-30 00:42:55 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-10-30 00:42:55 +0800 |
commit | fc46cf337af614f4f9c96acd222089652fe7c76e (patch) | |
tree | 672ce52e11b768801f0b33f224424ba5f0fdc465 /rpc/comms | |
parent | fd27f074feecec2f1e4c8041ff04ddac8d0ab6a3 (diff) | |
parent | fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd (diff) | |
download | go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.gz go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.bz2 go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.lz go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.xz go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.tar.zst go-tangerine-fc46cf337af614f4f9c96acd222089652fe7c76e.zip |
Merge pull request #1946 from fjl/xeth-oom
Fix for xeth OOM issue
Diffstat (limited to 'rpc/comms')
-rw-r--r-- | rpc/comms/ipc.go | 43 | ||||
-rw-r--r-- | rpc/comms/ipc_unix.go | 40 | ||||
-rw-r--r-- | rpc/comms/ipc_windows.go | 36 |
3 files changed, 49 insertions, 70 deletions
diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 3de659b65..882d62ab4 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -20,13 +20,22 @@ import ( "fmt" "math/rand" "net" + "os" "encoding/json" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" ) +type Stopper interface { + Stop() +} + +type InitFunc func(conn net.Conn) (Stopper, shared.EthereumApi, error) + type IpcConfig struct { Endpoint string } @@ -90,8 +99,38 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { } // Start IPC server -func StartIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { - return startIpc(cfg, codec, initializer) +func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error { + l, err := ipcListen(cfg) + if err != nil { + return err + } + go ipcLoop(cfg, codec, initializer, l) + return nil +} + +func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) { + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) + defer os.Remove(cfg.Endpoint) + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + glog.V(logger.Debug).Infof("accept: %v", err) + return + } + id := newIpcConnId() + go func() { + defer conn.Close() + glog.V(logger.Debug).Infof("new connection with id %06d started", id) + stopper, api, err := initializer(conn) + if err != nil { + glog.V(logger.Error).Infof("Unable to initialize IPC connection: %v", err) + return + } + defer stopper.Stop() + handle(id, conn, api, codec) + }() + } } func newIpcConnId() int { diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index d68363a45..4b839572a 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -23,8 +23,6 @@ import ( "os" "path/filepath" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/useragent" @@ -69,44 +67,16 @@ func (self *ipcClient) reconnect() error { return err } -func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { +func ipcListen(cfg IpcConfig) (net.Listener, error) { // Ensure the IPC path exists and remove any previous leftover if err := os.MkdirAll(filepath.Dir(cfg.Endpoint), 0751); err != nil { - return err + return nil, err } os.Remove(cfg.Endpoint) - - l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"}) + l, err := net.Listen("unix", cfg.Endpoint) if err != nil { - return err + return nil, err } os.Chmod(cfg.Endpoint, 0600) - - go func() { - for { - conn, err := l.AcceptUnix() - if err != nil { - glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) - continue - } - - id := newIpcConnId() - glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) - - api, err := initializer(conn) - if err != nil { - glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err) - conn.Close() - continue - } - - go handle(id, conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - - glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) - - return nil + return l, nil } diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 47edd9e5b..e25fba253 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -28,8 +28,6 @@ import ( "time" "unsafe" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/useragent" @@ -688,40 +686,12 @@ func (self *ipcClient) reconnect() error { return err } -func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { +func ipcListen(cfg IpcConfig) (net.Listener, error) { os.Remove(cfg.Endpoint) // in case it still exists from a previous run - l, err := Listen(cfg.Endpoint) if err != nil { - return err + return nil, err } os.Chmod(cfg.Endpoint, 0600) - - go func() { - for { - conn, err := l.Accept() - if err != nil { - glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) - continue - } - - id := newIpcConnId() - glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) - - api, err := initializer(conn) - if err != nil { - glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err) - conn.Close() - continue - } - - go handle(id, conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - - glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) - - return nil + return l, nil } |