aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2016-10-14 11:51:29 +0800
committerFelix Lange <fjl@twurst.com>2016-11-09 09:12:53 +0800
commit9f8d192991c4f68fa14c91366722bbca601da117 (patch)
tree5c1e089673d3f0208cd4a8208623bb95f29622c9 /eth/filters/filter_system.go
parent760fd65487614b7a61443cd9371015925795f40f (diff)
downloaddexon-9f8d192991c4f68fa14c91366722bbca601da117.tar
dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.gz
dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.bz2
dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.lz
dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.xz
dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.zst
dexon-9f8d192991c4f68fa14c91366722bbca601da117.zip
les: light client protocol and API
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go82
1 files changed, 79 insertions, 3 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 04a55fd09..1e330b24f 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
+ "golang.org/x/net/context"
)
// Type determines the kind of filter and is used to put the filter in to
@@ -95,6 +96,9 @@ type subscription struct {
type EventSystem struct {
mux *event.TypeMux
sub event.Subscription
+ backend Backend
+ lightMode bool
+ lastHead *types.Header
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
}
@@ -105,9 +109,11 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
-func NewEventSystem(mux *event.TypeMux) *EventSystem {
+func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
mux: mux,
+ backend: backend,
+ lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
}
@@ -235,7 +241,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func broadcast(filters filterIndex, ev *event.Event) {
+func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
if ev == nil {
return
}
@@ -279,7 +285,77 @@ func broadcast(filters filterIndex, ev *event.Event) {
f.headers <- e.Block.Header()
}
}
+ if es.lightMode && len(filters[LogsSubscription]) > 0 {
+ es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
+ for _, f := range filters[LogsSubscription] {
+ if ev.Time.After(f.created) {
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+ }
+ })
+ }
+ }
+}
+
+func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
+ oldh := es.lastHead
+ es.lastHead = newHeader
+ if oldh == nil {
+ return
+ }
+ newh := newHeader
+ // find common ancestor, create list of rolled back and new block hashes
+ var oldHeaders, newHeaders []*types.Header
+ for oldh.Hash() != newh.Hash() {
+ if oldh.GetNumberU64() >= newh.GetNumberU64() {
+ oldHeaders = append(oldHeaders, oldh)
+ oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
+ }
+ if oldh.GetNumberU64() < newh.GetNumberU64() {
+ newHeaders = append(newHeaders, newh)
+ newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
+ if newh == nil {
+ // happens when CHT syncing, nothing to do
+ newh = oldh
+ }
+ }
+ }
+ // roll back old blocks
+ for _, h := range oldHeaders {
+ callBack(h, true)
+ }
+ // check new blocks (array is in reverse order)
+ for i := len(newHeaders) - 1; i >= 0; i-- {
+ callBack(newHeaders[i], false)
+ }
+}
+
+// filter logs of a single header in light client mode
+func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log {
+ //fmt.Println("lightFilterLogs", header.Number.Uint64(), remove)
+ if bloomFilter(header.Bloom, addresses, topics) {
+ //fmt.Println("bloom match")
+ // Get the logs of the block
+ ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
+ receipts, err := es.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil
+ }
+ var unfiltered []Log
+ for _, receipt := range receipts {
+ rl := make([]Log, len(receipt.Logs))
+ for i, l := range receipt.Logs {
+ rl[i] = Log{l, remove}
+ }
+ unfiltered = append(unfiltered, rl...)
+ }
+ logs := filterLogs(unfiltered, addresses, topics)
+ //fmt.Println("found", len(logs))
+ return logs
}
+ return nil
}
// eventLoop (un)installs filters and processes mux events.
@@ -294,7 +370,7 @@ func (es *EventSystem) eventLoop() {
if !active { // system stopped
return
}
- broadcast(index, ev)
+ es.broadcast(index, ev)
case f := <-es.install:
if _, found := index[f.typ]; !found {
index[f.typ] = make(map[rpc.ID]*subscription)