From 7e160a677d1590f97708a0d297f978a99977d398 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 6 May 2015 17:51:32 +0200 Subject: xeth, core, event/filter, rpc: new block and transaction filters --- xeth/xeth.go | 237 +++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 166 insertions(+), 71 deletions(-) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index ac59069d5..a0b936b57 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -29,6 +29,14 @@ var ( defaultGas = big.NewInt(90000) //500000 ) +// byte will be inferred +const ( + UnknownFilterTy = iota + BlockFilterTy + TransactionFilterTy + LogFilterTy +) + func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) } func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) } @@ -42,11 +50,17 @@ type XEth struct { quit chan struct{} filterManager *filter.FilterManager - logMut sync.RWMutex - logs map[int]*logFilter + logMu sync.RWMutex + logQueue map[int]*logQueue + + blockMu sync.RWMutex + blockQueue map[int]*hashQueue + + transactionMu sync.RWMutex + transactionQueue map[int]*hashQueue - messagesMut sync.RWMutex - messages map[int]*whisperFilter + messagesMu sync.RWMutex + messages map[int]*whisperFilter // regmut sync.Mutex // register map[string][]*interface{} // TODO improve return type @@ -59,14 +73,16 @@ type XEth struct { // confirms all transactions will be used. func New(eth *eth.Ethereum, frontend Frontend) *XEth { xeth := &XEth{ - backend: eth, - frontend: frontend, - whisper: NewWhisper(eth.Whisper()), - quit: make(chan struct{}), - filterManager: filter.NewFilterManager(eth.EventMux()), - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - agent: miner.NewRemoteAgent(), + backend: eth, + frontend: frontend, + whisper: NewWhisper(eth.Whisper()), + quit: make(chan struct{}), + filterManager: filter.NewFilterManager(eth.EventMux()), + logQueue: make(map[int]*logQueue), + blockQueue: make(map[int]*hashQueue), + transactionQueue: make(map[int]*hashQueue), + messages: make(map[int]*whisperFilter), + agent: miner.NewRemoteAgent(), } eth.Miner().Register(xeth.agent) @@ -87,23 +103,41 @@ done: for { select { case <-timer.C: - self.logMut.Lock() - self.messagesMut.Lock() - for id, filter := range self.logs { + self.logMu.Lock() + for id, filter := range self.logQueue { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.logQueue, id) + } + } + self.logMu.Unlock() + + self.blockMu.Lock() + for id, filter := range self.blockQueue { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.blockQueue, id) + } + } + self.blockMu.Unlock() + + self.transactionMu.Lock() + for id, filter := range self.transactionQueue { if time.Since(filter.timeout) > filterTickerTime { self.filterManager.UninstallFilter(id) - delete(self.logs, id) + delete(self.transactionQueue, id) } } + self.transactionMu.Unlock() + self.messagesMu.Lock() for id, filter := range self.messages { if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } } - self.messagesMut.Unlock() - self.logMut.Unlock() + self.messagesMu.Unlock() case <-self.quit: break done } @@ -360,7 +394,32 @@ func (self *XEth) SecretToAddress(key string) string { return common.ToHex(pair.Address()) } -func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { +func (self *XEth) UninstallFilter(id int) bool { + defer self.filterManager.UninstallFilter(id) + + if _, ok := self.logQueue[id]; ok { + self.logMu.Lock() + defer self.logMu.Unlock() + delete(self.logQueue, id) + return true + } + if _, ok := self.blockQueue[id]; ok { + self.blockMu.Lock() + defer self.blockMu.Unlock() + delete(self.blockQueue, id) + return true + } + if _, ok := self.transactionQueue[id]; ok { + self.transactionMu.Lock() + defer self.transactionMu.Unlock() + delete(self.transactionQueue, id) + return true + } + + return false +} + +func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { var id int filter := core.NewFilter(self.backend) filter.SetEarliestBlock(earliest) @@ -370,71 +429,90 @@ func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address filter.SetAddress(cAddress(address)) filter.SetTopics(cTopics(topics)) filter.LogsCallback = func(logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() + self.logMu.Lock() + defer self.logMu.Unlock() - self.logs[id].add(logs...) + self.logQueue[id].add(logs...) } id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + self.logQueue[id] = &logQueue{timeout: time.Now()} return id } -func (self *XEth) UninstallFilter(id int) bool { - if _, ok := self.logs[id]; ok { - delete(self.logs, id) - self.filterManager.UninstallFilter(id) - return true - } +func (self *XEth) NewTransactionFilter() int { + var id int + filter := core.NewFilter(self.backend) + filter.TransactionCallback = func(tx *types.Transaction) { + self.transactionMu.Lock() + defer self.transactionMu.Unlock() - return false + self.transactionQueue[id].add(tx.Hash()) + } + id = self.filterManager.InstallFilter(filter) + self.transactionQueue[id] = &hashQueue{timeout: time.Now()} + return id } -func (self *XEth) NewFilterString(word string) int { +func (self *XEth) NewBlockFilter() int { var id int filter := core.NewFilter(self.backend) + filter.BlockCallback = func(block *types.Block, logs state.Logs) { + self.blockMu.Lock() + defer self.blockMu.Unlock() - switch word { - case "pending": - filter.PendingCallback = func(tx *types.Transaction) { - self.logMut.Lock() - defer self.logMut.Unlock() - - self.logs[id].add(&state.Log{}) - } - case "latest": - filter.BlockCallback = func(block *types.Block, logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() + self.blockQueue[id].add(block.Hash()) + } + id = self.filterManager.InstallFilter(filter) + self.blockQueue[id] = &hashQueue{timeout: time.Now()} + return id +} - for _, log := range logs { - self.logs[id].add(log) - } - self.logs[id].add(&state.Log{}) - } +func (self *XEth) GetFilterType(id int) byte { + if _, ok := self.blockQueue[id]; ok { + return BlockFilterTy + } else if _, ok := self.transactionQueue[id]; ok { + return TransactionFilterTy + } else if _, ok := self.logQueue[id]; ok { + return LogFilterTy } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + return UnknownFilterTy +} - return id +func (self *XEth) LogFilterChanged(id int) state.Logs { + self.logMu.Lock() + defer self.logMu.Unlock() + + if self.logQueue[id] != nil { + return self.logQueue[id].get() + } + return nil } -func (self *XEth) FilterChanged(id int) state.Logs { - self.logMut.Lock() - defer self.logMut.Unlock() +func (self *XEth) BlockFilterChanged(id int) []common.Hash { + self.blockMu.Lock() + defer self.blockMu.Unlock() - if self.logs[id] != nil { - return self.logs[id].get() + if self.blockQueue[id] != nil { + return self.blockQueue[id].get() } + return nil +} + +func (self *XEth) TransactionFilterChanged(id int) []common.Hash { + self.blockMu.Lock() + defer self.blockMu.Unlock() + if self.blockQueue[id] != nil { + return self.transactionQueue[id].get() + } return nil } func (self *XEth) Logs(id int) state.Logs { - self.logMut.Lock() - defer self.logMut.Unlock() + self.logMu.Lock() + defer self.logMu.Unlock() filter := self.filterManager.GetFilter(id) if filter != nil { @@ -465,24 +543,24 @@ func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { // Callback to delegate core whisper messages to this xeth filter callback := func(msg WhisperMessage) { - p.messagesMut.RLock() // Only read lock to the filter pool - defer p.messagesMut.RUnlock() + p.messagesMu.RLock() // Only read lock to the filter pool + defer p.messagesMu.RUnlock() p.messages[id].insert(msg) } // Initialize the core whisper filter and wrap into xeth id = p.Whisper().Watch(to, from, topics, callback) - p.messagesMut.Lock() + p.messagesMu.Lock() p.messages[id] = newWhisperFilter(id, p.Whisper()) - p.messagesMut.Unlock() + p.messagesMu.Unlock() return id } // UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() + p.messagesMu.Lock() + defer p.messagesMu.Unlock() if _, ok := p.messages[id]; ok { delete(p.messages, id) @@ -493,8 +571,8 @@ func (p *XEth) UninstallWhisperFilter(id int) bool { // WhisperMessages retrieves all the known messages that match a specific filter. func (self *XEth) WhisperMessages(id int) []WhisperMessage { - self.messagesMut.RLock() - defer self.messagesMut.RUnlock() + self.messagesMu.RLock() + defer self.messagesMu.RUnlock() if self.messages[id] != nil { return self.messages[id].messages() @@ -505,8 +583,8 @@ func (self *XEth) WhisperMessages(id int) []WhisperMessage { // WhisperMessagesChanged retrieves all the new messages matched by a filter // since the last retrieval func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { - self.messagesMut.RLock() - defer self.messagesMut.RUnlock() + self.messagesMu.RLock() + defer self.messagesMu.RUnlock() if self.messages[id] != nil { return self.messages[id].retrieve() @@ -767,19 +845,36 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type logFilter struct { +type logQueue struct { logs state.Logs timeout time.Time id int } -func (l *logFilter) add(logs ...*state.Log) { +func (l *logQueue) add(logs ...*state.Log) { l.logs = append(l.logs, logs...) } -func (l *logFilter) get() state.Logs { +func (l *logQueue) get() state.Logs { l.timeout = time.Now() tmp := l.logs l.logs = nil return tmp } + +type hashQueue struct { + hashes []common.Hash + timeout time.Time + id int +} + +func (l *hashQueue) add(hashes ...common.Hash) { + l.hashes = append(l.hashes, hashes...) +} + +func (l *hashQueue) get() []common.Hash { + l.timeout = time.Now() + tmp := l.hashes + l.hashes = nil + return tmp +} -- cgit v1.2.3