aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters/filter_system.go
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /eth/filters/filter_system.go
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloadgo-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.bz2
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.lz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.xz
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
go-tangerine-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r--eth/filters/filter_system.go98
1 files changed, 72 insertions, 26 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ab0b7473e..00ade0ffb 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -54,6 +54,19 @@ const (
LastIndexSubscription
)
+const (
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
@@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
return
}
- switch e := ev.Data.(type) {
+ switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- case core.PendingLogsEvent:
- for _, f := range filters[PendingLogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
+ case *event.TypeMuxEvent:
+ switch muxe := e.Data.(type) {
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
}
case core.TxPreEvent:
for _, f := range filters[PendingTransactionsSubscription] {
- if ev.Time.After(f.created) {
- f.hashes <- e.Tx.Hash()
- }
+ f.hashes <- e.Tx.Hash()
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
- if ev.Time.After(f.created) {
- f.headers <- e.Block.Header()
- }
+ f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
})
@@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
- sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{})
+ sub = es.mux.Subscribe(core.PendingLogsEvent{})
+ // Subscribe TxPreEvent form txpool
+ txCh = make(chan core.TxPreEvent, txChanSize)
+ txSub = es.backend.SubscribeTxPreEvent(txCh)
+ // Subscribe RemovedLogsEvent
+ rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
+ rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
+ // Subscribe []*types.Log
+ logsCh = make(chan []*types.Log, logsChanSize)
+ logsSub = es.backend.SubscribeLogsEvent(logsCh)
+ // Subscribe ChainEvent
+ chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
+ chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)
+ // Unsubscribe all events
+ defer sub.Unsubscribe()
+ defer txSub.Unsubscribe()
+ defer rmLogsSub.Unsubscribe()
+ defer logsSub.Unsubscribe()
+ defer chainEvSub.Unsubscribe()
+
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
@@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() {
return
}
es.broadcast(index, ev)
+
+ // Handle subscribed events
+ case ev := <-txCh:
+ es.broadcast(index, ev)
+ case ev := <-rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-logsCh:
+ es.broadcast(index, ev)
+ case ev := <-chainEvCh:
+ es.broadcast(index, ev)
+
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
@@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() {
delete(index[f.typ], f.id)
}
close(f.err)
+
+ // System stopped
+ case <-txSub.Err():
+ return
+ case <-rmLogsSub.Err():
+ return
+ case <-logsSub.Err():
+ return
+ case <-chainEvSub.Err():
+ return
}
}
}