From fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 29 Oct 2015 13:28:00 +0100 Subject: cmd/utils, rpc/comms: stop XEth when IPC connection ends There are a bunch of changes required to make this work: - in miner: allow unregistering agents, fix RemoteAgent.Stop - in eth/filters: make FilterSystem.Stop not crash - in rpc/comms: move listen loop to platform-independent code Fixes #1930. I ran the shell loop there for a few minutes and didn't see any changes in the memory profile. --- cmd/utils/flags.go | 11 ++---- eth/filters/filter_system.go | 88 ++++++++++++++++++-------------------------- miner/miner.go | 7 +++- miner/remote_agent.go | 15 +++++--- miner/worker.go | 30 ++++++++------- rpc/comms/ipc.go | 43 +++++++++++++++++++++- rpc/comms/ipc_unix.go | 40 +++----------------- rpc/comms/ipc_windows.go | 36 ++---------------- xeth/xeth.go | 14 +++---- 9 files changed, 127 insertions(+), 157 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c2b92be46..299ab4abb 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -627,17 +627,14 @@ func StartIPC(eth *eth.Ethereum, ctx *cli.Context) error { Endpoint: IpcSocketPath(ctx), } - initializer := func(conn net.Conn) (shared.EthereumApi, error) { + initializer := func(conn net.Conn) (comms.Stopper, shared.EthereumApi, error) { fe := useragent.NewRemoteFrontend(conn, eth.AccountManager()) xeth := xeth.New(eth, fe) - codec := codec.JSON - - apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec, xeth, eth) + apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec.JSON, xeth, eth) if err != nil { - return nil, err + return nil, nil, err } - - return api.Merge(apis...), nil + return xeth, api.Merge(apis...), nil } return comms.StartIpc(config, codec.JSON, initializer) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ae6093525..df3ce90c6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,30 +31,32 @@ import ( // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { - eventMux *event.TypeMux - filterMu sync.RWMutex filterId int filters map[int]*Filter created map[int]time.Time - - quit chan struct{} + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - eventMux: mux, - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + filters: make(map[int]*Filter), + created: make(map[int]time.Time), } + fs.sub = mux.Subscribe( + //core.PendingBlockEvent{}, + core.ChainEvent{}, + core.TxPreEvent{}, + vm.Logs(nil), + ) go fs.filterLoop() return fs } // Stop quits the filter loop required for polling events func (fs *FilterSystem) Stop() { - close(fs.quit) + fs.sub.Unsubscribe() } // Add adds a filter to the filter manager @@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter { // filterLoop waits for specific events from ethereum and fires their handlers // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { - // Subscribe to events - eventCh := fs.eventMux.Subscribe( - //core.PendingBlockEvent{}, - core.ChainEvent{}, - core.TxPreEvent{}, - vm.Logs(nil), - ).Chan() - -out: - for { - select { - case <-fs.quit: - break out - case event, ok := <-eventCh: - if !ok { - // Event subscription closed, set the channel to nil to stop spinning - eventCh = nil - continue - } - // A real event arrived, notify the registered filters - switch ev := event.Data.(type) { - case core.ChainEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { - filter.BlockCallback(ev.Block, ev.Logs) - } + for event := range fs.sub.Chan() { + switch ev := event.Data.(type) { + case core.ChainEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } - fs.filterMu.RUnlock() + } + fs.filterMu.RUnlock() - case core.TxPreEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { - filter.TransactionCallback(ev.Tx) - } + case core.TxPreEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } - fs.filterMu.RUnlock() - - case vm.Logs: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { - msgs := filter.FilterLogs(ev) - if len(msgs) > 0 { - filter.LogsCallback(msgs) - } + } + fs.filterMu.RUnlock() + + case vm.Logs: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) + if len(msgs) > 0 { + filter.LogsCallback(msgs) } } - fs.filterMu.RUnlock() } + fs.filterMu.RUnlock() } } } diff --git a/miner/miner.go b/miner/miner.go index 769db79d1..6d4a84f1a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -133,10 +133,13 @@ func (self *Miner) Register(agent Agent) { if self.Mining() { agent.Start() } - self.worker.register(agent) } +func (self *Miner) Unregister(agent Agent) { + self.worker.unregister(agent) +} + func (self *Miner) Mining() bool { return atomic.LoadInt32(&self.mining) > 0 } @@ -146,7 +149,7 @@ func (self *Miner) HashRate() (tot int64) { // do we care this might race? is it worth we're rewriting some // aspects of the worker/locking up agents so we can get an accurate // hashrate? - for _, agent := range self.worker.agents { + for agent := range self.worker.agents { tot += agent.GetHashRate() } return diff --git a/miner/remote_agent.go b/miner/remote_agent.go index 9e4453ce8..18ddf121c 100644 --- a/miner/remote_agent.go +++ b/miner/remote_agent.go @@ -48,9 +48,10 @@ type RemoteAgent struct { } func NewRemoteAgent() *RemoteAgent { - agent := &RemoteAgent{work: make(map[common.Hash]*Work), hashrate: make(map[common.Hash]hashrate)} - - return agent + return &RemoteAgent{ + work: make(map[common.Hash]*Work), + hashrate: make(map[common.Hash]hashrate), + } } func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) { @@ -75,8 +76,12 @@ func (a *RemoteAgent) Start() { } func (a *RemoteAgent) Stop() { - close(a.quit) - close(a.workCh) + if a.quit != nil { + close(a.quit) + } + if a.workCh != nil { + close(a.workCh) + } } // GetHashRate returns the accumulated hashrate of all identifier combined diff --git a/miner/worker.go b/miner/worker.go index 3519e1506..2d072ef60 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -92,7 +92,7 @@ type Result struct { type worker struct { mu sync.Mutex - agents []Agent + agents map[Agent]struct{} recv chan *Result mux *event.TypeMux quit chan struct{} @@ -136,6 +136,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker { coinbase: coinbase, txQueue: make(map[common.Hash]*types.Transaction), quit: make(chan struct{}), + agents: make(map[Agent]struct{}), fullValidation: false, } go worker.update() @@ -180,7 +181,7 @@ func (self *worker) start() { atomic.StoreInt32(&self.mining, 1) // spin up agents - for _, agent := range self.agents { + for agent := range self.agents { agent.Start() } } @@ -190,16 +191,14 @@ func (self *worker) stop() { defer self.mu.Unlock() if atomic.LoadInt32(&self.mining) == 1 { - var keep []Agent - // stop all agents - for _, agent := range self.agents { + // Stop all agents. + for agent := range self.agents { agent.Stop() - // keep all that's not a cpu agent - if _, ok := agent.(*CpuAgent); !ok { - keep = append(keep, agent) + // Remove CPU agents. + if _, ok := agent.(*CpuAgent); ok { + delete(self.agents, agent) } } - self.agents = keep } atomic.StoreInt32(&self.mining, 0) @@ -209,10 +208,17 @@ func (self *worker) stop() { func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() - self.agents = append(self.agents, agent) + self.agents[agent] = struct{}{} agent.SetReturnCh(self.recv) } +func (self *worker) unregister(agent Agent) { + self.mu.Lock() + defer self.mu.Unlock() + delete(self.agents, agent) + agent.Stop() +} + func (self *worker) update() { eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) defer eventSub.Unsubscribe() @@ -341,11 +347,9 @@ func (self *worker) push(work *Work) { glog.Infoln("You turn back and abort mining") return } - // push new work to agents - for _, agent := range self.agents { + for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) - if agent.Work() != nil { agent.Work() <- work } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 3de659b65..882d62ab4 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -20,13 +20,22 @@ import ( "fmt" "math/rand" "net" + "os" "encoding/json" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" ) +type Stopper interface { + Stop() +} + +type InitFunc func(conn net.Conn) (Stopper, shared.EthereumApi, error) + type IpcConfig struct { Endpoint string } @@ -90,8 +99,38 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { } // Start IPC server -func StartIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { - return startIpc(cfg, codec, initializer) +func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error { + l, err := ipcListen(cfg) + if err != nil { + return err + } + go ipcLoop(cfg, codec, initializer, l) + return nil +} + +func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) { + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) + defer os.Remove(cfg.Endpoint) + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + glog.V(logger.Debug).Infof("accept: %v", err) + return + } + id := newIpcConnId() + go func() { + defer conn.Close() + glog.V(logger.Debug).Infof("new connection with id %06d started", id) + stopper, api, err := initializer(conn) + if err != nil { + glog.V(logger.Error).Infof("Unable to initialize IPC connection: %v", err) + return + } + defer stopper.Stop() + handle(id, conn, api, codec) + }() + } } func newIpcConnId() int { diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index d68363a45..4b839572a 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -23,8 +23,6 @@ import ( "os" "path/filepath" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/useragent" @@ -69,44 +67,16 @@ func (self *ipcClient) reconnect() error { return err } -func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { +func ipcListen(cfg IpcConfig) (net.Listener, error) { // Ensure the IPC path exists and remove any previous leftover if err := os.MkdirAll(filepath.Dir(cfg.Endpoint), 0751); err != nil { - return err + return nil, err } os.Remove(cfg.Endpoint) - - l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"}) + l, err := net.Listen("unix", cfg.Endpoint) if err != nil { - return err + return nil, err } os.Chmod(cfg.Endpoint, 0600) - - go func() { - for { - conn, err := l.AcceptUnix() - if err != nil { - glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) - continue - } - - id := newIpcConnId() - glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) - - api, err := initializer(conn) - if err != nil { - glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err) - conn.Close() - continue - } - - go handle(id, conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - - glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) - - return nil + return l, nil } diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 47edd9e5b..e25fba253 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -28,8 +28,6 @@ import ( "time" "unsafe" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/useragent" @@ -688,40 +686,12 @@ func (self *ipcClient) reconnect() error { return err } -func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { +func ipcListen(cfg IpcConfig) (net.Listener, error) { os.Remove(cfg.Endpoint) // in case it still exists from a previous run - l, err := Listen(cfg.Endpoint) if err != nil { - return err + return nil, err } os.Chmod(cfg.Endpoint, 0600) - - go func() { - for { - conn, err := l.Accept() - if err != nil { - glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) - continue - } - - id := newIpcConnId() - glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) - - api, err := initializer(conn) - if err != nil { - glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err) - conn.Close() - continue - } - - go handle(id, conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - - glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) - - return nil + return l, nil } diff --git a/xeth/xeth.go b/xeth/xeth.go index f1e8cc5ee..35e6dd52d 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -113,19 +113,15 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth { if frontend == nil { xeth.frontend = dummyFrontend{} } - state, err := xeth.backend.BlockChain().State() - if err != nil { - return nil - } + state, _ := xeth.backend.BlockChain().State() xeth.state = NewState(xeth, state) - go xeth.start() - return xeth } func (self *XEth) start() { timer := time.NewTicker(2 * time.Second) + defer timer.Stop() done: for { select { @@ -171,8 +167,12 @@ done: } } -func (self *XEth) stop() { +// Stop releases any resources associated with self. +// It may not be called more than once. +func (self *XEth) Stop() { close(self.quit) + self.filterManager.Stop() + self.backend.Miner().Unregister(self.agent) } func cAddress(a []string) []common.Address { -- cgit v1.2.3