diff options
author | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2016-10-14 11:51:29 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2016-11-09 09:12:53 +0800 |
commit | 9f8d192991c4f68fa14c91366722bbca601da117 (patch) | |
tree | 5c1e089673d3f0208cd4a8208623bb95f29622c9 /eth/filters | |
parent | 760fd65487614b7a61443cd9371015925795f40f (diff) | |
download | dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.gz dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.bz2 dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.lz dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.xz dexon-9f8d192991c4f68fa14c91366722bbca601da117.tar.zst dexon-9f8d192991c4f68fa14c91366722bbca601da117.zip |
les: light client protocol and API
Diffstat (limited to 'eth/filters')
-rw-r--r-- | eth/filters/api.go | 12 | ||||
-rw-r--r-- | eth/filters/filter.go | 14 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 82 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 7 |
4 files changed, 99 insertions, 16 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go index 3bc220348..fa4bef283 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -61,12 +61,14 @@ type PublicFilterAPI struct { } // NewPublicFilterAPI returns a new PublicFilterAPI instance. -func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI { +func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ - mux: mux, - chainDb: chainDb, - events: NewEventSystem(mux), - filters: make(map[rpc.ID]*filter), + backend: backend, + useMipMap: !lightMode, + mux: backend.EventMux(), + chainDb: backend.ChainDb(), + events: NewEventSystem(backend.EventMux(), backend, lightMode), + filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 4226620dc..d181d0892 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -207,11 +207,15 @@ Logs: return ret } -func (f *Filter) bloomFilter(block *types.Block) bool { - if len(f.addresses) > 0 { +func (f *Filter) bloomFilter(bloom types.Bloom) bool { + return bloomFilter(bloom, f.addresses, f.topics) +} + +func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool { + if len(addresses) > 0 { var included bool - for _, addr := range f.addresses { - if types.BloomLookup(block.Bloom(), addr) { + for _, addr := range addresses { + if types.BloomLookup(bloom, addr) { included = true break } @@ -222,7 +226,7 @@ func (f *Filter) bloomFilter(block *types.Block) bool { } } - for _, sub := range f.topics { + for _, sub := range topics { var included bool for _, topic := range sub { if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 04a55fd09..1e330b24f 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) // Type determines the kind of filter and is used to put the filter in to @@ -95,6 +96,9 @@ type subscription struct { type EventSystem struct { mux *event.TypeMux sub event.Subscription + backend Backend + lightMode bool + lastHead *types.Header install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification } @@ -105,9 +109,11 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(mux *event.TypeMux) *EventSystem { +func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ mux: mux, + backend: backend, + lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), } @@ -235,7 +241,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func broadcast(filters filterIndex, ev *event.Event) { +func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if ev == nil { return } @@ -279,7 +285,77 @@ func broadcast(filters filterIndex, ev *event.Event) { f.headers <- e.Block.Header() } } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + }) + } + } +} + +func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { + oldh := es.lastHead + es.lastHead = newHeader + if oldh == nil { + return + } + newh := newHeader + // find common ancestor, create list of rolled back and new block hashes + var oldHeaders, newHeaders []*types.Header + for oldh.Hash() != newh.Hash() { + if oldh.GetNumberU64() >= newh.GetNumberU64() { + oldHeaders = append(oldHeaders, oldh) + oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) + } + if oldh.GetNumberU64() < newh.GetNumberU64() { + newHeaders = append(newHeaders, newh) + newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) + if newh == nil { + // happens when CHT syncing, nothing to do + newh = oldh + } + } + } + // roll back old blocks + for _, h := range oldHeaders { + callBack(h, true) + } + // check new blocks (array is in reverse order) + for i := len(newHeaders) - 1; i >= 0; i-- { + callBack(newHeaders[i], false) + } +} + +// filter logs of a single header in light client mode +func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log { + //fmt.Println("lightFilterLogs", header.Number.Uint64(), remove) + if bloomFilter(header.Bloom, addresses, topics) { + //fmt.Println("bloom match") + // Get the logs of the block + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + receipts, err := es.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil + } + var unfiltered []Log + for _, receipt := range receipts { + rl := make([]Log, len(receipt.Logs)) + for i, l := range receipt.Logs { + rl[i] = Log{l, remove} + } + unfiltered = append(unfiltered, rl...) + } + logs := filterLogs(unfiltered, addresses, topics) + //fmt.Println("found", len(logs)) + return logs } + return nil } // eventLoop (un)installs filters and processes mux events. @@ -294,7 +370,7 @@ func (es *EventSystem) eventLoop() { if !active { // system stopped return } - broadcast(index, ev) + es.broadcast(index, ev) case f := <-es.install: if _, found := index[f.typ]; !found { index[f.typ] = make(map[rpc.ID]*subscription) diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 9e6fde1c6..1bd4d502d 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -32,9 +32,10 @@ import ( ) var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - api = NewPublicFilterAPI(db, mux) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) ) // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. |