aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <geffobscura@gmail.com>2016-01-05 21:55:28 +0800
committerJeffrey Wilcke <geffobscura@gmail.com>2016-02-05 23:55:27 +0800
commit68dda3490585dd789fced5418507f0fda82bbf19 (patch)
treefe169836f28fe6b3b74018a9802223c1c2444299
parenta50bccc642d079899feb8cf5781331bb10174a77 (diff)
downloaddexon-68dda3490585dd789fced5418507f0fda82bbf19.tar
dexon-68dda3490585dd789fced5418507f0fda82bbf19.tar.gz
dexon-68dda3490585dd789fced5418507f0fda82bbf19.tar.bz2
dexon-68dda3490585dd789fced5418507f0fda82bbf19.tar.lz
dexon-68dda3490585dd789fced5418507f0fda82bbf19.tar.xz
dexon-68dda3490585dd789fced5418507f0fda82bbf19.tar.zst
dexon-68dda3490585dd789fced5418507f0fda82bbf19.zip
eth/filters: added notifications for out of bound log events
Out of Bound log events are events that were removed due to a fork. When logs are received the filtering mechanism should check for the `removed` field on the json structure.
-rw-r--r--eth/filters/api.go46
-rw-r--r--eth/filters/filter.go2
-rw-r--r--eth/filters/filter_system.go23
-rw-r--r--eth/filters/filter_system_test.go87
4 files changed, 132 insertions, 26 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index aa4c305a6..148daa649 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -206,12 +206,12 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
filter.SetEndBlock(latest)
filter.SetAddresses(addresses)
filter.SetTopics(topics)
- filter.LogsCallback = func(logs vm.Logs) {
+ filter.LogCallback = func(log *vm.Log, removed bool) {
s.logMu.Lock()
defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
- queue.add(logs...)
+ queue.add(vmlog{log, removed})
}
}
@@ -365,14 +365,14 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
}
// GetLogs returns the logs matching the given argument.
-func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) vm.Logs {
+func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
filter := New(s.chainDb)
filter.SetBeginBlock(args.FromBlock.Int64())
filter.SetEndBlock(args.ToBlock.Int64())
filter.SetAddresses(args.Addresses)
filter.SetTopics(args.Topics)
- return returnLogs(filter.Find())
+ return toRPCLogs(filter.Find(), false)
}
// UninstallFilter removes the filter with the given filter id.
@@ -447,7 +447,7 @@ func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
}
// logFilterChanged returns a collection of logs for the log filter with the given id.
-func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs {
+func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
s.logMu.Lock()
defer s.logMu.Unlock()
@@ -458,17 +458,17 @@ func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs {
}
// GetFilterLogs returns the logs for the filter with the given id.
-func (s *PublicFilterAPI) GetFilterLogs(filterId string) vm.Logs {
+func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
id, ok := s.filterMapping[filterId]
if !ok {
- return returnLogs(nil)
+ return toRPCLogs(nil, false)
}
if filter := s.filterManager.Get(id); filter != nil {
- return returnLogs(filter.Find())
+ return toRPCLogs(filter.Find(), false)
}
- return returnLogs(nil)
+ return toRPCLogs(nil, false)
}
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
@@ -488,28 +488,33 @@ func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id))
case logFilterTy:
- return returnLogs(s.logFilterChanged(id))
+ return s.logFilterChanged(id)
}
return []interface{}{}
}
+type vmlog struct {
+ *vm.Log
+ Removed bool `json:"removed"`
+}
+
type logQueue struct {
mu sync.Mutex
- logs vm.Logs
+ logs []vmlog
timeout time.Time
id int
}
-func (l *logQueue) add(logs ...*vm.Log) {
+func (l *logQueue) add(logs ...vmlog) {
l.mu.Lock()
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
}
-func (l *logQueue) get() vm.Logs {
+func (l *logQueue) get() []vmlog {
l.mu.Lock()
defer l.mu.Unlock()
@@ -556,13 +561,16 @@ func newFilterId() (string, error) {
return "0x" + hex.EncodeToString(subid[:]), nil
}
-// returnLogs is a helper that will return an empty logs array case the given logs is nil, otherwise is will return the
-// given logs. The RPC interfaces defines that always an array is returned.
-func returnLogs(logs vm.Logs) vm.Logs {
- if logs == nil {
- return vm.Logs{}
+// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
+// can hold additional information about the logs such as whether it was deleted.
+// Additionally when nil is given it will by default instead create an empty slice
+// instead. This is required by the RPC specification.
+func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
+ convertedLogs := make([]vmlog, len(logs))
+ for i, log := range logs {
+ convertedLogs[i] = vmlog{Log: log, Removed: removed}
}
- return logs
+ return convertedLogs
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index ff192cdf6..2c92d20b1 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -39,7 +39,7 @@ type Filter struct {
BlockCallback func(*types.Block, vm.Logs)
TransactionCallback func(*types.Transaction)
- LogsCallback func(vm.Logs)
+ LogCallback func(*vm.Log, bool)
}
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index df3ce90c6..04e58a08c 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -46,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
}
fs.sub = mux.Subscribe(
//core.PendingBlockEvent{},
+ core.RemovedLogEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
@@ -96,7 +97,7 @@ func (fs *FilterSystem) filterLoop() {
case core.ChainEvent:
fs.filterMu.RLock()
for id, filter := range fs.filters {
- if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
+ if filter.BlockCallback != nil && !fs.created[id].After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
@@ -105,7 +106,7 @@ func (fs *FilterSystem) filterLoop() {
case core.TxPreEvent:
fs.filterMu.RLock()
for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
+ if filter.TransactionCallback != nil && !fs.created[id].After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
@@ -114,10 +115,20 @@ func (fs *FilterSystem) filterLoop() {
case vm.Logs:
fs.filterMu.RLock()
for id, filter := range fs.filters {
- if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
- msgs := filter.FilterLogs(ev)
- if len(msgs) > 0 {
- filter.LogsCallback(msgs)
+ if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, log := range filter.FilterLogs(ev) {
+ filter.LogCallback(log, false)
+ }
+ }
+ }
+ fs.filterMu.RUnlock()
+
+ case core.RemovedLogEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.LogCallback != nil && !fs.created[id].After(event.Time) {
+ for _, removedLog := range ev.Logs {
+ filter.LogCallback(removedLog, true)
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
new file mode 100644
index 000000000..7ddeb02bc
--- /dev/null
+++ b/eth/filters/filter_system_test.go
@@ -0,0 +1,87 @@
+package filters
+
+import (
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/event"
+)
+
+func TestCallbacks(t *testing.T) {
+ var (
+ mux event.TypeMux
+ fs = NewFilterSystem(&mux)
+ blockDone = make(chan struct{})
+ txDone = make(chan struct{})
+ logDone = make(chan struct{})
+ removedLogDone = make(chan struct{})
+ )
+
+ blockFilter := &Filter{
+ BlockCallback: func(*types.Block, vm.Logs) {
+ close(blockDone)
+ },
+ }
+ txFilter := &Filter{
+ TransactionCallback: func(*types.Transaction) {
+ close(txDone)
+ },
+ }
+ logFilter := &Filter{
+ LogCallback: func(l *vm.Log, oob bool) {
+ if !oob {
+ close(logDone)
+ }
+ },
+ }
+
+ removedLogFilter := &Filter{
+ LogCallback: func(l *vm.Log, oob bool) {
+ if oob {
+ close(removedLogDone)
+ }
+ },
+ }
+
+ fs.Add(blockFilter)
+ fs.Add(txFilter)
+ fs.Add(logFilter)
+ fs.Add(removedLogFilter)
+
+ mux.Post(core.ChainEvent{})
+ mux.Post(core.TxPreEvent{})
+ mux.Post(core.RemovedLogEvent{vm.Logs{&vm.Log{}}})
+ mux.Post(vm.Logs{&vm.Log{}})
+
+ const dura = 5 * time.Second
+ failTimer := time.NewTimer(dura)
+ select {
+ case <-blockDone:
+ case <-failTimer.C:
+ t.Error("block filter failed to trigger (timeout)")
+ }
+
+ failTimer.Reset(dura)
+ select {
+ case <-txDone:
+ case <-failTimer.C:
+ t.Error("transaction filter failed to trigger (timeout)")
+ }
+
+ failTimer.Reset(dura)
+ select {
+ case <-logDone:
+ case <-failTimer.C:
+ t.Error("log filter failed to trigger (timeout)")
+ }
+
+ failTimer.Reset(dura)
+ select {
+ case <-removedLogDone:
+ case <-failTimer.C:
+ t.Error("removed log filter failed to trigger (timeout)")
+ }
+}