diff options
-rw-r--r-- | core/events.go | 3 | ||||
-rw-r--r-- | miner/worker.go | 11 | ||||
-rw-r--r-- | rpc/inproc.go | 88 |
3 files changed, 31 insertions, 71 deletions
diff --git a/core/events.go b/core/events.go index c23206cad..bed8c7b90 100644 --- a/core/events.go +++ b/core/events.go @@ -35,6 +35,9 @@ type PendingLogsEvent struct { Logs vm.Logs } +// PendingStateEvent is posted pre mining and notifies of pending state changes. +type PendingStateEvent struct{} + // NewBlockEvent is posted when a block has been imported. type NewBlockEvent struct{ Block *types.Block } diff --git a/miner/worker.go b/miner/worker.go index 71f22ef1c..f3e95cb5f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -649,8 +649,15 @@ func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Trans coalescedLogs = append(coalescedLogs, logs...) } } - if len(coalescedLogs) > 0 { - go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs}) + if len(coalescedLogs) > 0 || env.tcount > 0 { + go func(logs vm.Logs, tcount int) { + if len(logs) > 0 { + mux.Post(core.PendingLogsEvent{Logs: logs}) + } + if tcount > 0 { + mux.Post(core.PendingStateEvent{}) + } + }(coalescedLogs, env.tcount) } } diff --git a/rpc/inproc.go b/rpc/inproc.go index e138ba2c3..3cfbea71c 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -16,96 +16,46 @@ package rpc -import "encoding/json" - -// NewInProcRPCClient creates an in-process buffer stream attachment to a given -// RPC server. -func NewInProcRPCClient(handler *Server) Client { - buffer := &inprocBuffer{ - requests: make(chan []byte, 16), - responses: make(chan []byte, 16), - } - client := &inProcClient{ - server: handler, - buffer: buffer, - } - go handler.ServeCodec(NewJSONCodec(client.buffer)) - return client -} +import ( + "encoding/json" + "io" + "net" +) // inProcClient is an in-process buffer stream attached to an RPC server. type inProcClient struct { server *Server - buffer *inprocBuffer + cl io.Closer + enc *json.Encoder + dec *json.Decoder } // Close tears down the request channel of the in-proc client. func (c *inProcClient) Close() { - c.buffer.Close() + c.cl.Close() +} + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + p1, p2 := net.Pipe() + go handler.ServeCodec(NewJSONCodec(p1)) + return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} } // Send marshals a message into a json format and injects in into the client // request channel. func (c *inProcClient) Send(msg interface{}) error { - d, err := json.Marshal(msg) - if err != nil { - return err - } - c.buffer.requests <- d - return nil + return c.enc.Encode(msg) } // Recv reads a message from the response channel and tries to parse it into the // given msg interface. func (c *inProcClient) Recv(msg interface{}) error { - data := <-c.buffer.responses - return json.Unmarshal(data, &msg) + return c.dec.Decode(msg) } // Returns the collection of modules the RPC server offers. func (c *inProcClient) SupportedModules() (map[string]string, error) { return SupportedModules(c) } - -// inprocBuffer represents the connection between the RPC server and console -type inprocBuffer struct { - readBuf []byte // store remaining request bytes after a partial read - requests chan []byte // list with raw serialized requests - responses chan []byte // list with raw serialized responses -} - -// Read will read the next request in json format. -func (b *inprocBuffer) Read(p []byte) (int, error) { - // last read didn't read entire request, return remaining bytes - if len(b.readBuf) > 0 { - n := copy(p, b.readBuf) - if n < len(b.readBuf) { - b.readBuf = b.readBuf[:n] - } else { - b.readBuf = b.readBuf[:0] - } - return n, nil - } - // read next request - req := <-b.requests - n := copy(p, req) - if n < len(req) { - // inprocBuffer too small, store remaining chunk for next read - b.readBuf = req[n:] - } - return n, nil -} - -// Write sends the given buffer to the backend. -func (b *inprocBuffer) Write(p []byte) (n int, err error) { - b.responses <- p - return len(p), nil -} - -// Close cleans up obtained resources. -func (b *inprocBuffer) Close() error { - close(b.requests) - close(b.responses) - - return nil -} |