aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage
diff options
context:
space:
mode:
authorlash <nolash@users.noreply.github.com>2019-02-08 23:57:48 +0800
committerRafael Matias <rafael@skyle.net>2019-02-19 20:09:09 +0800
commit068725c5b09a49b32850a9c10707a86f07fde962 (patch)
tree50e8fa47aa84e442e6116a1e4f4aa460dd3d7c79 /swarm/storage
parent710775f43574ca7bdd039abb7474f34a4e4fe9fd (diff)
downloaddexon-068725c5b09a49b32850a9c10707a86f07fde962.tar
dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.gz
dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.bz2
dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.lz
dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.xz
dexon-068725c5b09a49b32850a9c10707a86f07fde962.tar.zst
dexon-068725c5b09a49b32850a9c10707a86f07fde962.zip
swarm/network, swarm/storage: Preserve opentracing contexts (#19022)
(cherry picked from commit 0c10d376066cb7e57d3bfc03f950c7750cd90640)
Diffstat (limited to 'swarm/storage')
-rw-r--r--swarm/storage/chunker.go16
-rw-r--r--swarm/storage/feed/testutil.go4
-rw-r--r--swarm/storage/netstore.go16
-rw-r--r--swarm/storage/netstore_test.go4
4 files changed, 23 insertions, 17 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index a8bfe2d1c..0fa5026dc 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -465,7 +465,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
length *= r.chunkSize
}
wg.Add(1)
- go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
+ go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
go func() {
wg.Wait()
close(errC)
@@ -485,7 +485,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
return len(b), nil
}
-func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
+func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
defer parentWg.Done()
// find appropriate block level
for chunkData.Size() < uint64(treeSize) && depth > r.depth {
@@ -533,7 +533,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
go func(j int64) {
childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
startTime := time.Now()
- chunkData, err := r.getter.Get(r.ctx, Reference(childAddress))
+ chunkData, err := r.getter.Get(ctx, Reference(childAddress))
if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
@@ -554,7 +554,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
if soff < off {
soff = off
}
- r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
+ r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
}(i)
} //for
}
@@ -581,6 +581,11 @@ var errWhence = errors.New("Seek: invalid whence")
var errOffset = errors.New("Seek: invalid offset")
func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
+ cctx, sp := spancontext.StartSpan(
+ r.ctx,
+ "lcr.seek")
+ defer sp.Finish()
+
log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset)
switch whence {
default:
@@ -590,8 +595,9 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
case 1:
offset += r.off
case 2:
+
if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first
- _, err := r.Size(context.TODO(), nil)
+ _, err := r.Size(cctx, nil)
if err != nil {
return 0, fmt.Errorf("can't get size: %v", err)
}
diff --git a/swarm/storage/feed/testutil.go b/swarm/storage/feed/testutil.go
index b513fa1f2..caa39d9ff 100644
--- a/swarm/storage/feed/testutil.go
+++ b/swarm/storage/feed/testutil.go
@@ -40,9 +40,9 @@ func (t *TestHandler) Close() {
type mockNetFetcher struct{}
-func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
+func (m *mockNetFetcher) Request(hopCount uint8) {
}
-func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
+func (m *mockNetFetcher) Offer(source *enode.ID) {
}
func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher {
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index b24d08bc2..a2595d9fa 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -34,8 +34,8 @@ type (
)
type NetFetcher interface {
- Request(ctx context.Context, hopCount uint8)
- Offer(ctx context.Context, source *enode.ID)
+ Request(hopCount uint8)
+ Offer(source *enode.ID)
}
// NetStore is an extension of local storage
@@ -150,7 +150,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co
}
// The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
// if it doesn't exist yet
- f := n.getOrCreateFetcher(ref)
+ f := n.getOrCreateFetcher(ctx, ref)
// If the caller needs the chunk, it has to use the returned fetch function to get it
return nil, f.Fetch, nil
}
@@ -168,7 +168,7 @@ func (n *NetStore) Has(ctx context.Context, ref Address) bool {
// getOrCreateFetcher attempts at retrieving an existing fetchers
// if none exists, creates one and saves it in the fetchers cache
// caller must hold the lock
-func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
+func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher {
if f := n.getFetcher(ref); f != nil {
return f
}
@@ -176,7 +176,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
// no fetcher for the given address, we have to create a new one
key := hex.EncodeToString(ref)
// create the context during which fetching is kept alive
- ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout)
+ cctx, cancel := context.WithTimeout(ctx, fetcherTimeout)
// destroy is called when all requests finish
destroy := func() {
// remove fetcher from fetchers
@@ -190,7 +190,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
// the peers which requested the chunk should not be requested to deliver it.
peers := &sync.Map{}
- fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC)
+ fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
n.fetchers.Add(key, fetcher)
return fetcher
@@ -278,9 +278,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
return nil, err
}
- f.netFetcher.Offer(rctx, &source)
+ f.netFetcher.Offer(&source)
} else {
- f.netFetcher.Request(rctx, hopCount)
+ f.netFetcher.Request(hopCount)
}
// wait until either the chunk is delivered or the context is done
diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go
index a6a9f551a..88ec6c28f 100644
--- a/swarm/storage/netstore_test.go
+++ b/swarm/storage/netstore_test.go
@@ -46,12 +46,12 @@ type mockNetFetcher struct {
mu sync.Mutex
}
-func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
+func (m *mockNetFetcher) Offer(source *enode.ID) {
m.offerCalled = true
m.sources = append(m.sources, source)
}
-func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
+func (m *mockNetFetcher) Request(hopCount uint8) {
m.mu.Lock()
defer m.mu.Unlock()