diff options
author | lash <nolash@users.noreply.github.com> | 2019-02-08 23:57:48 +0800 |
---|---|---|
committer | Rafael Matias <rafael@skyle.net> | 2019-02-19 20:09:09 +0800 |
commit | 068725c5b09a49b32850a9c10707a86f07fde962 (patch) | |
tree | 50e8fa47aa84e442e6116a1e4f4aa460dd3d7c79 /swarm/storage | |
parent | 710775f43574ca7bdd039abb7474f34a4e4fe9fd (diff) | |
download | dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.gz dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.bz2 dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.lz dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.xz dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.zst dexon-068725c5b09a49b32850a9c10707a86f07fde962.zip |
swarm/network, swarm/storage: Preserve opentracing contexts (#19022)
(cherry picked from commit 0c10d376066cb7e57d3bfc03f950c7750cd90640)
Diffstat (limited to 'swarm/storage')
-rw-r--r-- | swarm/storage/chunker.go | 16 | ||||
-rw-r--r-- | swarm/storage/feed/testutil.go | 4 | ||||
-rw-r--r-- | swarm/storage/netstore.go | 16 | ||||
-rw-r--r-- | swarm/storage/netstore_test.go | 4 |
4 files changed, 23 insertions, 17 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index a8bfe2d1c..0fa5026dc 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -465,7 +465,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= r.chunkSize } wg.Add(1) - go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) + go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -485,7 +485,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return len(b), nil } -func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level for chunkData.Size() < uint64(treeSize) && depth > r.depth { @@ -533,7 +533,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS go func(j int64) { childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] startTime := time.Now() - chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) + chunkData, err := r.getter.Get(ctx, Reference(childAddress)) if err != nil { metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) @@ -554,7 +554,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS if soff < off { soff = off } - r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) + r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } @@ -581,6 +581,11 @@ var errWhence = errors.New("Seek: invalid whence") var errOffset = errors.New("Seek: invalid offset") func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { + cctx, sp := spancontext.StartSpan( + r.ctx, + "lcr.seek") + defer sp.Finish() + log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset) switch whence { default: @@ -590,8 +595,9 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { case 1: offset += r.off case 2: + if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first - _, err := r.Size(context.TODO(), nil) + _, err := r.Size(cctx, nil) if err != nil { return 0, fmt.Errorf("can't get size: %v", err) } diff --git a/swarm/storage/feed/testutil.go b/swarm/storage/feed/testutil.go index b513fa1f2..caa39d9ff 100644 --- a/swarm/storage/feed/testutil.go +++ b/swarm/storage/feed/testutil.go @@ -40,9 +40,9 @@ func (t *TestHandler) Close() { type mockNetFetcher struct{} -func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) { +func (m *mockNetFetcher) Request(hopCount uint8) { } -func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) { +func (m *mockNetFetcher) Offer(source *enode.ID) { } func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher { diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index b24d08bc2..a2595d9fa 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -34,8 +34,8 @@ type ( ) type NetFetcher interface { - Request(ctx context.Context, hopCount uint8) - Offer(ctx context.Context, source *enode.ID) + Request(hopCount uint8) + Offer(source *enode.ID) } // NetStore is an extension of local storage @@ -150,7 +150,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co } // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one // if it doesn't exist yet - f := n.getOrCreateFetcher(ref) + f := n.getOrCreateFetcher(ctx, ref) // If the caller needs the chunk, it has to use the returned fetch function to get it return nil, f.Fetch, nil } @@ -168,7 +168,7 @@ func (n *NetStore) Has(ctx context.Context, ref Address) bool { // getOrCreateFetcher attempts at retrieving an existing fetchers // if none exists, creates one and saves it in the fetchers cache // caller must hold the lock -func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { +func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher { if f := n.getFetcher(ref); f != nil { return f } @@ -176,7 +176,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { // no fetcher for the given address, we have to create a new one key := hex.EncodeToString(ref) // create the context during which fetching is kept alive - ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout) + cctx, cancel := context.WithTimeout(ctx, fetcherTimeout) // destroy is called when all requests finish destroy := func() { // remove fetcher from fetchers @@ -190,7 +190,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { // the peers which requested the chunk should not be requested to deliver it. peers := &sync.Map{} - fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC) + fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC) n.fetchers.Add(key, fetcher) return fetcher @@ -278,9 +278,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil { return nil, err } - f.netFetcher.Offer(rctx, &source) + f.netFetcher.Offer(&source) } else { - f.netFetcher.Request(rctx, hopCount) + f.netFetcher.Request(hopCount) } // wait until either the chunk is delivered or the context is done diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index a6a9f551a..88ec6c28f 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -46,12 +46,12 @@ type mockNetFetcher struct { mu sync.Mutex } -func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) { +func (m *mockNetFetcher) Offer(source *enode.ID) { m.offerCalled = true m.sources = append(m.sources, source) } -func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) { +func (m *mockNetFetcher) Request(hopCount uint8) { m.mu.Lock() defer m.mu.Unlock() |