aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/events.go3
-rw-r--r--miner/worker.go11
-rw-r--r--rpc/inproc.go88
3 files changed, 31 insertions, 71 deletions
diff --git a/core/events.go b/core/events.go
index c23206cad..bed8c7b90 100644
--- a/core/events.go
+++ b/core/events.go
@@ -35,6 +35,9 @@ type PendingLogsEvent struct {
Logs vm.Logs
}
+// PendingStateEvent is posted pre mining and notifies of pending state changes.
+type PendingStateEvent struct{}
+
// NewBlockEvent is posted when a block has been imported.
type NewBlockEvent struct{ Block *types.Block }
diff --git a/miner/worker.go b/miner/worker.go
index 71f22ef1c..f3e95cb5f 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -649,8 +649,15 @@ func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Trans
coalescedLogs = append(coalescedLogs, logs...)
}
}
- if len(coalescedLogs) > 0 {
- go mux.Post(core.PendingLogsEvent{Logs: coalescedLogs})
+ if len(coalescedLogs) > 0 || env.tcount > 0 {
+ go func(logs vm.Logs, tcount int) {
+ if len(logs) > 0 {
+ mux.Post(core.PendingLogsEvent{Logs: logs})
+ }
+ if tcount > 0 {
+ mux.Post(core.PendingStateEvent{})
+ }
+ }(coalescedLogs, env.tcount)
}
}
diff --git a/rpc/inproc.go b/rpc/inproc.go
index e138ba2c3..3cfbea71c 100644
--- a/rpc/inproc.go
+++ b/rpc/inproc.go
@@ -16,96 +16,46 @@
package rpc
-import "encoding/json"
-
-// NewInProcRPCClient creates an in-process buffer stream attachment to a given
-// RPC server.
-func NewInProcRPCClient(handler *Server) Client {
- buffer := &inprocBuffer{
- requests: make(chan []byte, 16),
- responses: make(chan []byte, 16),
- }
- client := &inProcClient{
- server: handler,
- buffer: buffer,
- }
- go handler.ServeCodec(NewJSONCodec(client.buffer))
- return client
-}
+import (
+ "encoding/json"
+ "io"
+ "net"
+)
// inProcClient is an in-process buffer stream attached to an RPC server.
type inProcClient struct {
server *Server
- buffer *inprocBuffer
+ cl io.Closer
+ enc *json.Encoder
+ dec *json.Decoder
}
// Close tears down the request channel of the in-proc client.
func (c *inProcClient) Close() {
- c.buffer.Close()
+ c.cl.Close()
+}
+
+// NewInProcRPCClient creates an in-process buffer stream attachment to a given
+// RPC server.
+func NewInProcRPCClient(handler *Server) Client {
+ p1, p2 := net.Pipe()
+ go handler.ServeCodec(NewJSONCodec(p1))
+ return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
}
// Send marshals a message into a json format and injects in into the client
// request channel.
func (c *inProcClient) Send(msg interface{}) error {
- d, err := json.Marshal(msg)
- if err != nil {
- return err
- }
- c.buffer.requests <- d
- return nil
+ return c.enc.Encode(msg)
}
// Recv reads a message from the response channel and tries to parse it into the
// given msg interface.
func (c *inProcClient) Recv(msg interface{}) error {
- data := <-c.buffer.responses
- return json.Unmarshal(data, &msg)
+ return c.dec.Decode(msg)
}
// Returns the collection of modules the RPC server offers.
func (c *inProcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(c)
}
-
-// inprocBuffer represents the connection between the RPC server and console
-type inprocBuffer struct {
- readBuf []byte // store remaining request bytes after a partial read
- requests chan []byte // list with raw serialized requests
- responses chan []byte // list with raw serialized responses
-}
-
-// Read will read the next request in json format.
-func (b *inprocBuffer) Read(p []byte) (int, error) {
- // last read didn't read entire request, return remaining bytes
- if len(b.readBuf) > 0 {
- n := copy(p, b.readBuf)
- if n < len(b.readBuf) {
- b.readBuf = b.readBuf[:n]
- } else {
- b.readBuf = b.readBuf[:0]
- }
- return n, nil
- }
- // read next request
- req := <-b.requests
- n := copy(p, req)
- if n < len(req) {
- // inprocBuffer too small, store remaining chunk for next read
- b.readBuf = req[n:]
- }
- return n, nil
-}
-
-// Write sends the given buffer to the backend.
-func (b *inprocBuffer) Write(p []byte) (n int, err error) {
- b.responses <- p
- return len(p), nil
-}
-
-// Close cleans up obtained resources.
-func (b *inprocBuffer) Close() error {
- close(b.requests)
- close(b.responses)
-
- return nil
-}