diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-10-25 17:18:44 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-25 17:18:44 +0800 |
commit | 0095531a58772b1f5bd1547169790dbde84ec78a (patch) | |
tree | 91b9e56dbcb9afe4058c0f41f33ebc7812201abc /core/bloombits | |
parent | ca376ead88a5a26626a90abdb62f4de7f6313822 (diff) | |
download | go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.gz go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.bz2 go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.lz go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.xz go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.zst go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.zip |
core, eth, les: fix messy code (#15367)
* core, eth, les: fix messy code
* les: fixed tx status test and rlp encoding
* core: add a workaround for light sync
Diffstat (limited to 'core/bloombits')
-rw-r--r-- | core/bloombits/matcher.go | 67 |
1 files changed, 24 insertions, 43 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 32a660337..a75f8c085 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -57,12 +57,16 @@ type partialMatches struct { // Retrieval represents a request for retrieval task assignments for a given // bit with the given number of fetch elements, or a response for such a request. // It can also have the actual results set to be used as a delivery data struct. +// +// The contest and error fields are used by the light client to terminate matching +// early if an error is enountered on some path of the pipeline. type Retrieval struct { Bit uint Sections []uint64 Bitsets [][]byte - Error error - Context context.Context + + Context context.Context + Error error } // Matcher is a pipelined system of schedulers and logic matchers which perform @@ -506,54 +510,31 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { type MatcherSession struct { matcher *Matcher - quit chan struct{} // Quit channel to request pipeline termination - kill chan struct{} // Term channel to signal non-graceful forced shutdown - ctx context.Context - err error - stopping bool - lock sync.Mutex - pend sync.WaitGroup + closer sync.Once // Sync object to ensure we only ever close once + quit chan struct{} // Quit channel to request pipeline termination + kill chan struct{} // Term channel to signal non-graceful forced shutdown + + ctx context.Context // Context used by the light client to abort filtering + err atomic.Value // Global error to track retrieval failures deep in the chain + + pend sync.WaitGroup } // Close stops the matching process and waits for all subprocesses to terminate // before returning. The timeout may be used for graceful shutdown, allowing the // currently running retrievals to complete before this time. func (s *MatcherSession) Close() { - s.lock.Lock() - stopping := s.stopping - s.stopping = true - s.lock.Unlock() - // ensure that we only close the session once - if stopping { - return - } - - // Bail out if the matcher is not running - select { - case <-s.quit: - return - default: - } - // Signal termination and wait for all goroutines to tear down - close(s.quit) - time.AfterFunc(time.Second, func() { close(s.kill) }) - s.pend.Wait() + s.closer.Do(func() { + // Signal termination and wait for all goroutines to tear down + close(s.quit) + time.AfterFunc(time.Second, func() { close(s.kill) }) + s.pend.Wait() + }) } -// setError sets an error and stops the session -func (s *MatcherSession) setError(err error) { - s.lock.Lock() - s.err = err - s.lock.Unlock() - s.Close() -} - -// Error returns an error if one has happened during the session +// Error returns any failure encountered during the matching session. func (s *MatcherSession) Error() error { - s.lock.Lock() - defer s.lock.Unlock() - - return s.err + return s.err.Load().(error) } // AllocateRetrieval assigns a bloom bit index to a client process that can either @@ -655,9 +636,9 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan result := <-request if result.Error != nil { - s.setError(result.Error) + s.err.Store(result.Error) + s.Close() } - s.DeliverSections(result.Bit, result.Sections, result.Bitsets) } } |