aboutsummaryrefslogtreecommitdiffstats
path: root/core/bloombits/matcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/bloombits/matcher.go')
-rw-r--r--core/bloombits/matcher.go67
1 files changed, 24 insertions, 43 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index 32a660337..a75f8c085 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -57,12 +57,16 @@ type partialMatches struct {
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
+//
+// The contest and error fields are used by the light client to terminate matching
+// early if an error is enountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
- Error error
- Context context.Context
+
+ Context context.Context
+ Error error
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
@@ -506,54 +510,31 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher
- quit chan struct{} // Quit channel to request pipeline termination
- kill chan struct{} // Term channel to signal non-graceful forced shutdown
- ctx context.Context
- err error
- stopping bool
- lock sync.Mutex
- pend sync.WaitGroup
+ closer sync.Once // Sync object to ensure we only ever close once
+ quit chan struct{} // Quit channel to request pipeline termination
+ kill chan struct{} // Term channel to signal non-graceful forced shutdown
+
+ ctx context.Context // Context used by the light client to abort filtering
+ err atomic.Value // Global error to track retrieval failures deep in the chain
+
+ pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
- s.lock.Lock()
- stopping := s.stopping
- s.stopping = true
- s.lock.Unlock()
- // ensure that we only close the session once
- if stopping {
- return
- }
-
- // Bail out if the matcher is not running
- select {
- case <-s.quit:
- return
- default:
- }
- // Signal termination and wait for all goroutines to tear down
- close(s.quit)
- time.AfterFunc(time.Second, func() { close(s.kill) })
- s.pend.Wait()
+ s.closer.Do(func() {
+ // Signal termination and wait for all goroutines to tear down
+ close(s.quit)
+ time.AfterFunc(time.Second, func() { close(s.kill) })
+ s.pend.Wait()
+ })
}
-// setError sets an error and stops the session
-func (s *MatcherSession) setError(err error) {
- s.lock.Lock()
- s.err = err
- s.lock.Unlock()
- s.Close()
-}
-
-// Error returns an error if one has happened during the session
+// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- return s.err
+ return s.err.Load().(error)
}
// AllocateRetrieval assigns a bloom bit index to a client process that can either
@@ -655,9 +636,9 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
result := <-request
if result.Error != nil {
- s.setError(result.Error)
+ s.err.Store(result.Error)
+ s.Close()
}
-
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}