diff options
Diffstat (limited to 'rpc/packages.go')
-rw-r--r-- | rpc/packages.go | 80 |
1 files changed, 71 insertions, 9 deletions
diff --git a/rpc/packages.go b/rpc/packages.go index aa51aad42..8344d6a46 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -44,18 +44,22 @@ type EthereumApi struct { xeth *xeth.XEth filterManager *filter.FilterManager - mut sync.RWMutex - logs map[int]state.Logs + logMut sync.RWMutex + logs map[int]state.Logs + + messagesMut sync.RWMutex + messages map[int][]xeth.WhisperMessage db ethutil.Database } -func NewEthereumApi(xeth *xeth.XEth) *EthereumApi { +func NewEthereumApi(eth *xeth.XEth) *EthereumApi { db, _ := ethdb.NewLDBDatabase("dapps") api := &EthereumApi{ - xeth: xeth, - filterManager: filter.NewFilterManager(xeth.Backend().EventMux()), + xeth: eth, + filterManager: filter.NewFilterManager(eth.Backend().EventMux()), logs: make(map[int]state.Logs), + messages: make(map[int][]xeth.WhisperMessage), db: db, } go api.filterManager.Start() @@ -67,8 +71,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro var id int filter := core.NewFilter(self.xeth.Backend()) filter.LogsCallback = func(logs state.Logs) { - self.mut.Lock() - defer self.mut.Unlock() + self.logMut.Lock() + defer self.logMut.Unlock() self.logs[id] = append(self.logs[id], logs...) } @@ -79,8 +83,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro } func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { - self.mut.RLock() - defer self.mut.RUnlock() + self.logMut.RLock() + defer self.logMut.RUnlock() *reply = toLogs(self.logs[id]) @@ -257,6 +261,44 @@ func (p *EthereumApi) DbGet(args *DbArgs, reply *interface{}) error { return nil } +func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error { + *reply = p.xeth.Whisper().NewIdentity() + return nil +} + +func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) error { + var id int + args.Fn = func(msg xeth.WhisperMessage) { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + p.messages[id] = append(p.messages[id], msg) + } + id = p.xeth.Whisper().Watch(args) + *reply = id + return nil +} + +func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() + + *reply = self.messages[id] + + self.messages[id] = nil // empty the messages + + return nil +} + +func (p *EthereumApi) WhisperPost(args *WhisperMessageArgs, reply *interface{}) error { + err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl) + if err != nil { + return err + } + + *reply = true + return nil +} + func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error { // Spec at https://github.com/ethereum/wiki/wiki/Generic-ON-RPC rpclogger.DebugDetailf("%T %s", req.Params, req.Params) @@ -354,6 +396,26 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error return err } return p.DbGet(args, reply) + case "shh_newIdentity": + return p.NewWhisperIdentity(reply) + case "shh_newFilter": + args, err := req.ToWhisperFilterArgs() + if err != nil { + return err + } + return p.NewWhisperFilter(args, reply) + case "shh_changed": + args, err := req.ToWhisperChangedArgs() + if err != nil { + return err + } + return p.MessagesChanged(args, reply) + case "shh_post": + args, err := req.ToWhisperPostArgs() + if err != nil { + return nil + } + return p.WhisperPost(args, reply) default: return NewErrorResponse(fmt.Sprintf("%v %s", ErrorNotImplemented, req.Method)) } |