diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-10-24 21:19:09 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-10-24 21:19:09 +0800 |
commit | ca376ead88a5a26626a90abdb62f4de7f6313822 (patch) | |
tree | 71d11e3b6cd40d2bf29033b7e23d30d04e086558 /core/bloombits | |
parent | 6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff) | |
download | dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2 dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.zip |
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions:
* new and more efficient Merkle proofs reply format (when replying to
a multiple Merkle proofs request, we just send a single set of trie
nodes containing all necessary nodes)
* BBT (BloomBitsTrie) works similarly to the existing CHT and contains
the bloombits search data to speed up log searches
* GetTxStatusMsg returns the inclusion position or the
pending/queued/unknown state of a transaction referenced by hash
* an optional signature of new block data (number/hash/td) can be
included in AnnounceMsg to provide an option for "very light
clients" (mobile/embedded devices) to skip expensive Ethash check
and accept multiple signatures of somewhat trusted servers (still a
lot better than trusting a single server completely and retrieving
everything through RPC). The new client mode is not implemented in
this PR, just the protocol extension.
Diffstat (limited to 'core/bloombits')
-rw-r--r-- | core/bloombits/matcher.go | 51 | ||||
-rw-r--r-- | core/bloombits/matcher_test.go | 9 |
2 files changed, 49 insertions, 11 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index e33de018a..32a660337 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -18,6 +18,7 @@ package bloombits import ( "bytes" + "context" "errors" "math" "sort" @@ -60,6 +61,8 @@ type Retrieval struct { Bit uint Sections []uint64 Bitsets [][]byte + Error error + Context context.Context } // Matcher is a pipelined system of schedulers and logic matchers which perform @@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) { // Start starts the matching process and returns a stream of bloom matches in // a given range of blocks. If there are no more matches in the range, the result // channel is closed. -func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) { +func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { // Make sure we're not creating concurrent sessions if atomic.SwapUint32(&m.running, 1) == 1 { return nil, errors.New("matcher already running") @@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession matcher: m, quit: make(chan struct{}), kill: make(chan struct{}), + ctx: ctx, } for _, scheduler := range m.schedulers { scheduler.reset() @@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { type MatcherSession struct { matcher *Matcher - quit chan struct{} // Quit channel to request pipeline termination - kill chan struct{} // Term channel to signal non-graceful forced shutdown - pend sync.WaitGroup + quit chan struct{} // Quit channel to request pipeline termination + kill chan struct{} // Term channel to signal non-graceful forced shutdown + ctx context.Context + err error + stopping bool + lock sync.Mutex + pend sync.WaitGroup } // Close stops the matching process and waits for all subprocesses to terminate // before returning. The timeout may be used for graceful shutdown, allowing the // currently running retrievals to complete before this time. -func (s *MatcherSession) Close(timeout time.Duration) { +func (s *MatcherSession) Close() { + s.lock.Lock() + stopping := s.stopping + s.stopping = true + s.lock.Unlock() + // ensure that we only close the session once + if stopping { + return + } + // Bail out if the matcher is not running select { case <-s.quit: @@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) { } // Signal termination and wait for all goroutines to tear down close(s.quit) - time.AfterFunc(timeout, func() { close(s.kill) }) + time.AfterFunc(time.Second, func() { close(s.kill) }) s.pend.Wait() } +// setError sets an error and stops the session +func (s *MatcherSession) setError(err error) { + s.lock.Lock() + s.err = err + s.lock.Unlock() + s.Close() +} + +// Error returns an error if one has happened during the session +func (s *MatcherSession) Error() error { + s.lock.Lock() + defer s.lock.Unlock() + + return s.err +} + // AllocateRetrieval assigns a bloom bit index to a client process that can either // immediately reuest and fetch the section contents assigned to this bit or wait // a little while for more sections to be requested. @@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan case mux <- request: // Retrieval accepted, something must arrive before we're aborting - request <- &Retrieval{Bit: bit, Sections: sections} + request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx} result := <-request + if result.Error != nil { + s.setError(result.Error) + } + s.DeliverSections(result.Bit, result.Sections, result.Bitsets) } } diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go index 2e15e7aac..0d8544136 100644 --- a/core/bloombits/matcher_test.go +++ b/core/bloombits/matcher_test.go @@ -17,6 +17,7 @@ package bloombits import ( + "context" "math/rand" "sync/atomic" "testing" @@ -144,7 +145,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt quit := make(chan struct{}) matches := make(chan uint64, 16) - session, err := matcher.Start(0, blocks-1, matches) + session, err := matcher.Start(context.Background(), 0, blocks-1, matches) if err != nil { t.Fatalf("failed to stat matcher session: %v", err) } @@ -163,13 +164,13 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt } // If we're testing intermittent mode, abort and restart the pipeline if intermittent { - session.Close(time.Second) + session.Close() close(quit) quit = make(chan struct{}) matches = make(chan uint64, 16) - session, err = matcher.Start(i+1, blocks-1, matches) + session, err = matcher.Start(context.Background(), i+1, blocks-1, matches) if err != nil { t.Fatalf("failed to stat matcher session: %v", err) } @@ -183,7 +184,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match) } // Clean up the session and ensure we match the expected retrieval count - session.Close(time.Second) + session.Close() close(quit) if retrievals != 0 && requested != retrievals { |