aboutsummaryrefslogtreecommitdiffstats
path: root/core/bloombits
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-10-24 21:19:09 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-10-24 21:19:09 +0800
commitca376ead88a5a26626a90abdb62f4de7f6313822 (patch)
tree71d11e3b6cd40d2bf29033b7e23d30d04e086558 /core/bloombits
parent6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff)
downloadgo-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst
go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.zip
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions: * new and more efficient Merkle proofs reply format (when replying to a multiple Merkle proofs request, we just send a single set of trie nodes containing all necessary nodes) * BBT (BloomBitsTrie) works similarly to the existing CHT and contains the bloombits search data to speed up log searches * GetTxStatusMsg returns the inclusion position or the pending/queued/unknown state of a transaction referenced by hash * an optional signature of new block data (number/hash/td) can be included in AnnounceMsg to provide an option for "very light clients" (mobile/embedded devices) to skip expensive Ethash check and accept multiple signatures of somewhat trusted servers (still a lot better than trusting a single server completely and retrieving everything through RPC). The new client mode is not implemented in this PR, just the protocol extension.
Diffstat (limited to 'core/bloombits')
-rw-r--r--core/bloombits/matcher.go51
-rw-r--r--core/bloombits/matcher_test.go9
2 files changed, 49 insertions, 11 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index e33de018a..32a660337 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -18,6 +18,7 @@ package bloombits
import (
"bytes"
+ "context"
"errors"
"math"
"sort"
@@ -60,6 +61,8 @@ type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
+ Error error
+ Context context.Context
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
@@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) {
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
-func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
+func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
@@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
+ ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
@@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher
- quit chan struct{} // Quit channel to request pipeline termination
- kill chan struct{} // Term channel to signal non-graceful forced shutdown
- pend sync.WaitGroup
+ quit chan struct{} // Quit channel to request pipeline termination
+ kill chan struct{} // Term channel to signal non-graceful forced shutdown
+ ctx context.Context
+ err error
+ stopping bool
+ lock sync.Mutex
+ pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
-func (s *MatcherSession) Close(timeout time.Duration) {
+func (s *MatcherSession) Close() {
+ s.lock.Lock()
+ stopping := s.stopping
+ s.stopping = true
+ s.lock.Unlock()
+ // ensure that we only close the session once
+ if stopping {
+ return
+ }
+
// Bail out if the matcher is not running
select {
case <-s.quit:
@@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) {
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
- time.AfterFunc(timeout, func() { close(s.kill) })
+ time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
}
+// setError sets an error and stops the session
+func (s *MatcherSession) setError(err error) {
+ s.lock.Lock()
+ s.err = err
+ s.lock.Unlock()
+ s.Close()
+}
+
+// Error returns an error if one has happened during the session
+func (s *MatcherSession) Error() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ return s.err
+}
+
// AllocateRetrieval assigns a bloom bit index to a client process that can either
// immediately reuest and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
@@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
- request <- &Retrieval{Bit: bit, Sections: sections}
+ request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
+ if result.Error != nil {
+ s.setError(result.Error)
+ }
+
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
index 2e15e7aac..0d8544136 100644
--- a/core/bloombits/matcher_test.go
+++ b/core/bloombits/matcher_test.go
@@ -17,6 +17,7 @@
package bloombits
import (
+ "context"
"math/rand"
"sync/atomic"
"testing"
@@ -144,7 +145,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
quit := make(chan struct{})
matches := make(chan uint64, 16)
- session, err := matcher.Start(0, blocks-1, matches)
+ session, err := matcher.Start(context.Background(), 0, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
@@ -163,13 +164,13 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
}
// If we're testing intermittent mode, abort and restart the pipeline
if intermittent {
- session.Close(time.Second)
+ session.Close()
close(quit)
quit = make(chan struct{})
matches = make(chan uint64, 16)
- session, err = matcher.Start(i+1, blocks-1, matches)
+ session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
@@ -183,7 +184,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
// Clean up the session and ensure we match the expected retrieval count
- session.Close(time.Second)
+ session.Close()
close(quit)
if retrievals != 0 && requested != retrievals {