diff options
Diffstat (limited to 'swarm')
45 files changed, 617 insertions, 234 deletions
diff --git a/swarm/api/api.go b/swarm/api/api.go index ff29877ff..74af669c9 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -37,8 +37,10 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/multihash" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/mru" + opentracing "github.com/opentracing/opentracing-go" ) var ( @@ -263,6 +265,12 @@ func (a *API) Resolve(ctx context.Context, uri *URI) (storage.Address, error) { apiResolveCount.Inc(1) log.Trace("resolving", "uri", uri.Addr) + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "api.resolve") + defer sp.Finish() + // if the URI is immutable, check if the address looks like a hash if uri.Immutable() { key := uri.Address() @@ -347,7 +355,7 @@ func (a *API) Get(ctx context.Context, manifestAddr storage.Address, path string log.Trace("resource type", "key", manifestAddr, "hash", entry.Hash) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rsrc, err := a.resource.Load(storage.Address(common.FromHex(entry.Hash))) + rsrc, err := a.resource.Load(ctx, storage.Address(common.FromHex(entry.Hash))) if err != nil { apiGetNotFound.Inc(1) status = http.StatusNotFound @@ -486,7 +494,7 @@ func (a *API) GetDirectoryTar(ctx context.Context, uri *URI) (io.ReadCloser, err // retrieve the entry's key and size reader, _ := a.Retrieve(ctx, storage.Address(common.Hex2Bytes(entry.Hash))) - size, err := reader.Size(nil) + size, err := reader.Size(ctx, nil) if err != nil { return err } @@ -883,7 +891,7 @@ func (a *API) BuildDirectoryTree(ctx context.Context, mhash string, nameresolver // ResourceLookup Looks up mutable resource updates at specific periods and versions func (a *API) ResourceLookup(ctx context.Context, addr storage.Address, period uint32, version uint32, maxLookup *mru.LookupParams) (string, []byte, error) { var err error - rsrc, err := a.resource.Load(addr) + rsrc, err := a.resource.Load(ctx, addr) if err != nil { return "", nil, err } diff --git a/swarm/api/api_test.go b/swarm/api/api_test.go index d1fd49b5b..78fab9508 100644 --- a/swarm/api/api_test.go +++ b/swarm/api/api_test.go @@ -90,7 +90,7 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse { t.Fatalf("unexpected error: %v", err) } quitC := make(chan bool) - size, err := reader.Size(quitC) + size, err := reader.Size(context.TODO(), quitC) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/swarm/api/filesystem.go b/swarm/api/filesystem.go index adf6bfbaf..aacd26699 100644 --- a/swarm/api/filesystem.go +++ b/swarm/api/filesystem.go @@ -277,7 +277,7 @@ func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage. } reader, _ := fileStore.Retrieve(context.TODO(), addr) writer := bufio.NewWriter(f) - size, err := reader.Size(quitC) + size, err := reader.Size(context.TODO(), quitC) if err != nil { return err } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index 22c091026..5a7c9e93e 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -42,8 +42,11 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/mru" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pborman/uuid" "github.com/rs/cors" ) @@ -263,6 +266,13 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) { postRawCount.Inc(1) + ctx := r.Context() + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "http.post.raw") + defer sp.Finish() + toEncrypt := false if r.uri.Addr == "encrypt" { toEncrypt = true @@ -286,7 +296,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) { return } - addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) + addr, _, err := s.api.Store(ctx, r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) Respond(w, r, err.Error(), http.StatusInternalServerError) @@ -307,8 +317,15 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) { // resulting manifest hash as a text/plain response func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { log.Debug("handle.post.files", "ruid", r.ruid) - postFilesCount.Inc(1) + + var sp opentracing.Span + ctx := r.Context() + ctx, sp = spancontext.StartSpan( + ctx, + "http.post.files") + defer sp.Finish() + contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { postFilesFail.Inc(1) @@ -323,7 +340,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { var addr storage.Address if r.uri.Addr != "" && r.uri.Addr != "encrypt" { - addr, err = s.api.Resolve(r.Context(), r.uri) + addr, err = s.api.Resolve(ctx, r.uri) if err != nil { postFilesFail.Inc(1) Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusInternalServerError) @@ -331,7 +348,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { } log.Debug("resolved key", "ruid", r.ruid, "key", addr) } else { - addr, err = s.api.NewManifest(r.Context(), toEncrypt) + addr, err = s.api.NewManifest(ctx, toEncrypt) if err != nil { postFilesFail.Inc(1) Respond(w, r, err.Error(), http.StatusInternalServerError) @@ -340,7 +357,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { log.Debug("new manifest", "ruid", r.ruid, "key", addr) } - newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error { + newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error { switch contentType { case "application/x-tar": @@ -509,6 +526,14 @@ func resourcePostMode(path string) (isRaw bool, frequency uint64, err error) { // and name "foo.eth" will be created func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { log.Debug("handle.post.resource", "ruid", r.ruid) + + var sp opentracing.Span + ctx := r.Context() + ctx, sp = spancontext.StartSpan( + ctx, + "http.post.resource") + defer sp.Finish() + var err error var addr storage.Address var name string @@ -525,7 +550,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { name = r.uri.Addr // the key is the content addressed root chunk holding mutable resource metadata information - addr, err = s.api.ResourceCreate(r.Context(), name, frequency) + addr, err = s.api.ResourceCreate(ctx, name, frequency) if err != nil { code, err2 := s.translateResourceError(w, r, "resource creation fail", err) @@ -536,7 +561,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { // we create a manifest so we can retrieve the resource with bzz:// later // this manifest has a special "resource type" manifest, and its hash is the key of the mutable resource // root chunk - m, err := s.api.NewResourceManifest(r.Context(), addr.Hex()) + m, err := s.api.NewResourceManifest(ctx, addr.Hex()) if err != nil { Respond(w, r, fmt.Sprintf("failed to create resource manifest: %v", err), http.StatusInternalServerError) return @@ -556,7 +581,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { // that means that we retrieve the manifest and inspect its Hash member. manifestAddr := r.uri.Address() if manifestAddr == nil { - manifestAddr, err = s.api.Resolve(r.Context(), r.uri) + manifestAddr, err = s.api.Resolve(ctx, r.uri) if err != nil { getFail.Inc(1) Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound) @@ -567,7 +592,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { } // get the root chunk key from the manifest - addr, err = s.api.ResolveResourceManifest(r.Context(), manifestAddr) + addr, err = s.api.ResolveResourceManifest(ctx, manifestAddr) if err != nil { getFail.Inc(1) Respond(w, r, fmt.Sprintf("error resolving resource root chunk for %s: %s", r.uri.Addr, err), http.StatusNotFound) @@ -576,7 +601,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { log.Debug("handle.post.resource: resolved", "ruid", r.ruid, "manifestkey", manifestAddr, "rootchunkkey", addr) - name, _, err = s.api.ResourceLookup(r.Context(), addr, 0, 0, &mru.LookupParams{}) + name, _, err = s.api.ResourceLookup(ctx, addr, 0, 0, &mru.LookupParams{}) if err != nil { Respond(w, r, err.Error(), http.StatusNotFound) return @@ -592,7 +617,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { // Multihash will be passed as hex-encoded data, so we need to parse this to bytes if isRaw { - _, _, _, err = s.api.ResourceUpdate(r.Context(), name, data) + _, _, _, err = s.api.ResourceUpdate(ctx, name, data) if err != nil { Respond(w, r, err.Error(), http.StatusBadRequest) return @@ -603,7 +628,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) { Respond(w, r, err.Error(), http.StatusBadRequest) return } - _, _, _, err = s.api.ResourceUpdateMultihash(r.Context(), name, bytesdata) + _, _, _, err = s.api.ResourceUpdateMultihash(ctx, name, bytesdata) if err != nil { Respond(w, r, err.Error(), http.StatusBadRequest) return @@ -730,10 +755,18 @@ func (s *Server) translateResourceError(w http.ResponseWriter, r *Request, supEr func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { log.Debug("handle.get", "ruid", r.ruid, "uri", r.uri) getCount.Inc(1) + + var sp opentracing.Span + ctx := r.Context() + ctx, sp = spancontext.StartSpan( + ctx, + "http.get") + defer sp.Finish() + var err error addr := r.uri.Address() if addr == nil { - addr, err = s.api.Resolve(r.Context(), r.uri) + addr, err = s.api.Resolve(ctx, r.uri) if err != nil { getFail.Inc(1) Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound) @@ -748,7 +781,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { // if path is set, interpret <key> as a manifest and return the // raw entry at the given path if r.uri.Path != "" { - walker, err := s.api.NewManifestWalker(r.Context(), addr, nil) + walker, err := s.api.NewManifestWalker(ctx, addr, nil) if err != nil { getFail.Inc(1) Respond(w, r, fmt.Sprintf("%s is not a manifest", addr), http.StatusBadRequest) @@ -796,8 +829,8 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { } // check the root chunk exists by retrieving the file's size - reader, isEncrypted := s.api.Retrieve(r.Context(), addr) - if _, err := reader.Size(nil); err != nil { + reader, isEncrypted := s.api.Retrieve(ctx, addr) + if _, err := reader.Size(ctx, nil); err != nil { getFail.Inc(1) Respond(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound) return @@ -828,13 +861,21 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { log.Debug("handle.get.list", "ruid", r.ruid, "uri", r.uri) getListCount.Inc(1) + + var sp opentracing.Span + ctx := r.Context() + ctx, sp = spancontext.StartSpan( + ctx, + "http.get.list") + defer sp.Finish() + // ensure the root path has a trailing slash so that relative URLs work if r.uri.Path == "" && !strings.HasSuffix(r.URL.Path, "/") { http.Redirect(w, &r.Request, r.URL.Path+"/", http.StatusMovedPermanently) return } - addr, err := s.api.Resolve(r.Context(), r.uri) + addr, err := s.api.Resolve(ctx, r.uri) if err != nil { getListFail.Inc(1) Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound) @@ -842,7 +883,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { } log.Debug("handle.get.list: resolved", "ruid", r.ruid, "key", addr) - list, err := s.api.GetManifestList(r.Context(), addr, r.uri.Path) + list, err := s.api.GetManifestList(ctx, addr, r.uri.Path) if err != nil { getListFail.Inc(1) Respond(w, r, err.Error(), http.StatusInternalServerError) @@ -877,19 +918,28 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { log.Debug("handle.get.file", "ruid", r.ruid) getFileCount.Inc(1) + + var sp opentracing.Span + ctx := r.Context() + ctx, sp = spancontext.StartSpan( + ctx, + "http.get.file") + // ensure the root path has a trailing slash so that relative URLs work if r.uri.Path == "" && !strings.HasSuffix(r.URL.Path, "/") { http.Redirect(w, &r.Request, r.URL.Path+"/", http.StatusMovedPermanently) + sp.Finish() return } var err error manifestAddr := r.uri.Address() if manifestAddr == nil { - manifestAddr, err = s.api.Resolve(r.Context(), r.uri) + manifestAddr, err = s.api.Resolve(ctx, r.uri) if err != nil { getFileFail.Inc(1) Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound) + sp.Finish() return } } else { @@ -897,7 +947,8 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { } log.Debug("handle.get.file: resolved", "ruid", r.ruid, "key", manifestAddr) - reader, contentType, status, contentKey, err := s.api.Get(r.Context(), manifestAddr, r.uri.Path) + + reader, contentType, status, contentKey, err := s.api.Get(ctx, manifestAddr, r.uri.Path) etag := common.Bytes2Hex(contentKey) noneMatchEtag := r.Header.Get("If-None-Match") @@ -905,6 +956,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { if noneMatchEtag != "" { if bytes.Equal(storage.Address(common.Hex2Bytes(noneMatchEtag)), contentKey) { Respond(w, r, "Not Modified", http.StatusNotModified) + sp.Finish() return } } @@ -918,34 +970,49 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { getFileFail.Inc(1) Respond(w, r, err.Error(), http.StatusInternalServerError) } + sp.Finish() return } //the request results in ambiguous files //e.g. /read with readme.md and readinglist.txt available in manifest if status == http.StatusMultipleChoices { - list, err := s.api.GetManifestList(r.Context(), manifestAddr, r.uri.Path) + list, err := s.api.GetManifestList(ctx, manifestAddr, r.uri.Path) if err != nil { getFileFail.Inc(1) Respond(w, r, err.Error(), http.StatusInternalServerError) + sp.Finish() return } log.Debug(fmt.Sprintf("Multiple choices! --> %v", list), "ruid", r.ruid) //show a nice page links to available entries ShowMultipleChoices(w, r, list) + sp.Finish() return } // check the root chunk exists by retrieving the file's size - if _, err := reader.Size(nil); err != nil { + if _, err := reader.Size(ctx, nil); err != nil { getFileNotFound.Inc(1) Respond(w, r, fmt.Sprintf("file not found %s: %s", r.uri, err), http.StatusNotFound) + sp.Finish() return } + buf, err := ioutil.ReadAll(newBufferedReadSeeker(reader, getFileBufferSize)) + if err != nil { + getFileNotFound.Inc(1) + Respond(w, r, fmt.Sprintf("file not found %s: %s", r.uri, err), http.StatusNotFound) + sp.Finish() + return + } + + log.Debug("got response in buffer", "len", len(buf), "ruid", r.ruid) + sp.Finish() + w.Header().Set("Content-Type", contentType) - http.ServeContent(w, &r.Request, "", time.Now(), newBufferedReadSeeker(reader, getFileBufferSize)) + http.ServeContent(w, &r.Request, "", time.Now(), bytes.NewReader(buf)) } // The size of buffer used for bufio.Reader on LazyChunkReader passed to diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go index 78d1418bc..198ca22ce 100644 --- a/swarm/api/manifest.go +++ b/swarm/api/manifest.go @@ -212,10 +212,10 @@ func loadManifest(ctx context.Context, fileStore *storage.FileStore, hash storag return readManifest(manifestReader, hash, fileStore, isEncrypted, quitC) } -func readManifest(manifestReader storage.LazySectionReader, hash storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand +func readManifest(mr storage.LazySectionReader, hash storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand // TODO check size for oversized manifests - size, err := manifestReader.Size(quitC) + size, err := mr.Size(mr.Context(), quitC) if err != nil { // size == 0 // can't determine size means we don't have the root chunk log.Trace("manifest not found", "key", hash) @@ -228,7 +228,7 @@ func readManifest(manifestReader storage.LazySectionReader, hash storage.Address return } manifestData := make([]byte, size) - read, err := manifestReader.Read(manifestData) + read, err := mr.Read(manifestData) if int64(read) < size { log.Trace("manifest not found", "key", hash) if err == nil { diff --git a/swarm/api/storage.go b/swarm/api/storage.go index 8646dc41f..3b52301a0 100644 --- a/swarm/api/storage.go +++ b/swarm/api/storage.go @@ -72,7 +72,7 @@ func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) { return nil, err } quitC := make(chan bool) - expsize, err := reader.Size(quitC) + expsize, err := reader.Size(ctx, quitC) if err != nil { return nil, err } diff --git a/swarm/fuse/fuse_file.go b/swarm/fuse/fuse_file.go index be3b01c8c..ca04f737e 100644 --- a/swarm/fuse/fuse_file.go +++ b/swarm/fuse/fuse_file.go @@ -86,7 +86,7 @@ func (sf *SwarmFile) Attr(ctx context.Context, a *fuse.Attr) error { if sf.fileSize == -1 { reader, _ := sf.mountInfo.swarmApi.Retrieve(ctx, sf.addr) quitC := make(chan bool) - size, err := reader.Size(quitC) + size, err := reader.Size(ctx, quitC) if err != nil { log.Error("Couldnt get size of file %s : %v", sf.path, err) return err diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go index c0868410c..55bf7c033 100644 --- a/swarm/network/discovery.go +++ b/swarm/network/discovery.go @@ -17,6 +17,7 @@ package network import ( + "context" "fmt" "sync" @@ -48,7 +49,7 @@ func newDiscovery(p *BzzPeer, o Overlay) *discPeer { } // HandleMsg is the message handler that delegates incoming messages -func (d *discPeer) HandleMsg(msg interface{}) error { +func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *peersMsg: @@ -99,14 +100,14 @@ func (d *discPeer) NotifyPeer(a OverlayAddr, po uint8) { resp := &peersMsg{ Peers: []*BzzAddr{ToAddr(a)}, } - go d.Send(resp) + go d.Send(context.TODO(), resp) } // NotifyDepth sends a subPeers Msg to the receiver notifying them about // a change in the depth of saturation func (d *discPeer) NotifyDepth(po uint8) { // log.Trace(fmt.Sprintf("%08x peer %08x notified of new depth %v", d.localAddr.Over()[:4], d.Address()[:4], po)) - go d.Send(&subPeersMsg{Depth: po}) + go d.Send(context.TODO(), &subPeersMsg{Depth: po}) } /* @@ -178,7 +179,7 @@ func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { }) if len(peers) > 0 { // log.Debug(fmt.Sprintf("%08x: %v peers sent to %v", d.overlay.BaseAddr(), len(peers), d)) - go d.Send(&peersMsg{Peers: peers}) + go d.Send(context.TODO(), &peersMsg{Peers: peers}) } } d.sentPeers = true diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 39673f5a1..8bf81fde6 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -82,9 +82,9 @@ type Peer interface { type Conn interface { ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages - Send(interface{}) error // can send messages + Send(context.Context, interface{}) error // can send messages Drop(error) // disconnect this peer - Run(func(interface{}) error) error // the run function to run a protocol + Run(func(context.Context, interface{}) error) error // the run function to run a protocol Off() OverlayAddr } diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 6a2c27401..4d55c6ee3 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -126,7 +126,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { return testRegistry, nil } -func defaultRetrieveFunc(id discover.NodeID) func(chunk *storage.Chunk) error { +func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { return nil } @@ -217,14 +217,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } -func (rrs *roundRobinStore) Get(addr storage.Address) (*storage.Chunk, error) { +func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } -func (rrs *roundRobinStore) Put(chunk *storage.Chunk) { +func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - rrs.stores[idx].Put(chunk) + rrs.stores[idx].Put(ctx, chunk) } func (rrs *roundRobinStore) Close() { @@ -369,8 +369,8 @@ func newTestExternalClient(db *storage.DBAPI) *testExternalClient { } } -func (c *testExternalClient) NeedData(hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(hash) +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { + chunk, _ := c.db.GetOrCreateRequest(ctx, hash) if chunk.ReqC == nil { return nil } @@ -429,7 +429,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6 return b, from, to, nil, nil } -func (s *testExternalServer) GetData([]byte) ([]byte, error) { +func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { return make([]byte, 4096), nil } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 75aabad6c..fa210e300 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -17,6 +17,7 @@ package stream import ( + "context" "errors" "time" @@ -25,7 +26,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -118,8 +121,8 @@ func (s *SwarmChunkServer) Close() { } // GetData retrives chunk data from db store -func (s *SwarmChunkServer) GetData(key []byte) ([]byte, error) { - chunk, err := s.db.Get(storage.Address(key)) +func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { + chunk, err := s.db.Get(ctx, storage.Address(key)) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { @@ -134,25 +137,37 @@ type RetrieveRequestMsg struct { SkipCheck bool } -func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) error { +func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { log.Trace("received request", "peer", sp.ID(), "hash", req.Addr) handleRetrieveRequestMsgCount.Inc(1) + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "retrieve.request") + defer osp.Finish() + s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false)) if err != nil { return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(req.Addr) + chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) if chunk.ReqC != nil { if created { - if err := d.RequestFromPeers(chunk.Addr[:], true, sp.ID()); err != nil { + if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) chunk.SetErrored(storage.ErrChunkForward) return nil } } go func() { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "waiting.delivery") + defer osp.Finish() + t := time.NewTimer(10 * time.Minute) defer t.Stop() @@ -169,7 +184,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e chunk.SetErrored(nil) if req.SkipCheck { - err := sp.Deliver(chunk, s.priority) + err := sp.Deliver(ctx, chunk, s.priority) if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) sp.Drop(err) @@ -185,7 +200,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e if length := len(chunk.SData); length < 9 { log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) } - return sp.Deliver(chunk, s.priority) + return sp.Deliver(ctx, chunk, s.priority) } streamer.deliveryC <- chunk.Addr[:] return nil @@ -197,7 +212,13 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } -func (d *Delivery) handleChunkDeliveryMsg(sp *Peer, req *ChunkDeliveryMsg) error { +func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "chunk.delivery") + defer osp.Finish() + req.peer = sp d.receiveC <- req return nil @@ -209,7 +230,7 @@ R: processReceivedChunksCount.Inc(1) // this should be has locally - chunk, err := d.db.Get(req.Addr) + chunk, err := d.db.Get(context.TODO(), req.Addr) if err == nil { continue R } @@ -224,7 +245,7 @@ R: default: } chunk.SData = req.SData - d.db.Put(chunk) + d.db.Put(context.TODO(), chunk) go func(req *ChunkDeliveryMsg) { err := chunk.WaitToStore() @@ -236,10 +257,11 @@ R: } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { +func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { var success bool var err error requestFromPeersCount.Inc(1) + d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool { spId := p.(network.Peer).ID() for _, p := range peersToSkip { @@ -253,8 +275,7 @@ func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ... log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) return true } - // TODO: skip light nodes that do not accept retrieve requests - err = sp.SendPriority(&RetrieveRequestMsg{ + err = sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: hash, SkipCheck: skipCheck, }, Top) diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index cd87557b1..f3da893a2 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -46,7 +46,7 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(hash0[:], true) + streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -80,7 +80,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(peerID) - peer.handleSubscribeMsg(&SubscribeMsg{ + peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ Stream: NewStream(swarmChunkServerStreamName, "", false), History: nil, Priority: Top, @@ -131,7 +131,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { stream := NewStream(swarmChunkServerStreamName, "", false) - peer.handleSubscribeMsg(&SubscribeMsg{ + peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ Stream: stream, History: nil, Priority: Top, @@ -140,7 +140,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { hash := storage.Address(hash0[:]) chunk := storage.NewChunk(hash, nil) chunk.SData = hash - localStore.Put(chunk) + localStore.Put(context.TODO(), chunk) chunk.WaitToStore() err = tester.TestExchanges(p2ptest.Exchange{ @@ -179,7 +179,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { hash = storage.Address(hash1[:]) chunk = storage.NewChunk(hash, nil) chunk.SData = hash1[:] - localStore.Put(chunk) + localStore.Put(context.TODO(), chunk) chunk.WaitToStore() err = tester.TestExchanges(p2ptest.Exchange{ @@ -234,7 +234,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(chunkKey) + chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) if !created { t.Fatal("chunk already exists") @@ -285,7 +285,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { case <-chunk.ReqC: } - storedChunk, err := localStore.Get(chunkKey) + storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -401,8 +401,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck } // create a retriever FileStore for the pivot node delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(chunk *storage.Chunk) error { - return delivery.RequestFromPeers(chunk.Addr[:], skipCheck) + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) @@ -617,8 +617,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip // create a retriever FileStore for the pivot node // by now deliveries are set for each node by the streamer service delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(chunk *storage.Chunk) error { - return delivery.RequestFromPeers(chunk.Addr[:], skipCheck) + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) @@ -650,7 +650,7 @@ Loop: errs := make(chan error) for _, hash := range hashes { go func(h storage.Address) { - _, err := netStore.Get(h) + _, err := netStore.Get(ctx, h) log.Warn("test check netstore get", "hash", h, "err", err) errs <- err }(hash) diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 5668a73e9..a19f63589 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -17,6 +17,7 @@ package stream import ( + "context" "errors" "fmt" "sync" @@ -25,7 +26,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Stream defines a unique stream identifier. @@ -71,17 +74,17 @@ type RequestSubscriptionMsg struct { Priority uint8 // delivered on priority channel } -func (p *Peer) handleRequestSubscription(req *RequestSubscriptionMsg) (err error) { +func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) { log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr.ID(), p.ID(), req.Stream)) return p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority) } -func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) { +func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err error) { metrics.GetOrRegisterCounter("peer.handlesubscribemsg", nil).Inc(1) defer func() { if err != nil { - if e := p.Send(SubscribeErrorMsg{ + if e := p.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), }); e != nil { log.Error("send stream subscribe error message", "err", err) @@ -181,9 +184,15 @@ func (m OfferedHashesMsg) String() string { // handleOfferedHashesMsg protocol msg handler calls the incoming streamer interface // Filter method -func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { +func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1) + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "handle.offered.hashes") + defer sp.Finish() + c, _, err := p.getOrSetClient(req.Stream, req.From, req.To) if err != nil { return err @@ -197,7 +206,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] - if wait := c.NeedData(hash); wait != nil { + if wait := c.NeedData(ctx, hash); wait != nil { want.Set(i/HashSize, true) wg.Add(1) // create request and wait until the chunk data arrives and is stored @@ -260,7 +269,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(msg, c.priority) + err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority err, so dropping peer", "err", err) p.Drop(err) @@ -285,7 +294,7 @@ func (m WantedHashesMsg) String() string { // handleWantedHashesMsg protocol msg handler // * sends the next batch of unsynced keys // * sends the actual data chunks as per WantedHashesMsg -func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { +func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1) log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) @@ -314,7 +323,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg.actualget", nil).Inc(1) hash := hashes[i*HashSize : (i+1)*HashSize] - data, err := s.GetData(hash) + data, err := s.GetData(ctx, hash) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } @@ -323,7 +332,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { if length := len(chunk.SData); length < 9 { log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) } - if err := p.Deliver(chunk, s.priority); err != nil { + if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } } @@ -363,7 +372,7 @@ func (m TakeoverProofMsg) String() string { return fmt.Sprintf("Stream: '%v' [%v-%v], Root: %x, Sig: %x", m.Stream, m.Start, m.End, m.Root, m.Sig) } -func (p *Peer) handleTakeoverProofMsg(req *TakeoverProofMsg) error { +func (p *Peer) handleTakeoverProofMsg(ctx context.Context, req *TakeoverProofMsg) error { _, err := p.getServer(req.Stream) // store the strongest takeoverproof for the stream in streamer return err diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 29984a911..80b9ab711 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -27,8 +27,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) var sendTimeout = 30 * time.Second @@ -62,6 +64,11 @@ type Peer struct { quit chan struct{} } +type WrappedPriorityMsg struct { + Context context.Context + Msg interface{} +} + // NewPeer is the constructor for Peer func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { p := &Peer{ @@ -74,7 +81,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { quit: make(chan struct{}), } ctx, cancel := context.WithCancel(context.Background()) - go p.pq.Run(ctx, func(i interface{}) { p.Send(i) }) + go p.pq.Run(ctx, func(i interface{}) { + wmsg := i.(WrappedPriorityMsg) + p.Send(wmsg.Context, wmsg.Msg) + }) go func() { <-p.quit cancel() @@ -83,25 +93,41 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "send.chunk.delivery") + defer sp.Finish() + msg := &ChunkDeliveryMsg{ Addr: chunk.Addr, SData: chunk.SData, } - return p.SendPriority(msg, priority) + return p.SendPriority(ctx, msg, priority) } // SendPriority sends message to the peer using the outgoing priority queue -func (p *Peer) SendPriority(msg interface{}, priority uint8) error { +func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) + cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) defer cancel() - return p.pq.Push(ctx, msg, int(priority)) + wmsg := WrappedPriorityMsg{ + Context: ctx, + Msg: msg, + } + return p.pq.Push(cctx, wmsg, int(priority)) } // SendOfferedHashes sends OfferedHashesMsg protocol msg func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { + var sp opentracing.Span + ctx, sp := spancontext.StartSpan( + context.TODO(), + "send.offered.hashes") + defer sp.Finish() + hashes, from, to, proof, err := s.SetNextBatch(f, t) if err != nil { return err @@ -124,7 +150,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) - return p.SendPriority(msg, s.priority) + return p.SendPriority(ctx, msg, s.priority) } func (p *Peer) getServer(s Stream) (*server, error) { diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index da5253e8a..9961a0bc7 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -55,10 +55,10 @@ func initRetrievalTest() { //deliveries for each node deliveries = make(map[discover.NodeID]*Delivery) //global retrieve func - getRetrieveFunc = func(id discover.NodeID) func(chunk *storage.Chunk) error { - return func(chunk *storage.Chunk) error { + getRetrieveFunc = func(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { + return func(ctx context.Context, chunk *storage.Chunk) error { skipCheck := true - return deliveries[id].RequestFromPeers(chunk.Addr[:], skipCheck) + return deliveries[id].RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } } //registries, map of discover.NodeID to its streamer @@ -412,7 +412,7 @@ func runFileRetrievalTest(nodeCount int) error { for i, hash := range conf.hashes { reader, _ := fileStore.Retrieve(context.TODO(), hash) //check that we can read the file size and that it corresponds to the generated file size - if s, err := reader.Size(nil); err != nil || s != int64(len(randomFiles[i])) { + if s, err := reader.Size(context.TODO(), nil); err != nil || s != int64(len(randomFiles[i])) { allSuccess = false log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) } else { @@ -699,7 +699,7 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { for _, chnk := range conf.hashes { reader, _ := fileStore.Retrieve(context.TODO(), chnk) //assuming that reading the Size of the chunk is enough to know we found it - if s, err := reader.Size(nil); err != nil || s != chunkSize { + if s, err := reader.Size(context.TODO(), nil); err != nil || s != chunkSize { allSuccess = false log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id) } else { diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index fd8863d43..0b5257c60 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -437,7 +437,7 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error { } else { //use the actual localstore lstore := stores[id] - _, err = lstore.Get(chunk) + _, err = lstore.Get(context.TODO(), chunk) } if err != nil { log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 9b4658c51..56f242e91 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,8 +32,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -235,7 +237,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang if e, ok := err.(*notFoundError); ok && e.t == "server" { // request subscription only if the server for this stream is not created log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h) - return peer.Send(&RequestSubscriptionMsg{ + return peer.Send(context.TODO(), &RequestSubscriptionMsg{ Stream: s, History: h, Priority: prio, @@ -285,7 +287,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(msg, priority) + return peer.SendPriority(context.TODO(), msg, priority) } func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { @@ -299,7 +301,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { } log.Debug("Unsubscribe ", "peer", peerId, "stream", s) - if err := peer.Send(msg); err != nil { + if err := peer.Send(context.TODO(), msg); err != nil { return err } return peer.removeClient(s) @@ -320,11 +322,17 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { } log.Debug("Quit ", "peer", peerId, "stream", s) - return peer.Send(msg) + return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(chunk *storage.Chunk) error { - return r.delivery.RequestFromPeers(chunk.Addr[:], r.skipCheck) +func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "registry.retrieve") + defer sp.Finish() + + return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) } func (r *Registry) NodeInfo() interface{} { @@ -460,11 +468,11 @@ func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { } // HandleMsg is the message handler that delegates incoming messages -func (p *Peer) HandleMsg(msg interface{}) error { +func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *SubscribeMsg: - return p.handleSubscribeMsg(msg) + return p.handleSubscribeMsg(ctx, msg) case *SubscribeErrorMsg: return p.handleSubscribeErrorMsg(msg) @@ -473,22 +481,22 @@ func (p *Peer) HandleMsg(msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(msg) + return p.handleOfferedHashesMsg(ctx, msg) case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(msg) + return p.handleTakeoverProofMsg(ctx, msg) case *WantedHashesMsg: - return p.handleWantedHashesMsg(msg) + return p.handleWantedHashesMsg(ctx, msg) case *ChunkDeliveryMsg: - return p.streamer.delivery.handleChunkDeliveryMsg(p, msg) + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(p, msg) + return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) case *RequestSubscriptionMsg: - return p.handleRequestSubscription(msg) + return p.handleRequestSubscription(ctx, msg) case *QuitMsg: return p.handleQuitMsg(msg) @@ -508,7 +516,7 @@ type server struct { // Server interface for outgoing peer Streamer type Server interface { SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) - GetData([]byte) ([]byte, error) + GetData(context.Context, []byte) ([]byte, error) Close() } @@ -551,7 +559,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData([]byte) func() + NeedData(context.Context, []byte) func() BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } @@ -588,7 +596,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error if err != nil { return err } - if err := p.SendPriority(tp, c.priority); err != nil { + if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 44622c995..7523860c9 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -18,6 +18,7 @@ package stream import ( "bytes" + "context" "testing" "time" @@ -79,7 +80,7 @@ func newTestClient(t string) *testClient { } } -func (self *testClient) NeedData(hash []byte) func() { +func (self *testClient) NeedData(ctx context.Context, hash []byte) func() { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { return func() { @@ -114,7 +115,7 @@ func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, ui return make([]byte, HashSize), from + 1, to + 1, nil, nil } -func (self *testServer) GetData([]byte) ([]byte, error) { +func (self *testServer) GetData(context.Context, []byte) ([]byte, error) { return nil, nil } diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 5510b2409..d7febe4a3 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -17,6 +17,7 @@ package stream import ( + "context" "math" "strconv" "time" @@ -78,8 +79,8 @@ func (s *SwarmSyncerServer) Close() { } // GetSection retrieves the actual chunk from localstore -func (s *SwarmSyncerServer) GetData(key []byte) ([]byte, error) { - chunk, err := s.db.Get(storage.Address(key)) +func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { + chunk, err := s.db.Get(ctx, storage.Address(key)) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { @@ -210,8 +211,8 @@ func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { } // NeedData -func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) { - chunk, _ := s.db.GetOrCreateRequest(key) +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) { + chunk, _ := s.db.GetOrCreateRequest(ctx, key) // TODO: we may want to request from this peer anyway even if the request exists // ignoreExistingRequest is temporary commented out until its functionality is verified. diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 5fea7befe..a3d53e648 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -231,7 +231,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck for j := i; j < nodes; j++ { total += len(hashes[j]) for _, key := range hashes[j] { - chunk, err := dbs[i].Get(key) + chunk, err := dbs[i].Get(ctx, key) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { diff --git a/swarm/pss/ping.go b/swarm/pss/ping.go index 2ef072918..ff635f40a 100644 --- a/swarm/pss/ping.go +++ b/swarm/pss/ping.go @@ -19,6 +19,7 @@ package pss import ( + "context" "errors" "time" @@ -40,7 +41,7 @@ type Ping struct { InC chan bool // optional, report back to calling code } -func (p *Ping) pingHandler(msg interface{}) error { +func (p *Ping) pingHandler(ctx context.Context, msg interface{}) error { var pingmsg *PingMsg var ok bool if pingmsg, ok = msg.(*PingMsg); !ok { @@ -80,7 +81,7 @@ func NewPingProtocol(ping *Ping) *p2p.Protocol { for { select { case ispong := <-ping.OutC: - pp.Send(&PingMsg{ + pp.Send(context.TODO(), &PingMsg{ Created: time.Now(), Pong: ispong, }) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index dd081e93a..5c060b248 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -18,6 +18,7 @@ package pss import ( "bytes" + "context" "crypto/ecdsa" "crypto/rand" "errors" @@ -71,7 +72,7 @@ type senderPeer interface { Info() *p2p.PeerInfo ID() discover.NodeID Address() []byte - Send(interface{}) error + Send(context.Context, interface{}) error } // per-key peer related information @@ -344,7 +345,7 @@ func (p *Pss) getHandlers(topic Topic) map[*Handler]bool { // Check if address partially matches // If yes, it CAN be for us, and we process it // Only passes error to pss protocol handler if payload is not valid pssmsg -func (p *Pss) handlePssMsg(msg interface{}) error { +func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1) pssmsg, ok := msg.(*PssMsg) @@ -844,7 +845,7 @@ func (p *Pss) forward(msg *PssMsg) error { p.fwdPoolMu.RUnlock() // attempt to send the message - err := pp.Send(msg) + err := pp.Send(context.TODO(), msg) if err != nil { metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) log.Error(err.Error()) diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index c738247f1..41b03db28 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -334,7 +334,7 @@ func TestHandlerConditions(t *testing.T) { Data: []byte{0x66, 0x6f, 0x6f}, }, } - if err := ps.handlePssMsg(msg); err != nil { + if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr := time.NewTimer(time.Millisecond * 100) @@ -351,7 +351,7 @@ func TestHandlerConditions(t *testing.T) { // message should pass and queue due to partial length msg.To = addr[0:1] msg.Payload.Data = []byte{0x78, 0x79, 0x80, 0x80, 0x79} - if err := ps.handlePssMsg(msg); err != nil { + if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr.Reset(time.Millisecond * 100) @@ -374,7 +374,7 @@ func TestHandlerConditions(t *testing.T) { // full address mismatch should put message in queue msg.To[0] = 0xff - if err := ps.handlePssMsg(msg); err != nil { + if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr.Reset(time.Millisecond * 10) @@ -397,7 +397,7 @@ func TestHandlerConditions(t *testing.T) { // expired message should be dropped msg.Expire = uint32(time.Now().Add(-time.Second).Unix()) - if err := ps.handlePssMsg(msg); err != nil { + if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) } tmr.Reset(time.Millisecond * 10) @@ -417,7 +417,7 @@ func TestHandlerConditions(t *testing.T) { }{ pssMsg: &PssMsg{}, } - if err := ps.handlePssMsg(fckedupmsg); err == nil { + if err := ps.handlePssMsg(context.TODO(), fckedupmsg); err == nil { t.Fatalf("expected error from processMsg but error nil") } @@ -427,7 +427,7 @@ func TestHandlerConditions(t *testing.T) { ps.outbox <- msg } msg.Payload.Data = []byte{0x62, 0x61, 0x72} - err = ps.handlePssMsg(msg) + err = ps.handlePssMsg(context.TODO(), msg) if err == nil { t.Fatal("expected error when mailbox full, but was nil") } diff --git a/swarm/spancontext/spancontext.go b/swarm/spancontext/spancontext.go new file mode 100644 index 000000000..2cb9f82f7 --- /dev/null +++ b/swarm/spancontext/spancontext.go @@ -0,0 +1,49 @@ +package spancontext + +import ( + "context" + + opentracing "github.com/opentracing/opentracing-go" +) + +func WithContext(ctx context.Context, sctx opentracing.SpanContext) context.Context { + return context.WithValue(ctx, "span_context", sctx) +} + +func FromContext(ctx context.Context) opentracing.SpanContext { + sctx, ok := ctx.Value("span_context").(opentracing.SpanContext) + if ok { + return sctx + } + + return nil +} + +func StartSpan(ctx context.Context, name string) (context.Context, opentracing.Span) { + tracer := opentracing.GlobalTracer() + + sctx := FromContext(ctx) + + var sp opentracing.Span + if sctx != nil { + sp = tracer.StartSpan( + name, + opentracing.ChildOf(sctx)) + } else { + sp = tracer.StartSpan(name) + } + + nctx := context.WithValue(ctx, "span_context", sp.Context()) + + return nctx, sp +} + +func StartSpanFrom(name string, sctx opentracing.SpanContext) opentracing.Span { + tracer := opentracing.GlobalTracer() + + sp := tracer.StartSpan( + name, + opentracing.ChildOf(sctx)) + + return sp +} diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 2d197fefa..b9b502273 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -26,6 +26,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" ) /* @@ -93,9 +96,12 @@ type JoinerParams struct { getter Getter // TODO: there is a bug, so depth can only be 0 today, see: https://github.com/ethersphere/go-ethereum/issues/344 depth int + ctx context.Context } type TreeChunker struct { + ctx context.Context + branches int64 hashFunc SwarmHasher dataSize int64 @@ -136,6 +142,7 @@ func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *Lazy addr: addr, getter: getter, depth: depth, + ctx: ctx, } return NewTreeJoiner(jp).Join(ctx) @@ -174,6 +181,8 @@ func NewTreeJoiner(params *JoinerParams) *TreeChunker { tc.errC = make(chan error) tc.quitC = make(chan bool) + tc.ctx = params.ctx + return tc } @@ -351,7 +360,7 @@ func (tc *TreeChunker) runWorker() { return } - h, err := tc.putter.Put(job.chunk) + h, err := tc.putter.Put(tc.ctx, job.chunk) if err != nil { tc.errC <- err return @@ -371,6 +380,7 @@ func (tc *TreeChunker) Append() (Address, func(), error) { // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { + Ctx context.Context key Address // root key chunkData ChunkData off int64 // offset @@ -389,16 +399,28 @@ func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader { hashSize: tc.hashSize, depth: tc.depth, getter: tc.getter, + Ctx: tc.ctx, } } +func (r *LazyChunkReader) Context() context.Context { + return r.Ctx +} + // Size is meant to be called on the LazySectionReader -func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { +func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, err error) { metrics.GetOrRegisterCounter("lazychunkreader.size", nil).Inc(1) + var sp opentracing.Span + var cctx context.Context + cctx, sp = spancontext.StartSpan( + ctx, + "lcr.size") + defer sp.Finish() + log.Debug("lazychunkreader.size", "key", r.key) if r.chunkData == nil { - chunkData, err := r.getter.Get(Reference(r.key)) + chunkData, err := r.getter.Get(cctx, Reference(r.key)) if err != nil { return 0, err } @@ -421,12 +443,25 @@ func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { metrics.GetOrRegisterCounter("lazychunkreader.readat", nil).Inc(1) + var sp opentracing.Span + var cctx context.Context + cctx, sp = spancontext.StartSpan( + r.Ctx, + "lcr.read") + defer sp.Finish() + + defer func() { + sp.LogFields( + olog.Int("off", int(off)), + olog.Int("read", read)) + }() + // this is correct, a swarm doc cannot be zero length, so no EOF is expected if len(b) == 0 { return 0, nil } quitC := make(chan bool) - size, err := r.Size(quitC) + size, err := r.Size(cctx, quitC) if err != nil { log.Error("lazychunkreader.readat.size", "size", size, "err", err) return 0, err @@ -449,7 +484,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= r.chunkSize } wg.Add(1) - go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) + go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -467,7 +502,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return len(b), nil } -func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level for chunkData.Size() < treeSize && depth > r.depth { @@ -514,7 +549,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS wg.Add(1) go func(j int64) { childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] - chunkData, err := r.getter.Get(Reference(childKey)) + chunkData, err := r.getter.Get(ctx, Reference(childKey)) if err != nil { log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err) select { @@ -533,7 +568,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS if soff < off { soff = off } - r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) + r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } @@ -570,7 +605,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { offset += r.off case 2: if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first - _, err := r.Size(nil) + _, err := r.Size(context.TODO(), nil) if err != nil { return 0, fmt.Errorf("can't get size: %v", err) } diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index 69c388b39..dbcc8700d 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -50,11 +50,11 @@ type fakeChunkStore struct { } // Put doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Put(*Chunk) { +func (f *fakeChunkStore) Put(context.Context, *Chunk) { } // Gut doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Get(Address) (*Chunk, error) { +func (f *fakeChunkStore) Get(context.Context, Address) (*Chunk, error) { return nil, errors.New("FakeChunkStore doesn't support Get") } @@ -281,7 +281,7 @@ func TestRandomBrokenData(t *testing.T) { } func benchReadAll(reader LazySectionReader) { - size, _ := reader.Size(nil) + size, _ := reader.Size(context.TODO(), nil) output := make([]byte, 1000) for pos := int64(0); pos < size; pos += 1000 { reader.ReadAt(output, pos) diff --git a/swarm/storage/chunkstore.go b/swarm/storage/chunkstore.go index ce95cd971..3b4d97a7a 100644 --- a/swarm/storage/chunkstore.go +++ b/swarm/storage/chunkstore.go @@ -16,7 +16,10 @@ package storage -import "sync" +import ( + "context" + "sync" +) /* ChunkStore interface is implemented by : @@ -28,8 +31,8 @@ ChunkStore interface is implemented by : - FakeChunkStore: dummy store which doesn't store anything just implements the interface */ type ChunkStore interface { - Put(*Chunk) // effectively there is no error even if there is an error - Get(Address) (*Chunk, error) + Put(context.Context, *Chunk) // effectively there is no error even if there is an error + Get(context.Context, Address) (*Chunk, error) Close() } @@ -45,14 +48,14 @@ func NewMapChunkStore() *MapChunkStore { } } -func (m *MapChunkStore) Put(chunk *Chunk) { +func (m *MapChunkStore) Put(ctx context.Context, chunk *Chunk) { m.mu.Lock() defer m.mu.Unlock() m.chunks[chunk.Addr.Hex()] = chunk chunk.markAsStored() } -func (m *MapChunkStore) Get(addr Address) (*Chunk, error) { +func (m *MapChunkStore) Get(ctx context.Context, addr Address) (*Chunk, error) { m.mu.RLock() defer m.mu.RUnlock() chunk := m.chunks[addr.Hex()] diff --git a/swarm/storage/common.go b/swarm/storage/common.go index d86cb6914..d6352820e 100644 --- a/swarm/storage/common.go +++ b/swarm/storage/common.go @@ -16,6 +16,7 @@ package storage import ( + "context" "sync" "github.com/ethereum/go-ethereum/swarm/log" @@ -37,7 +38,7 @@ func PutChunks(store *LocalStore, chunks ...*Chunk) { } }() for _, c := range chunks { - go store.Put(c) + go store.Put(context.TODO(), c) } wg.Wait() } diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index c6e97d68f..dc1a3ab35 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "context" "crypto/rand" "flag" "fmt" @@ -69,7 +70,7 @@ func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs [ for chunk := range c { wg.Add(1) chunk := chunk - store.Put(chunk) + store.Put(context.TODO(), chunk) go func() { defer wg.Done() <-chunk.dbStoredC @@ -103,7 +104,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) for _, k := range hs { go func(h Address) { defer wg.Done() - chunk, err := store.Get(h) + chunk, err := store.Get(context.TODO(), h) if err != nil { errc <- err return diff --git a/swarm/storage/dbapi.go b/swarm/storage/dbapi.go index 24234b031..dd71752eb 100644 --- a/swarm/storage/dbapi.go +++ b/swarm/storage/dbapi.go @@ -16,6 +16,8 @@ package storage +import "context" + // wrapper of db-s to provide mockable custom local chunk store access to syncer type DBAPI struct { db *LDBStore @@ -27,8 +29,8 @@ func NewDBAPI(loc *LocalStore) *DBAPI { } // to obtain the chunks from address or request db entry only -func (d *DBAPI) Get(addr Address) (*Chunk, error) { - return d.loc.Get(addr) +func (d *DBAPI) Get(ctx context.Context, addr Address) (*Chunk, error) { + return d.loc.Get(ctx, addr) } // current storage counter of chunk db @@ -42,11 +44,11 @@ func (d *DBAPI) Iterator(from uint64, to uint64, po uint8, f func(Address, uint6 } // to obtain the chunks from address or request db entry only -func (d *DBAPI) GetOrCreateRequest(addr Address) (*Chunk, bool) { - return d.loc.GetOrCreateRequest(addr) +func (d *DBAPI) GetOrCreateRequest(ctx context.Context, addr Address) (*Chunk, bool) { + return d.loc.GetOrCreateRequest(ctx, addr) } // to obtain the chunks from key or request db entry only -func (d *DBAPI) Put(chunk *Chunk) { - d.loc.Put(chunk) +func (d *DBAPI) Put(ctx context.Context, chunk *Chunk) { + d.loc.Put(ctx, chunk) } diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index e18b66ddc..139c0ee03 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -74,7 +74,7 @@ func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) // Put stores the chunkData into the ChunkStore of the hasherStore and returns the reference. // If hasherStore has a chunkEncryption object, the data will be encrypted. // Asynchronous function, the data will not necessarily be stored when it returns. -func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { +func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, error) { c := chunkData size := chunkData.Size() var encryptionKey encryption.Key @@ -87,7 +87,7 @@ func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { } chunk := h.createChunk(c, size) - h.storeChunk(chunk) + h.storeChunk(ctx, chunk) return Reference(append(chunk.Addr, encryptionKey...)), nil } @@ -95,14 +95,14 @@ func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { // Get returns data of the chunk with the given reference (retrieved from the ChunkStore of hasherStore). // If the data is encrypted and the reference contains an encryption key, it will be decrypted before // return. -func (h *hasherStore) Get(ref Reference) (ChunkData, error) { +func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) { key, encryptionKey, err := parseReference(ref, h.hashSize) if err != nil { return nil, err } toDecrypt := (encryptionKey != nil) - chunk, err := h.store.Get(key) + chunk, err := h.store.Get(ctx, key) if err != nil { return nil, err } @@ -207,13 +207,13 @@ func (h *hasherStore) RefSize() int64 { return h.refSize } -func (h *hasherStore) storeChunk(chunk *Chunk) { +func (h *hasherStore) storeChunk(ctx context.Context, chunk *Chunk) { h.wg.Add(1) go func() { <-chunk.dbStoredC h.wg.Done() }() - h.store.Put(chunk) + h.store.Put(ctx, chunk) } func parseReference(ref Reference, hashSize int) (Address, encryption.Key, error) { diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go index cf7b0dcc3..ddf1c39b0 100644 --- a/swarm/storage/hasherstore_test.go +++ b/swarm/storage/hasherstore_test.go @@ -47,13 +47,13 @@ func TestHasherStore(t *testing.T) { // Put two random chunks into the hasherStore chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key1, err := hasherStore.Put(chunkData1) + key1, err := hasherStore.Put(context.TODO(), chunkData1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key2, err := hasherStore.Put(chunkData2) + key2, err := hasherStore.Put(context.TODO(), chunkData2) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } @@ -67,7 +67,7 @@ func TestHasherStore(t *testing.T) { } // Get the first chunk - retrievedChunkData1, err := hasherStore.Get(key1) + retrievedChunkData1, err := hasherStore.Get(context.TODO(), key1) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -78,7 +78,7 @@ func TestHasherStore(t *testing.T) { } // Get the second chunk - retrievedChunkData2, err := hasherStore.Get(key2) + retrievedChunkData2, err := hasherStore.Get(context.TODO(), key2) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -105,7 +105,7 @@ func TestHasherStore(t *testing.T) { } // Check if chunk data in store is encrypted or not - chunkInStore, err := chunkStore.Get(hash1) + chunkInStore, err := chunkStore.Get(context.TODO(), hash1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 178b1ebc4..7920ee767 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -25,6 +25,7 @@ package storage import ( "archive/tar" "bytes" + "context" "encoding/binary" "encoding/hex" "fmt" @@ -370,7 +371,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { key := Address(keybytes) chunk := NewChunk(key, nil) chunk.SData = data[32:] - s.Put(chunk) + s.Put(context.TODO(), chunk) wg.Add(1) go func() { defer wg.Done() @@ -499,7 +500,7 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } -func (s *LDBStore) Put(chunk *Chunk) { +func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) log.Trace("ldbstore.put", "key", chunk.Addr) @@ -639,7 +640,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { return true } -func (s *LDBStore) Get(addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 2453d2f30..baf9e8c14 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "context" "fmt" "io/ioutil" "os" @@ -157,7 +158,7 @@ func testDbStoreNotFound(t *testing.T, mock bool) { t.Fatalf("init dbStore failed: %v", err) } - _, err = db.Get(ZeroAddr) + _, err = db.Get(context.TODO(), ZeroAddr) if err != ErrChunkNotFound { t.Errorf("Expected ErrChunkNotFound, got %v", err) } @@ -188,7 +189,7 @@ func testIterator(t *testing.T, mock bool) { wg := &sync.WaitGroup{} wg.Add(len(chunks)) for i = 0; i < len(chunks); i++ { - db.Put(chunks[i]) + db.Put(context.TODO(), chunks[i]) chunkkeys[i] = chunks[i].Addr j := i go func() { @@ -299,7 +300,7 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { } for i := 0; i < n; i++ { - go ldb.Put(chunks[i]) + go ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored @@ -310,7 +311,7 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) for i := 0; i < n; i++ { - ret, err := ldb.Get(chunks[i].Addr) + ret, err := ldb.Get(context.TODO(), chunks[i].Addr) if err != nil { t.Fatal(err) } @@ -349,7 +350,7 @@ func TestLDBStoreCollectGarbage(t *testing.T) { } for i := 0; i < n; i++ { - ldb.Put(chunks[i]) + ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored @@ -364,7 +365,7 @@ func TestLDBStoreCollectGarbage(t *testing.T) { var missing int for i := 0; i < n; i++ { - ret, err := ldb.Get(chunks[i].Addr) + ret, err := ldb.Get(context.TODO(), chunks[i].Addr) if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { missing++ continue @@ -403,7 +404,7 @@ func TestLDBStoreAddRemove(t *testing.T) { } for i := 0; i < n; i++ { - go ldb.Put(chunks[i]) + go ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored before continuing @@ -428,7 +429,7 @@ func TestLDBStoreAddRemove(t *testing.T) { log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) for i := 0; i < n; i++ { - ret, err := ldb.Get(chunks[i].Addr) + ret, err := ldb.Get(context.TODO(), chunks[i].Addr) if i%2 == 0 { // expect even chunks to be missing @@ -465,7 +466,7 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { } for i := 0; i < n; i++ { - ldb.Put(chunks[i]) + ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored before continuing @@ -494,7 +495,7 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { n = 10 for i := 0; i < n; i++ { - ldb.Put(chunks[i]) + ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored before continuing @@ -504,14 +505,14 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { // expect for first chunk to be missing, because it has the smallest access value idx := 0 - ret, err := ldb.Get(chunks[idx].Addr) + ret, err := ldb.Get(context.TODO(), chunks[idx].Addr) if err == nil || ret != nil { t.Fatal("expected first chunk to be missing, but got no error") } // expect for last chunk to be present, as it has the largest access value idx = 9 - ret, err = ldb.Get(chunks[idx].Addr) + ret, err = ldb.Get(context.TODO(), chunks[idx].Addr) if err != nil { t.Fatalf("expected no error, but got %s", err) } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 4c57086fa..096d150ae 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -17,6 +17,7 @@ package storage import ( + "context" "encoding/binary" "fmt" "path/filepath" @@ -96,7 +97,7 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { // when the chunk is stored in memstore. // After the LDBStore.Put, it is ensured that the MemStore // contains the chunk with the same data, but nil ReqC channel. -func (ls *LocalStore) Put(chunk *Chunk) { +func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) { if l := len(chunk.SData); l < 9 { log.Debug("incomplete chunk data", "addr", chunk.Addr, "length", l) chunk.SetErrored(ErrChunkInvalid) @@ -123,7 +124,7 @@ func (ls *LocalStore) Put(chunk *Chunk) { chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - memChunk, err := ls.memStore.Get(chunk.Addr) + memChunk, err := ls.memStore.Get(ctx, chunk.Addr) switch err { case nil: if memChunk.ReqC == nil { @@ -136,7 +137,7 @@ func (ls *LocalStore) Put(chunk *Chunk) { return } - ls.DbStore.Put(chunk) + ls.DbStore.Put(ctx, chunk) // chunk is no longer a request, but a chunk with data, so replace it in memStore newc := NewChunk(chunk.Addr, nil) @@ -144,7 +145,7 @@ func (ls *LocalStore) Put(chunk *Chunk) { newc.Size = chunk.Size newc.dbStoredC = chunk.dbStoredC - ls.memStore.Put(newc) + ls.memStore.Put(ctx, newc) if memChunk != nil && memChunk.ReqC != nil { close(memChunk.ReqC) @@ -155,15 +156,15 @@ func (ls *LocalStore) Put(chunk *Chunk) { // This method is blocking until the chunk is retrieved // so additional timeout may be needed to wrap this call if // ChunkStores are remote and can have long latency -func (ls *LocalStore) Get(addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { ls.mu.Lock() defer ls.mu.Unlock() - return ls.get(addr) + return ls.get(ctx, addr) } -func (ls *LocalStore) get(addr Address) (chunk *Chunk, err error) { - chunk, err = ls.memStore.Get(addr) +func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) { + chunk, err = ls.memStore.Get(ctx, addr) if err == nil { if chunk.ReqC != nil { select { @@ -177,25 +178,25 @@ func (ls *LocalStore) get(addr Address) (chunk *Chunk, err error) { return } metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) - chunk, err = ls.DbStore.Get(addr) + chunk, err = ls.DbStore.Get(ctx, addr) if err != nil { metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) return } chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - ls.memStore.Put(chunk) + ls.memStore.Put(ctx, chunk) return } // retrieve logic common for local and network chunk retrieval requests -func (ls *LocalStore) GetOrCreateRequest(addr Address) (chunk *Chunk, created bool) { +func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) { metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1) ls.mu.Lock() defer ls.mu.Unlock() var err error - chunk, err = ls.get(addr) + chunk, err = ls.get(ctx, addr) if err == nil && chunk.GetErrored() == nil { metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1) log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr)) @@ -210,7 +211,7 @@ func (ls *LocalStore) GetOrCreateRequest(addr Address) (chunk *Chunk, created bo metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1) log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr)) chunk = NewChunk(addr, make(chan bool)) - ls.memStore.Put(chunk) + ls.memStore.Put(ctx, chunk) return chunk, true } diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 7af31ffbd..55cfcbfea 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -19,6 +19,7 @@ package storage import ( + "context" "sync" lru "github.com/hashicorp/golang-lru" @@ -68,7 +69,7 @@ func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { } } -func (m *MemStore) Get(addr Address) (*Chunk, error) { +func (m *MemStore) Get(ctx context.Context, addr Address) (*Chunk, error) { if m.disabled { return nil, ErrChunkNotFound } @@ -90,7 +91,7 @@ func (m *MemStore) Get(addr Address) (*Chunk, error) { return c.(*Chunk), nil } -func (m *MemStore) Put(c *Chunk) { +func (m *MemStore) Put(ctx context.Context, c *Chunk) { if m.disabled { return } diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go index 5c68a4b4b..2c1b0e89e 100644 --- a/swarm/storage/memstore_test.go +++ b/swarm/storage/memstore_test.go @@ -17,6 +17,7 @@ package storage import ( + "context" "crypto/rand" "encoding/binary" "io/ioutil" @@ -72,7 +73,7 @@ func TestMemStoreNotFound(t *testing.T) { m := newTestMemStore() defer m.Close() - _, err := m.Get(ZeroAddr) + _, err := m.Get(context.TODO(), ZeroAddr) if err != ErrChunkNotFound { t.Errorf("Expected ErrChunkNotFound, got %v", err) } @@ -187,8 +188,8 @@ func TestMemStoreAndLDBStore(t *testing.T) { } for i := 0; i < tt.n; i++ { - go ldb.Put(chunks[i]) - memStore.Put(chunks[i]) + go ldb.Put(context.TODO(), chunks[i]) + memStore.Put(context.TODO(), chunks[i]) if got := memStore.cache.Len(); got > cacheCap { t.Fatalf("expected to get cache capacity less than %v, but got %v", cacheCap, got) @@ -200,10 +201,10 @@ func TestMemStoreAndLDBStore(t *testing.T) { } for i := 0; i < tt.n; i++ { - _, err := memStore.Get(chunks[i].Addr) + _, err := memStore.Get(context.TODO(), chunks[i].Addr) if err != nil { if err == ErrChunkNotFound { - _, err := ldb.Get(chunks[i].Addr) + _, err := ldb.Get(context.TODO(), chunks[i].Addr) if err != nil { t.Fatalf("couldn't get chunk %v from ldb, got error: %v", i, err) } diff --git a/swarm/storage/mru/resource.go b/swarm/storage/mru/resource.go index 1e92a5e92..4f5a4f44c 100644 --- a/swarm/storage/mru/resource.go +++ b/swarm/storage/mru/resource.go @@ -125,6 +125,10 @@ type resource struct { updated time.Time } +func (r *resource) Context() context.Context { + return context.TODO() +} + // TODO Expire content after a defined period (to force resync) func (r *resource) isSynced() bool { return !r.updated.IsZero() @@ -134,7 +138,7 @@ func (r *resource) NameHash() common.Hash { return r.nameHash } -func (r *resource) Size(chan bool) (int64, error) { +func (r *resource) Size(context.Context, chan bool) (int64, error) { if !r.isSynced() { return 0, NewError(ErrNotSynced, "Not synced") } @@ -413,7 +417,7 @@ func (h *Handler) New(ctx context.Context, name string, frequency uint64) (stora chunk := h.newMetaChunk(name, currentblock, frequency) - h.chunkStore.Put(chunk) + h.chunkStore.Put(ctx, chunk) log.Debug("new resource", "name", name, "key", nameHash, "startBlock", currentblock, "frequency", frequency) // create the internal index for the resource and populate it with the data of the first version @@ -593,7 +597,7 @@ func (h *Handler) lookup(rsrc *resource, period uint32, version uint32, refresh return nil, NewError(ErrPeriodDepth, fmt.Sprintf("Lookup exceeded max period hops (%d)", maxLookup.Max)) } key := h.resourceHash(period, version, rsrc.nameHash) - chunk, err := h.chunkStore.GetWithTimeout(key, defaultRetrieveTimeout) + chunk, err := h.chunkStore.GetWithTimeout(context.TODO(), key, defaultRetrieveTimeout) if err == nil { if specificversion { return h.updateIndex(rsrc, chunk) @@ -603,7 +607,7 @@ func (h *Handler) lookup(rsrc *resource, period uint32, version uint32, refresh for { newversion := version + 1 key := h.resourceHash(period, newversion, rsrc.nameHash) - newchunk, err := h.chunkStore.GetWithTimeout(key, defaultRetrieveTimeout) + newchunk, err := h.chunkStore.GetWithTimeout(context.TODO(), key, defaultRetrieveTimeout) if err != nil { return h.updateIndex(rsrc, chunk) } @@ -621,8 +625,8 @@ func (h *Handler) lookup(rsrc *resource, period uint32, version uint32, refresh // Retrieves a resource metadata chunk and creates/updates the index entry for it // with the resulting metadata -func (h *Handler) Load(addr storage.Address) (*resource, error) { - chunk, err := h.chunkStore.GetWithTimeout(addr, defaultRetrieveTimeout) +func (h *Handler) Load(ctx context.Context, addr storage.Address) (*resource, error) { + chunk, err := h.chunkStore.GetWithTimeout(ctx, addr, defaultRetrieveTimeout) if err != nil { return nil, NewError(ErrNotFound, err.Error()) } @@ -890,7 +894,7 @@ func (h *Handler) update(ctx context.Context, name string, data []byte, multihas chunk := newUpdateChunk(key, signature, nextperiod, version, name, data, datalength) // send the chunk - h.chunkStore.Put(chunk) + h.chunkStore.Put(ctx, chunk) log.Trace("resource update", "name", name, "key", key, "currentblock", currentblock, "lastperiod", nextperiod, "version", version, "data", chunk.SData, "multihash", multihash) // update our resources map entry and return the new key diff --git a/swarm/storage/mru/resource_test.go b/swarm/storage/mru/resource_test.go index aa1860359..48387d981 100644 --- a/swarm/storage/mru/resource_test.go +++ b/swarm/storage/mru/resource_test.go @@ -182,7 +182,7 @@ func TestHandler(t *testing.T) { t.Fatal(err) } - chunk, err := rh.chunkStore.Get(storage.Address(rootChunkKey)) + chunk, err := rh.chunkStore.Get(context.TODO(), storage.Address(rootChunkKey)) if err != nil { t.Fatal(err) } else if len(chunk.SData) < 16 { @@ -256,7 +256,7 @@ func TestHandler(t *testing.T) { if err != nil { t.Fatal(err) } - rsrc2, err := rh2.Load(rootChunkKey) + rsrc2, err := rh2.Load(context.TODO(), rootChunkKey) _, err = rh2.LookupLatest(ctx, nameHash, true, nil) if err != nil { t.Fatal(err) @@ -754,7 +754,7 @@ func newTestSigner() (*GenericSigner, error) { } func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { - chunk, err := rh.chunkStore.Get(addr) + chunk, err := rh.chunkStore.Get(context.TODO(), addr) if err != nil { return nil, err } diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 6a205cfa4..96a7e51f7 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -17,9 +17,12 @@ package storage import ( + "context" "time" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" ) var ( @@ -43,10 +46,10 @@ var ( // access by calling network is blocking with a timeout type NetStore struct { localStore *LocalStore - retrieve func(chunk *Chunk) error + retrieve func(ctx context.Context, chunk *Chunk) error } -func NewNetStore(localStore *LocalStore, retrieve func(chunk *Chunk) error) *NetStore { +func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore { return &NetStore{localStore, retrieve} } @@ -56,7 +59,14 @@ func NewNetStore(localStore *LocalStore, retrieve func(chunk *Chunk) error) *Net // Get uses get method to retrieve request, but retries if the // ErrChunkNotFound is returned by get, until the netStoreRetryTimeout // is reached. -func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { +func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { + + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "netstore.get.global") + defer sp.Finish() + timer := time.NewTimer(netStoreRetryTimeout) defer timer.Stop() @@ -84,7 +94,7 @@ func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { defer limiter.Stop() for { - chunk, err := ns.get(addr, 0) + chunk, err := ns.get(ctx, addr, 0) if err != ErrChunkNotFound { // break retry only if the error is nil // or other error then ErrChunkNotFound @@ -122,16 +132,23 @@ func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { } // GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter -func (ns *NetStore) GetWithTimeout(addr Address, timeout time.Duration) (chunk *Chunk, err error) { - return ns.get(addr, timeout) +func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { + return ns.get(ctx, addr, timeout) } -func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err error) { +func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { if timeout == 0 { timeout = searchTimeout } + + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "netstore.get") + defer sp.Finish() + if ns.retrieve == nil { - chunk, err = ns.localStore.Get(addr) + chunk, err = ns.localStore.Get(ctx, addr) if err == nil { return chunk, nil } @@ -140,14 +157,14 @@ func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err } } else { var created bool - chunk, created = ns.localStore.GetOrCreateRequest(addr) + chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr) if chunk.ReqC == nil { return chunk, nil } if created { - err := ns.retrieve(chunk) + err := ns.retrieve(ctx, chunk) if err != nil { // mark chunk request as failed so that we can retry it later chunk.SetErrored(ErrChunkUnavailable) @@ -171,8 +188,8 @@ func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err } // Put is the entrypoint for local store requests coming from storeLoop -func (ns *NetStore) Put(chunk *Chunk) { - ns.localStore.Put(chunk) +func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) { + ns.localStore.Put(ctx, chunk) } // Close chunk store diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 432a799d8..7babbf5e0 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -17,6 +17,7 @@ package storage import ( + "context" "encoding/hex" "errors" "io/ioutil" @@ -46,7 +47,7 @@ func newDummyChunk(addr Address) *Chunk { return chunk } -func (m *mockRetrieve) retrieve(chunk *Chunk) error { +func (m *mockRetrieve) retrieve(ctx context.Context, chunk *Chunk) error { hkey := hex.EncodeToString(chunk.Addr) m.requests[hkey] += 1 @@ -100,7 +101,7 @@ func TestNetstoreFailedRequest(t *testing.T) { // } // second call - _, err = netStore.Get(key) + _, err = netStore.Get(context.TODO(), key) if got := r.requests[hex.EncodeToString(key)]; got != 2 { t.Fatalf("expected to have called retrieve two times, but got: %v", got) } @@ -109,7 +110,7 @@ func TestNetstoreFailedRequest(t *testing.T) { } // third call - chunk, err := netStore.Get(key) + chunk, err := netStore.Get(context.TODO(), key) if got := r.requests[hex.EncodeToString(key)]; got != 3 { t.Fatalf("expected to have called retrieve three times, but got: %v", got) } diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 6643e989a..2923c81c5 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -287,7 +287,7 @@ func (pc *PyramidChunker) processor(id int64) { func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { log.Debug("pyramid.chunker: processChunk()", "id", id) - ref, err := pc.putter.Put(job.chunk) + ref, err := pc.putter.Put(context.TODO(), job.chunk) if err != nil { pc.errC <- err } @@ -302,7 +302,7 @@ func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { func (pc *PyramidChunker) loadTree() error { log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - chunkData, err := pc.getter.Get(Reference(pc.key)) + chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key)) if err != nil { return errLoadingTreeRootChunk } @@ -355,7 +355,7 @@ func (pc *PyramidChunker) loadTree() error { branchCount = int64(len(ent.chunk)-8) / pc.hashSize for i := int64(0); i < branchCount; i++ { key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] - newChunkData, err := pc.getter.Get(Reference(key)) + newChunkData, err := pc.getter.Get(context.TODO(), Reference(key)) if err != nil { return errLoadingTreeChunk } @@ -417,7 +417,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] var err error - unfinishedChunkData, err = pc.getter.Get(lastKey) + unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey) if err != nil { pc.errC <- err } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 32880ead7..3114ef576 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -250,7 +250,8 @@ func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) { // Size, Seek, Read, ReadAt type LazySectionReader interface { - Size(chan bool) (int64, error) + Context() context.Context + Size(context.Context, chan bool) (int64, error) io.Seeker io.Reader io.ReaderAt @@ -260,10 +261,14 @@ type LazyTestSectionReader struct { *io.SectionReader } -func (r *LazyTestSectionReader) Size(chan bool) (int64, error) { +func (r *LazyTestSectionReader) Size(context.Context, chan bool) (int64, error) { return r.SectionReader.Size(), nil } +func (r *LazyTestSectionReader) Context() context.Context { + return context.TODO() +} + type StoreParams struct { Hash SwarmHasher `toml:"-"` DbCapacity uint64 @@ -298,7 +303,7 @@ type Reference []byte // Putter is responsible to store data and create a reference for it type Putter interface { - Put(ChunkData) (Reference, error) + Put(context.Context, ChunkData) (Reference, error) // RefSize returns the length of the Reference created by this Putter RefSize() int64 // Close is to indicate that no more chunk data will be Put on this Putter @@ -309,7 +314,7 @@ type Putter interface { // Getter is an interface to retrieve a chunk's data by its reference type Getter interface { - Get(Reference) (ChunkData, error) + Get(context.Context, Reference) (ChunkData, error) } // NOTE: this returns invalid data if chunk is encrypted diff --git a/swarm/swarm.go b/swarm/swarm.go index 431d57a70..bf8bcdbd5 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -21,6 +21,7 @@ import ( "context" "crypto/ecdsa" "fmt" + "io" "math/big" "net" "path/filepath" @@ -50,6 +51,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/ethereum/go-ethereum/swarm/storage/mru" + "github.com/ethereum/go-ethereum/swarm/tracing" ) var ( @@ -76,6 +78,8 @@ type Swarm struct { lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit ps *pss.Pss + + tracerClose io.Closer } type SwarmAPI struct { @@ -356,6 +360,8 @@ Start is called when the stack is started func (self *Swarm) Start(srv *p2p.Server) error { startTime = time.Now() + self.tracerClose = tracing.Closer + // update uaddr to correct enode newaddr := self.bzz.UpdateLocalAddr([]byte(srv.Self().String())) log.Warn("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr)) @@ -424,6 +430,13 @@ func (self *Swarm) updateGauges() { // implements the node.Service interface // stops all component services. func (self *Swarm) Stop() error { + if self.tracerClose != nil { + err := self.tracerClose.Close() + if err != nil { + return err + } + } + if self.ps != nil { self.ps.Stop() } diff --git a/swarm/tracing/tracing.go b/swarm/tracing/tracing.go new file mode 100644 index 000000000..b84cfb310 --- /dev/null +++ b/swarm/tracing/tracing.go @@ -0,0 +1,103 @@ +package tracing + +import ( + "io" + "os" + "strings" + "time" + + "github.com/ethereum/go-ethereum/log" + jaeger "github.com/uber/jaeger-client-go" + jaegercfg "github.com/uber/jaeger-client-go/config" + jaegerlog "github.com/uber/jaeger-client-go/log" + cli "gopkg.in/urfave/cli.v1" +) + +var Enabled bool = false + +// TracingEnabledFlag is the CLI flag name to use to enable trace collections. +const TracingEnabledFlag = "tracing" + +var ( + Closer io.Closer +) + +var ( + TracingFlag = cli.BoolFlag{ + Name: TracingEnabledFlag, + Usage: "Enable tracing", + } + TracingEndpointFlag = cli.StringFlag{ + Name: "tracing.endpoint", + Usage: "Tracing endpoint", + Value: "0.0.0.0:6831", + } + TracingSvcFlag = cli.StringFlag{ + Name: "tracing.svc", + Usage: "Tracing service name", + Value: "swarm", + } +) + +// Flags holds all command-line flags required for tracing collection. +var Flags = []cli.Flag{ + TracingFlag, + TracingEndpointFlag, + TracingSvcFlag, +} + +// Init enables or disables the open tracing system. +func init() { + for _, arg := range os.Args { + if flag := strings.TrimLeft(arg, "-"); flag == TracingEnabledFlag { + Enabled = true + } + } +} + +func Setup(ctx *cli.Context) { + if Enabled { + log.Info("Enabling opentracing") + var ( + endpoint = ctx.GlobalString(TracingEndpointFlag.Name) + svc = ctx.GlobalString(TracingSvcFlag.Name) + ) + + Closer = initTracer(endpoint, svc) + } +} + +func initTracer(endpoint, svc string) (closer io.Closer) { + // Sample configuration for testing. Use constant sampling to sample every trace + // and enable LogSpan to log every span via configured Logger. + cfg := jaegercfg.Configuration{ + Sampler: &jaegercfg.SamplerConfig{ + Type: jaeger.SamplerTypeConst, + Param: 1, + }, + Reporter: &jaegercfg.ReporterConfig{ + LogSpans: true, + BufferFlushInterval: 1 * time.Second, + LocalAgentHostPort: endpoint, + }, + } + + // Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log + // and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics + // frameworks. + jLogger := jaegerlog.StdLogger + //jMetricsFactory := metrics.NullFactory + + // Initialize tracer with a logger and a metrics factory + closer, err := cfg.InitGlobalTracer( + svc, + jaegercfg.Logger(jLogger), + //jaegercfg.Metrics(jMetricsFactory), + //jaegercfg.Observer(rpcmetrics.NewObserver(jMetricsFactory, rpcmetrics.DefaultNameNormalizer)), + ) + if err != nil { + log.Error("Could not initialize Jaeger tracer", "err", err) + } + + return closer +} |