aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/args.go9
-rw-r--r--rpc/message.go44
-rw-r--r--rpc/packages.go80
3 files changed, 124 insertions, 9 deletions
diff --git a/rpc/args.go b/rpc/args.go
index aaa017c4e..75eef873d 100644
--- a/rpc/args.go
+++ b/rpc/args.go
@@ -251,3 +251,12 @@ func (a *DbArgs) requirements() error {
}
return nil
}
+
+type WhisperMessageArgs struct {
+ Payload string
+ To string
+ From string
+ Topics []string
+ Priority uint32
+ Ttl uint32
+}
diff --git a/rpc/message.go b/rpc/message.go
index 5045adb8f..919302921 100644
--- a/rpc/message.go
+++ b/rpc/message.go
@@ -21,6 +21,8 @@ import (
"encoding/json"
"errors"
"fmt"
+
+ "github.com/ethereum/go-ethereum/xeth"
)
const (
@@ -270,3 +272,45 @@ func (req *RpcRequest) ToDbGetArgs() (*DbArgs, error) {
rpclogger.DebugDetailf("%T %v", args, args)
return &args, nil
}
+
+func (req *RpcRequest) ToWhisperFilterArgs() (*xeth.Options, error) {
+ if len(req.Params) < 1 {
+ return nil, NewErrorResponse(ErrorArguments)
+ }
+
+ var args xeth.Options
+ err := json.Unmarshal(req.Params[0], &args)
+ if err != nil {
+ return nil, NewErrorResponseWithError(ErrorDecodeArgs, err)
+ }
+ rpclogger.DebugDetailf("%T %v", args, args)
+ return &args, nil
+}
+
+func (req *RpcRequest) ToWhisperChangedArgs() (int, error) {
+ if len(req.Params) < 1 {
+ return 0, NewErrorResponse(ErrorArguments)
+ }
+
+ var id int
+ err := json.Unmarshal(req.Params[0], &id)
+ if err != nil {
+ return 0, NewErrorResponse(ErrorDecodeArgs)
+ }
+ rpclogger.DebugDetailf("%T %v", id, id)
+ return id, nil
+}
+
+func (req *RpcRequest) ToWhisperPostArgs() (*WhisperMessageArgs, error) {
+ if len(req.Params) < 1 {
+ return nil, NewErrorResponse(ErrorArguments)
+ }
+
+ var args WhisperMessageArgs
+ err := json.Unmarshal(req.Params[0], &args)
+ if err != nil {
+ return nil, err
+ }
+ rpclogger.DebugDetailf("%T %v", args, args)
+ return &args, nil
+}
diff --git a/rpc/packages.go b/rpc/packages.go
index aa51aad42..8344d6a46 100644
--- a/rpc/packages.go
+++ b/rpc/packages.go
@@ -44,18 +44,22 @@ type EthereumApi struct {
xeth *xeth.XEth
filterManager *filter.FilterManager
- mut sync.RWMutex
- logs map[int]state.Logs
+ logMut sync.RWMutex
+ logs map[int]state.Logs
+
+ messagesMut sync.RWMutex
+ messages map[int][]xeth.WhisperMessage
db ethutil.Database
}
-func NewEthereumApi(xeth *xeth.XEth) *EthereumApi {
+func NewEthereumApi(eth *xeth.XEth) *EthereumApi {
db, _ := ethdb.NewLDBDatabase("dapps")
api := &EthereumApi{
- xeth: xeth,
- filterManager: filter.NewFilterManager(xeth.Backend().EventMux()),
+ xeth: eth,
+ filterManager: filter.NewFilterManager(eth.Backend().EventMux()),
logs: make(map[int]state.Logs),
+ messages: make(map[int][]xeth.WhisperMessage),
db: db,
}
go api.filterManager.Start()
@@ -67,8 +71,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
var id int
filter := core.NewFilter(self.xeth.Backend())
filter.LogsCallback = func(logs state.Logs) {
- self.mut.Lock()
- defer self.mut.Unlock()
+ self.logMut.Lock()
+ defer self.logMut.Unlock()
self.logs[id] = append(self.logs[id], logs...)
}
@@ -79,8 +83,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
}
func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
- self.mut.RLock()
- defer self.mut.RUnlock()
+ self.logMut.RLock()
+ defer self.logMut.RUnlock()
*reply = toLogs(self.logs[id])
@@ -257,6 +261,44 @@ func (p *EthereumApi) DbGet(args *DbArgs, reply *interface{}) error {
return nil
}
+func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error {
+ *reply = p.xeth.Whisper().NewIdentity()
+ return nil
+}
+
+func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) error {
+ var id int
+ args.Fn = func(msg xeth.WhisperMessage) {
+ p.messagesMut.Lock()
+ defer p.messagesMut.Unlock()
+ p.messages[id] = append(p.messages[id], msg)
+ }
+ id = p.xeth.Whisper().Watch(args)
+ *reply = id
+ return nil
+}
+
+func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error {
+ self.messagesMut.RLock()
+ defer self.messagesMut.RUnlock()
+
+ *reply = self.messages[id]
+
+ self.messages[id] = nil // empty the messages
+
+ return nil
+}
+
+func (p *EthereumApi) WhisperPost(args *WhisperMessageArgs, reply *interface{}) error {
+ err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl)
+ if err != nil {
+ return err
+ }
+
+ *reply = true
+ return nil
+}
+
func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error {
// Spec at https://github.com/ethereum/wiki/wiki/Generic-ON-RPC
rpclogger.DebugDetailf("%T %s", req.Params, req.Params)
@@ -354,6 +396,26 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
return err
}
return p.DbGet(args, reply)
+ case "shh_newIdentity":
+ return p.NewWhisperIdentity(reply)
+ case "shh_newFilter":
+ args, err := req.ToWhisperFilterArgs()
+ if err != nil {
+ return err
+ }
+ return p.NewWhisperFilter(args, reply)
+ case "shh_changed":
+ args, err := req.ToWhisperChangedArgs()
+ if err != nil {
+ return err
+ }
+ return p.MessagesChanged(args, reply)
+ case "shh_post":
+ args, err := req.ToWhisperPostArgs()
+ if err != nil {
+ return nil
+ }
+ return p.WhisperPost(args, reply)
default:
return NewErrorResponse(fmt.Sprintf("%v %s", ErrorNotImplemented, req.Method))
}