From 6c04c19eb4506efa5f6de47561025b3702619f79 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Thu, 19 Mar 2015 22:58:07 -0400 Subject: Reorg filter logic to XEth --- rpc/api.go | 191 +++++++------------------------------------------------- rpc/api_test.go | 64 +++++++++---------- rpc/args.go | 10 +++ 3 files changed, 65 insertions(+), 200 deletions(-) (limited to 'rpc') diff --git a/rpc/api.go b/rpc/api.go index 4f86e703d..85f798c7c 100644 --- a/rpc/api.go +++ b/rpc/api.go @@ -7,7 +7,6 @@ import ( "path" "strings" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -15,15 +14,13 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/xeth" ) var ( - defaultGasPrice = big.NewInt(150000000000) - defaultGas = big.NewInt(500000) - filterTickerTime = 5 * time.Minute + defaultGasPrice = big.NewInt(150000000000) + defaultGas = big.NewInt(500000) ) type EthereumApi struct { @@ -31,17 +28,9 @@ type EthereumApi struct { xethMu sync.RWMutex mux *event.TypeMux - quit chan struct{} - filterManager *filter.FilterManager - - logMut sync.RWMutex - logs map[int]*logFilter - - messagesMut sync.RWMutex - messages map[int]*whisperFilter - // Register keeps a list of accounts and transaction data - regmut sync.Mutex - register map[string][]*NewTxArgs + // // Register keeps a list of accounts and transaction data + // regmut sync.Mutex + // register map[string][]*NewTxArgs db common.Database } @@ -49,16 +38,10 @@ type EthereumApi struct { func NewEthereumApi(eth *xeth.XEth, dataDir string) *EthereumApi { db, _ := ethdb.NewLDBDatabase(path.Join(dataDir, "dapps")) api := &EthereumApi{ - eth: eth, - mux: eth.Backend().EventMux(), - quit: make(chan struct{}), - filterManager: filter.NewFilterManager(eth.Backend().EventMux()), - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - db: db, + eth: eth, + mux: eth.Backend().EventMux(), + db: db, } - go api.filterManager.Start() - go api.start() return api } @@ -85,39 +68,6 @@ func (self *EthereumApi) getStateWithNum(num int64) *xeth.State { return self.xethWithStateNum(num).State() } -func (self *EthereumApi) start() { - timer := time.NewTicker(2 * time.Second) -done: - for { - select { - case <-timer.C: - self.logMut.Lock() - self.messagesMut.Lock() - for id, filter := range self.logs { - if time.Since(filter.timeout) > filterTickerTime { - self.filterManager.UninstallFilter(id) - delete(self.logs, id) - } - } - - for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { - self.xeth().Whisper().Unwatch(id) - delete(self.messages, id) - } - } - self.messagesMut.Unlock() - self.logMut.Unlock() - case <-self.quit: - break done - } - } -} - -func (self *EthereumApi) stop() { - close(self.quit) -} - // func (self *EthereumApi) Register(args string, reply *interface{}) error { // self.regmut.Lock() // defer self.regmut.Unlock() @@ -149,91 +99,43 @@ func (self *EthereumApi) stop() { // } func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error { - var id int - filter := core.NewFilter(self.xeth().Backend()) - filter.SetOptions(toFilterOptions(args)) - filter.LogsCallback = func(logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() - - self.logs[id].add(logs...) - } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} - + opts := toFilterOptions(args) + id := self.xeth().RegisterFilter(opts) *reply = common.ToHex(big.NewInt(int64(id)).Bytes()) return nil } func (self *EthereumApi) UninstallFilter(id int, reply *interface{}) error { - if _, ok := self.logs[id]; ok { - delete(self.logs, id) - } + *reply = self.xeth().UninstallFilter(id) - self.filterManager.UninstallFilter(id) - *reply = true return nil } func (self *EthereumApi) NewFilterString(args *FilterStringArgs, reply *interface{}) error { - var id int - filter := core.NewFilter(self.xeth().Backend()) - - callback := func(block *types.Block, logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() - - for _, log := range logs { - self.logs[id].add(log) - } - self.logs[id].add(&state.StateLog{}) - } - - switch args.Word { - case "pending": - filter.PendingCallback = callback - case "latest": - filter.BlockCallback = callback - default: - return NewValidationError("Word", "Must be `latest` or `pending`") + if err := args.requirements(); err != nil { + return err } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + id := self.xeth().NewFilterString(args.Word) *reply = common.ToHex(big.NewInt(int64(id)).Bytes()) - return nil } func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { - self.logMut.Lock() - defer self.logMut.Unlock() - - if self.logs[id] != nil { - *reply = NewLogsRes(self.logs[id].get()) - } - + *reply = NewLogsRes(self.xeth().FilterChanged(id)) return nil } func (self *EthereumApi) Logs(id int, reply *interface{}) error { - self.logMut.Lock() - defer self.logMut.Unlock() - - filter := self.filterManager.GetFilter(id) - if filter != nil { - *reply = NewLogsRes(filter.Find()) - } + *reply = NewLogsRes(self.xeth().Logs(id)) return nil } func (self *EthereumApi) AllLogs(args *FilterOptions, reply *interface{}) error { - filter := core.NewFilter(self.xeth().Backend()) - filter.SetOptions(toFilterOptions(args)) - - *reply = NewLogsRes(filter.Find()) + opts := toFilterOptions(args) + *reply = NewLogsRes(self.xeth().AllLogs(opts)) return nil } @@ -385,36 +287,22 @@ func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error { // } func (p *EthereumApi) NewWhisperFilter(args *WhisperFilterArgs, reply *interface{}) error { - var id int opts := new(xeth.Options) opts.From = args.From opts.To = args.To opts.Topics = args.Topics - opts.Fn = func(msg xeth.WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) - } - id = p.xeth().Whisper().Watch(opts) - p.messages[id] = &whisperFilter{timeout: time.Now()} + id := p.xeth().NewWhisperFilter(opts) *reply = common.ToHex(big.NewInt(int64(id)).Bytes()) return nil } func (p *EthereumApi) UninstallWhisperFilter(id int, reply *interface{}) error { - delete(p.messages, id) - *reply = true + *reply = p.xeth().UninstallWhisperFilter(id) return nil } func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() - - if self.messages[id] != nil { - *reply = self.messages[id].get() - } - + *reply = self.xeth().MessagesChanged(id) return nil } @@ -835,7 +723,7 @@ func (self *EthereumApi) xeth() *xeth.XEth { return self.eth } -func toFilterOptions(options *FilterOptions) core.FilterOptions { +func toFilterOptions(options *FilterOptions) *core.FilterOptions { var opts core.FilterOptions // Convert optional address slice/string to byte slice @@ -868,38 +756,5 @@ func toFilterOptions(options *FilterOptions) core.FilterOptions { } opts.Topics = topics - return opts -} - -type whisperFilter struct { - messages []xeth.WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []xeth.WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - -type logFilter struct { - logs state.Logs - timeout time.Time - id int -} - -func (l *logFilter) add(logs ...state.Log) { - l.logs = append(l.logs, logs...) -} - -func (l *logFilter) get() state.Logs { - l.timeout = time.Now() - tmp := l.logs - l.logs = nil - return tmp + return &opts } diff --git a/rpc/api_test.go b/rpc/api_test.go index ec03822c5..727ade007 100644 --- a/rpc/api_test.go +++ b/rpc/api_test.go @@ -2,9 +2,9 @@ package rpc import ( "encoding/json" - "sync" + // "sync" "testing" - "time" + // "time" ) func TestWeb3Sha3(t *testing.T) { @@ -24,33 +24,33 @@ func TestWeb3Sha3(t *testing.T) { } } -func TestFilterClose(t *testing.T) { - t.Skip() - api := &EthereumApi{ - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - quit: make(chan struct{}), - } - - filterTickerTime = 1 - api.logs[0] = &logFilter{} - api.messages[0] = &whisperFilter{} - var wg sync.WaitGroup - wg.Add(1) - go api.start() - go func() { - select { - case <-time.After(500 * time.Millisecond): - api.stop() - wg.Done() - } - }() - wg.Wait() - if len(api.logs) != 0 { - t.Error("expected logs to be empty") - } - - if len(api.messages) != 0 { - t.Error("expected messages to be empty") - } -} +// func TestFilterClose(t *testing.T) { +// t.Skip() +// api := &EthereumApi{ +// logs: make(map[int]*logFilter), +// messages: make(map[int]*whisperFilter), +// quit: make(chan struct{}), +// } + +// filterTickerTime = 1 +// api.logs[0] = &logFilter{} +// api.messages[0] = &whisperFilter{} +// var wg sync.WaitGroup +// wg.Add(1) +// go api.start() +// go func() { +// select { +// case <-time.After(500 * time.Millisecond): +// api.stop() +// wg.Done() +// } +// }() +// wg.Wait() +// if len(api.logs) != 0 { +// t.Error("expected logs to be empty") +// } + +// if len(api.messages) != 0 { +// t.Error("expected messages to be empty") +// } +// } diff --git a/rpc/args.go b/rpc/args.go index ab1c40585..bd6be5a0f 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -609,6 +609,16 @@ func (args *FilterStringArgs) UnmarshalJSON(b []byte) (err error) { return nil } +func (args *FilterStringArgs) requirements() error { + switch args.Word { + case "latest", "pending": + break + default: + return NewValidationError("Word", "Must be `latest` or `pending`") + } + return nil +} + type FilterIdArgs struct { Id int } -- cgit v1.2.3