aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network
diff options
context:
space:
mode:
authorBalint Gabor <balint.g@gmail.com>2018-09-13 17:42:19 +0800
committerGitHub <noreply@github.com>2018-09-13 17:42:19 +0800
commit3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch)
tree62a2896b3b824449595272f0b92dda877ba1c58d /swarm/network
parentff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff)
downloadgo-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.bz2
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.lz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.xz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org> Co-authored-by: Balint Gabor <balint.g@gmail.com> Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com> Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/network')
-rw-r--r--swarm/network/fetcher.go305
-rw-r--r--swarm/network/fetcher_test.go459
-rw-r--r--swarm/network/priorityqueue/priorityqueue.go38
-rw-r--r--swarm/network/priorityqueue/priorityqueue_test.go6
-rw-r--r--swarm/network/simulation/simulation.go16
-rw-r--r--swarm/network/stream/common_test.go17
-rw-r--r--swarm/network/stream/delivery.go231
-rw-r--r--swarm/network/stream/delivery_test.go93
-rw-r--r--swarm/network/stream/intervals_test.go78
-rw-r--r--swarm/network/stream/messages.go87
-rw-r--r--swarm/network/stream/peer.go51
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go46
-rw-r--r--swarm/network/stream/snapshot_sync_test.go170
-rw-r--r--swarm/network/stream/stream.go28
-rw-r--r--swarm/network/stream/streamer_test.go8
-rw-r--r--swarm/network/stream/syncer.go94
-rw-r--r--swarm/network/stream/syncer_test.go32
17 files changed, 1293 insertions, 466 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
new file mode 100644
index 000000000..35e2f0132
--- /dev/null
+++ b/swarm/network/fetcher.go
@@ -0,0 +1,305 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+var searchTimeout = 1 * time.Second
+
+// Time to consider peer to be skipped.
+// Also used in stream delivery.
+var RequestTimeout = 10 * time.Second
+
+type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error)
+
+// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
+// keeps it alive until all active requests are completed. This can happen:
+// 1. either because the chunk is delivered
+// 2. or becuse the requestor cancelled/timed out
+// Fetcher self destroys itself after it is completed.
+// TODO: cancel all forward requests after termination
+type Fetcher struct {
+ protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
+ addr storage.Address // the address of the chunk to be fetched
+ offerC chan *discover.NodeID // channel of sources (peer node id strings)
+ requestC chan struct{}
+ skipCheck bool
+}
+
+type Request struct {
+ Addr storage.Address // chunk address
+ Source *discover.NodeID // nodeID of peer to request from (can be nil)
+ SkipCheck bool // whether to offer the chunk first or deliver directly
+ peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil)
+}
+
+// NewRequest returns a new instance of Request based on chunk address skip check and
+// a map of peers to skip.
+func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request {
+ return &Request{
+ Addr: addr,
+ SkipCheck: skipCheck,
+ peersToSkip: peersToSkip,
+ }
+}
+
+// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk.
+// Peers to skip are kept per Request and for a time period of RequestTimeout.
+// This function is used in stream package in Delivery.RequestFromPeers to optimize
+// requests for chunks.
+func (r *Request) SkipPeer(nodeID string) bool {
+ val, ok := r.peersToSkip.Load(nodeID)
+ if !ok {
+ return false
+ }
+ t, ok := val.(time.Time)
+ if ok && time.Now().After(t.Add(RequestTimeout)) {
+ // deadine expired
+ r.peersToSkip.Delete(nodeID)
+ return false
+ }
+ return true
+}
+
+// FetcherFactory is initialised with a request function and can create fetchers
+type FetcherFactory struct {
+ request RequestFunc
+ skipCheck bool
+}
+
+// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory
+func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
+ return &FetcherFactory{
+ request: request,
+ skipCheck: skipCheck,
+ }
+}
+
+// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
+// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
+// this chunk, to make sure we don't request back the chunks from them.
+// The created Fetcher is started and returned.
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
+ fetcher := NewFetcher(source, f.request, f.skipCheck)
+ go fetcher.run(ctx, peersToSkip)
+ return fetcher
+}
+
+// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
+func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+ return &Fetcher{
+ addr: addr,
+ protoRequestFunc: rf,
+ offerC: make(chan *discover.NodeID),
+ requestC: make(chan struct{}),
+ skipCheck: skipCheck,
+ }
+}
+
+// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
+func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) {
+ // First we need to have this select to make sure that we return if context is done
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ // This select alone would not guarantee that we return of context is done, it could potentially
+ // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+ select {
+ case f.offerC <- source:
+ case <-ctx.Done():
+ }
+}
+
+// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
+func (f *Fetcher) Request(ctx context.Context) {
+ // First we need to have this select to make sure that we return if context is done
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ // This select alone would not guarantee that we return of context is done, it could potentially
+ // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+ select {
+ case f.requestC <- struct{}{}:
+ case <-ctx.Done():
+ }
+}
+
+// start prepares the Fetcher
+// it keeps the Fetcher alive within the lifecycle of the passed context
+func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+ var (
+ doRequest bool // determines if retrieval is initiated in the current iteration
+ wait *time.Timer // timer for search timeout
+ waitC <-chan time.Time // timer channel
+ sources []*discover.NodeID // known sources, ie. peers that offered the chunk
+ requested bool // true if the chunk was actually requested
+ )
+ gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected
+
+ // loop that keeps the fetching process alive
+ // after every request a timer is set. If this goes off we request again from another peer
+ // note that the previous request is still alive and has the chance to deliver, so
+ // rerequesting extends the search. ie.,
+ // if a peer we requested from is gone we issue a new request, so the number of active
+ // requests never decreases
+ for {
+ select {
+
+ // incoming offer
+ case source := <-f.offerC:
+ log.Trace("new source", "peer addr", source, "request addr", f.addr)
+ // 1) the chunk is offered by a syncing peer
+ // add to known sources
+ sources = append(sources, source)
+ // launch a request to the source iff the chunk was requested (not just expected because its offered by a syncing peer)
+ doRequest = requested
+
+ // incoming request
+ case <-f.requestC:
+ log.Trace("new request", "request addr", f.addr)
+ // 2) chunk is requested, set requested flag
+ // launch a request iff none been launched yet
+ doRequest = !requested
+ requested = true
+
+ // peer we requested from is gone. fall back to another
+ // and remove the peer from the peers map
+ case id := <-gone:
+ log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
+ peers.Delete(id.String())
+ doRequest = requested
+
+ // search timeout: too much time passed since the last request,
+ // extend the search to a new peer if we can find one
+ case <-waitC:
+ log.Trace("search timed out: rerequesting", "request addr", f.addr)
+ doRequest = requested
+
+ // all Fetcher context closed, can quit
+ case <-ctx.Done():
+ log.Trace("terminate fetcher", "request addr", f.addr)
+ // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
+ return
+ }
+
+ // need to issue a new request
+ if doRequest {
+ var err error
+ sources, err = f.doRequest(ctx, gone, peers, sources)
+ if err != nil {
+ log.Warn("unable to request", "request addr", f.addr, "err", err)
+ }
+ }
+
+ // if wait channel is not set, set it to a timer
+ if requested {
+ if wait == nil {
+ wait = time.NewTimer(searchTimeout)
+ defer wait.Stop()
+ waitC = wait.C
+ } else {
+ // stop the timer and drain the channel if it was not drained earlier
+ if !wait.Stop() {
+ select {
+ case <-wait.C:
+ default:
+ }
+ }
+ // reset the timer to go off after searchTimeout
+ wait.Reset(searchTimeout)
+ }
+ }
+ doRequest = false
+ }
+}
+
+// doRequest attempts at finding a peer to request the chunk from
+// * first it tries to request explicitly from peers that are known to have offered the chunk
+// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address
+// excluding those in the peersToSkip map
+// * if no such peer is found an error is returned
+//
+// if a request is successful,
+// * the peer's address is added to the set of peers to skip
+// * the peer's address is removed from prospective sources, and
+// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
+func (f *Fetcher) doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) {
+ var i int
+ var sourceID *discover.NodeID
+ var quit chan struct{}
+
+ req := &Request{
+ Addr: f.addr,
+ SkipCheck: f.skipCheck,
+ peersToSkip: peersToSkip,
+ }
+
+ foundSource := false
+ // iterate over known sources
+ for i = 0; i < len(sources); i++ {
+ req.Source = sources[i]
+ var err error
+ sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ if err == nil {
+ // remove the peer from known sources
+ // Note: we can modify the source although we are looping on it, because we break from the loop immediately
+ sources = append(sources[:i], sources[i+1:]...)
+ foundSource = true
+ break
+ }
+ }
+
+ // if there are no known sources, or none available, we try request from a closest node
+ if !foundSource {
+ req.Source = nil
+ var err error
+ sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ if err != nil {
+ // if no peers found to request from
+ return sources, err
+ }
+ }
+ // add peer to the set of peers to skip from now
+ peersToSkip.Store(sourceID.String(), time.Now())
+
+ // if the quit channel is closed, it indicates that the source peer we requested from
+ // disconnected or terminated its streamer
+ // here start a go routine that watches this channel and reports the source peer on the gone channel
+ // this go routine quits if the fetcher global context is done to prevent process leak
+ go func() {
+ select {
+ case <-quit:
+ gone <- sourceID
+ case <-ctx.Done():
+ }
+ }()
+ return sources, nil
+}
diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go
new file mode 100644
index 000000000..21b81d652
--- /dev/null
+++ b/swarm/network/fetcher_test.go
@@ -0,0 +1,459 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+var requestedPeerID = discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+// mockRequester pushes every request to the requestC channel when its doRequest function is called
+type mockRequester struct {
+ // requests []Request
+ requestC chan *Request // when a request is coming it is pushed to requestC
+ waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional)
+ ctr int //counts the number of requests
+ quitC chan struct{}
+}
+
+func newMockRequester(waitTimes ...time.Duration) *mockRequester {
+ return &mockRequester{
+ requestC: make(chan *Request),
+ waitTimes: waitTimes,
+ quitC: make(chan struct{}),
+ }
+}
+
+func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*discover.NodeID, chan struct{}, error) {
+ waitTime := time.Duration(0)
+ if m.ctr < len(m.waitTimes) {
+ waitTime = m.waitTimes[m.ctr]
+ m.ctr++
+ }
+ time.Sleep(waitTime)
+ m.requestC <- request
+
+ // if there is a Source in the request use that, if not use the global requestedPeerId
+ source := request.Source
+ if source == nil {
+ source = &requestedPeerID
+ }
+ return source, m.quitC, nil
+}
+
+// TestFetcherSingleRequest creates a Fetcher using mockRequester, and run it with a sample set of peers to skip.
+// mockRequester pushes a Request on a channel every time the request function is called. Using
+// this channel we test if calling Fetcher.Request calls the request function, and whether it uses
+// the correct peers to skip which we provided for the fetcher.run function.
+func TestFetcherSingleRequest(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peers := []string{"a", "b", "c", "d"}
+ peersToSkip := &sync.Map{}
+ for _, p := range peers {
+ peersToSkip.Store(p, time.Now())
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ select {
+ case request := <-requester.requestC:
+ // request should contain all peers from peersToSkip provided to the fetcher
+ for _, p := range peers {
+ if _, ok := request.peersToSkip.Load(p); !ok {
+ t.Fatalf("request.peersToSkip misses peer")
+ }
+ }
+
+ // source peer should be also added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok {
+ t.Fatalf("request.peersToSkip does not contain peer returned by the request function")
+ }
+
+ // fetch should trigger a request, if it doesn't happen in time, test should fail
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetch timeout")
+ }
+}
+
+// TestCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called
+func TestFetcherCancelStopsFetcher(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // we start the fetcher, and then we immediately cancel the context
+ go fetcher.run(ctx, peersToSkip)
+ cancel()
+
+ rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
+ defer rcancel()
+ // we call Request with an active context
+ fetcher.Request(rctx)
+
+ // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+}
+
+// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
+func TestFetcherCancelStopsRequest(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // we start the fetcher with an active context
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx, rcancel := context.WithCancel(context.Background())
+ rcancel()
+
+ // we call Request with a cancelled context
+ fetcher.Request(rctx)
+
+ // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetch function initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
+ rctx = context.Background()
+ fetcher.Request(rctx)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("expected request")
+ }
+}
+
+// TestOfferUsesSource tests Fetcher Offer behavior.
+// In this case there should be 1 (and only one) request initiated from the source peer, and the
+// source nodeid should appear in the peersToSkip map.
+func TestFetcherOfferUsesSource(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // start the fetcher
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx := context.Background()
+ // call the Offer function with the source peer
+ fetcher.Offer(rctx, &sourcePeerID)
+
+ // fetcher should not initiate request
+ select {
+ case <-requester.requestC:
+ t.Fatalf("fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // call Request after the Offer
+ rctx = context.Background()
+ fetcher.Request(rctx)
+
+ // there should be exactly 1 request coming from fetcher
+ var request *Request
+ select {
+ case request = <-requester.requestC:
+ if *request.Source != sourcePeerID {
+ t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ select {
+ case <-requester.requestC:
+ t.Fatalf("Fetcher number of requests expected 1 got 2")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // source peer should be added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok {
+ t.Fatalf("SourcePeerId not added to peersToSkip")
+ }
+}
+
+func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // start the fetcher
+ go fetcher.run(ctx, peersToSkip)
+
+ // call Request first
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ // there should be a request coming from fetcher
+ var request *Request
+ select {
+ case request = <-requester.requestC:
+ if request.Source != nil {
+ t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ // after the Request call Offer
+ fetcher.Offer(context.Background(), &sourcePeerID)
+
+ // there should be a request coming from fetcher
+ select {
+ case request = <-requester.requestC:
+ if *request.Source != sourcePeerID {
+ t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ // source peer should be added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok {
+ t.Fatalf("SourcePeerId not added to peersToSkip")
+ }
+}
+
+// TestFetcherRetryOnTimeout tests that fetch retries after searchTimeOut has passed
+func TestFetcherRetryOnTimeout(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ // set searchTimeOut to low value so the test is quicker
+ defer func(t time.Duration) {
+ searchTimeout = t
+ }(searchTimeout)
+ searchTimeout = 250 * time.Millisecond
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // start the fetcher
+ go fetcher.run(ctx, peersToSkip)
+
+ // call the fetch function with an active context
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ // after 100ms the first request should be initiated
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ default:
+ t.Fatalf("fetch did not initiate request")
+ }
+
+ // after another 100ms no new request should be initiated, because search timeout is 250ms
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ t.Fatalf("unexpected request from fetcher")
+ default:
+ }
+
+ // after another 300ms search timeout is over, there should be a new request
+ time.Sleep(300 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ default:
+ t.Fatalf("fetch did not retry request")
+ }
+}
+
+// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts
+// a Fetcher when it return a fetch function. We test the fetching functionality just by checking if
+// a request is initiated when the fetch function is called
+func TestFetcherFactory(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcherFactory := NewFetcherFactory(requester.doRequest, false)
+
+ peersToSkip := &sync.Map{}
+
+ fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
+
+ fetcher.Request(context.Background())
+
+ // check if the created fetchFunction really starts a fetcher and initiates a request
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetch timeout")
+ }
+
+}
+
+func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ // make sure searchTimeout is long so it is sure the request is not retried because of timeout
+ defer func(t time.Duration) {
+ searchTimeout = t
+ }(searchTimeout)
+ searchTimeout = 10 * time.Second
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("request is not initiated")
+ }
+
+ close(requester.quitC)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("request is not initiated after failed request")
+ }
+}
+
+// TestRequestSkipPeer checks if PeerSkip function will skip provided peer
+// and not skip unknown one.
+func TestRequestSkipPeer(t *testing.T) {
+ addr := make([]byte, 32)
+ peers := []discover.NodeID{
+ discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ }
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peers[0].String(), time.Now())
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peers[0].String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ if r.SkipPeer(peers[1].String()) {
+ t.Errorf("peer skipped")
+ }
+}
+
+// TestRequestSkipPeerExpired checks if a peer to skip is not skipped
+// after RequestTimeout has passed.
+func TestRequestSkipPeerExpired(t *testing.T) {
+ addr := make([]byte, 32)
+ peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+ // set RequestTimeout to a low value and reset it after the test
+ defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout)
+ RequestTimeout = 250 * time.Millisecond
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peer.String(), time.Now())
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ time.Sleep(500 * time.Millisecond)
+
+ if r.SkipPeer(peer.String()) {
+ t.Errorf("peer skipped")
+ }
+}
+
+// TestRequestSkipPeerPermanent checks if a peer to skip is not skipped
+// after RequestTimeout is not skipped if it is set for a permanent skipping
+// by value to peersToSkip map is not time.Duration.
+func TestRequestSkipPeerPermanent(t *testing.T) {
+ addr := make([]byte, 32)
+ peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+ // set RequestTimeout to a low value and reset it after the test
+ defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout)
+ RequestTimeout = 250 * time.Millisecond
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peer.String(), true)
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ time.Sleep(500 * time.Millisecond)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+}
diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go
index fab638c9e..538502605 100644
--- a/swarm/network/priorityqueue/priorityqueue.go
+++ b/swarm/network/priorityqueue/priorityqueue.go
@@ -28,10 +28,13 @@ package priorityqueue
import (
"context"
"errors"
+
+ "github.com/ethereum/go-ethereum/log"
)
var (
- errContention = errors.New("queue contention")
+ ErrContention = errors.New("contention")
+
errBadPriority = errors.New("bad priority")
wakey = struct{}{}
@@ -39,7 +42,7 @@ var (
// PriorityQueue is the basic structure
type PriorityQueue struct {
- queues []chan interface{}
+ Queues []chan interface{}
wakeup chan struct{}
}
@@ -50,27 +53,29 @@ func New(n int, l int) *PriorityQueue {
queues[i] = make(chan interface{}, l)
}
return &PriorityQueue{
- queues: queues,
+ Queues: queues,
wakeup: make(chan struct{}, 1),
}
}
// Run is a forever loop popping items from the queues
func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) {
- top := len(pq.queues) - 1
+ top := len(pq.Queues) - 1
p := top
READ:
for {
- q := pq.queues[p]
+ q := pq.Queues[p]
select {
case <-ctx.Done():
return
case x := <-q:
+ log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
f(x)
p = top
default:
if p > 0 {
p--
+ log.Trace("priority.queue p > 0", "p", p)
continue READ
}
p = top
@@ -78,6 +83,7 @@ READ:
case <-ctx.Done():
return
case <-pq.wakeup:
+ log.Trace("priority.queue wakeup", "p", p)
}
}
}
@@ -85,23 +91,15 @@ READ:
// Push pushes an item to the appropriate queue specified in the priority argument
// if context is given it waits until either the item is pushed or the Context aborts
-// otherwise returns errContention if the queue is full
-func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error {
- if p < 0 || p >= len(pq.queues) {
+func (pq *PriorityQueue) Push(x interface{}, p int) error {
+ if p < 0 || p >= len(pq.Queues) {
return errBadPriority
}
- if ctx == nil {
- select {
- case pq.queues[p] <- x:
- default:
- return errContention
- }
- } else {
- select {
- case pq.queues[p] <- x:
- case <-ctx.Done():
- return ctx.Err()
- }
+ log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
+ select {
+ case pq.Queues[p] <- x:
+ default:
+ return ErrContention
}
select {
case pq.wakeup <- wakey:
diff --git a/swarm/network/priorityqueue/priorityqueue_test.go b/swarm/network/priorityqueue/priorityqueue_test.go
index cd54250f8..ed8b575c2 100644
--- a/swarm/network/priorityqueue/priorityqueue_test.go
+++ b/swarm/network/priorityqueue/priorityqueue_test.go
@@ -30,7 +30,7 @@ func TestPriorityQueue(t *testing.T) {
results = append(results, v.(string))
wg.Done()
})
- pq.Push(context.Background(), "2.0", 2)
+ pq.Push("2.0", 2)
wg.Wait()
if results[0] != "2.0" {
t.Errorf("expected first result %q, got %q", "2.0", results[0])
@@ -66,7 +66,7 @@ Loop:
{
priorities: []int{0, 0, 0},
values: []string{"0.0", "0.0", "0.1"},
- errors: []error{nil, nil, errContention},
+ errors: []error{nil, nil, ErrContention},
},
} {
var results []string
@@ -74,7 +74,7 @@ Loop:
pq := New(3, 2)
wg.Add(len(tc.values))
for j, value := range tc.values {
- err := pq.Push(nil, value, tc.priorities[j])
+ err := pq.Push(value, tc.priorities[j])
if tc.errors != nil && err != tc.errors[j] {
t.Errorf("expected push error %v, got %v", tc.errors[j], err)
continue Loop
diff --git a/swarm/network/simulation/simulation.go b/swarm/network/simulation/simulation.go
index 74f9d98ee..096f7322c 100644
--- a/swarm/network/simulation/simulation.go
+++ b/swarm/network/simulation/simulation.go
@@ -94,7 +94,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) {
}
s.Net = simulations.NewNetwork(
- adapters.NewSimAdapter(adapterServices),
+ adapters.NewTCPAdapter(adapterServices),
&simulations.NetworkConfig{ID: "0"},
)
@@ -164,17 +164,6 @@ var maxParallelCleanups = 10
func (s *Simulation) Close() {
close(s.done)
- // Close all connections before calling the Network Shutdown.
- // It is possible that p2p.Server.Stop will block if there are
- // existing connections.
- for _, c := range s.Net.Conns {
- if c.Up {
- s.Net.Disconnect(c.One, c.Other)
- }
- }
- s.shutdownWG.Wait()
- s.Net.Shutdown()
-
sem := make(chan struct{}, maxParallelCleanups)
s.mu.RLock()
cleanupFuncs := make([]func(), len(s.cleanupFuncs))
@@ -206,6 +195,9 @@ func (s *Simulation) Close() {
}
close(s.runC)
}
+
+ s.shutdownWG.Wait()
+ s.Net.Shutdown()
}
// Done returns a channel that is closed when the simulation
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 491dc9fd5..e0d776e34 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
return nil, nil, nil, removeDataDir, err
}
- db := storage.NewDBAPI(localStore)
- delivery := NewDelivery(to, db)
- streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, removeDataDir, err
+ }
+
+ delivery := NewDelivery(to, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+ streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
@@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
-func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) {
+func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
-func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) {
+func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
- rrs.stores[idx].Put(ctx, chunk)
+ return rrs.stores[idx].Put(ctx, chunk)
}
func (rrs *roundRobinStore) Close() {
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 627352535..d0f27eebc 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -19,12 +19,11 @@ package stream
import (
"context"
"errors"
- "time"
- "github.com/ethereum/go-ethereum/common"
+ "fmt"
+
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/discover"
- cp "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -46,39 +45,34 @@ var (
)
type Delivery struct {
- db *storage.DBAPI
- kad *network.Kademlia
- receiveC chan *ChunkDeliveryMsg
- getPeer func(discover.NodeID) *Peer
+ chunkStore storage.SyncChunkStore
+ kad *network.Kademlia
+ getPeer func(discover.NodeID) *Peer
}
-func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery {
- d := &Delivery{
- db: db,
- kad: kad,
- receiveC: make(chan *ChunkDeliveryMsg, deliveryCap),
+func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
+ return &Delivery{
+ chunkStore: chunkStore,
+ kad: kad,
}
-
- go d.processReceivedChunks()
- return d
}
// SwarmChunkServer implements Server
type SwarmChunkServer struct {
deliveryC chan []byte
batchC chan []byte
- db *storage.DBAPI
+ chunkStore storage.ChunkStore
currentLen uint64
quit chan struct{}
}
// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer {
+func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
s := &SwarmChunkServer{
- deliveryC: make(chan []byte, deliveryCap),
- batchC: make(chan []byte),
- db: db,
- quit: make(chan struct{}),
+ deliveryC: make(chan []byte, deliveryCap),
+ batchC: make(chan []byte),
+ chunkStore: chunkStore,
+ quit: make(chan struct{}),
}
go s.processDeliveries()
return s
@@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() {
// GetData retrives chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
@@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return err
}
streamer := s.Server.(*SwarmChunkServer)
- chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
- if chunk.ReqC != nil {
- if created {
- if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
- log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
- chunk.SetErrored(storage.ErrChunkForward)
- return nil
- }
+
+ var cancel func()
+ // TODO: do something with this hardcoded timeout, maybe use TTL in the future
+ ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
+
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-streamer.quit:
}
- go func() {
- var osp opentracing.Span
- ctx, osp = spancontext.StartSpan(
- ctx,
- "waiting.delivery")
- defer osp.Finish()
-
- t := time.NewTimer(10 * time.Minute)
- defer t.Stop()
-
- log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created)
- start := time.Now()
- select {
- case <-chunk.ReqC:
- log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start))
- case <-t.C:
- log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr)
- chunk.SetErrored(storage.ErrChunkTimeout)
- return
- }
- chunk.SetErrored(nil)
-
- if req.SkipCheck {
- err := sp.Deliver(ctx, chunk, s.priority)
- if err != nil {
- log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
- sp.Drop(err)
- }
+ cancel()
+ }()
+
+ go func() {
+ chunk, err := d.chunkStore.Get(ctx, req.Addr)
+ if err != nil {
+ log.Warn("ChunkStore.Get can not retrieve chunk", "err", err)
+ return
+ }
+ if req.SkipCheck {
+ err = sp.Deliver(ctx, chunk, s.priority)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
- streamer.deliveryC <- chunk.Addr[:]
- }()
- return nil
- }
- // TODO: call the retrieve function of the outgoing syncer
- if req.SkipCheck {
- log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr)
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
+ return
}
- return sp.Deliver(ctx, chunk, s.priority)
- }
- streamer.deliveryC <- chunk.Addr[:]
+ select {
+ case streamer.deliveryC <- chunk.Address()[:]:
+ case <-streamer.quit:
+ }
+
+ }()
+
return nil
}
@@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
+// TODO: Fix context SNAFU
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
@@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
"chunk.delivery")
defer osp.Finish()
- req.peer = sp
- d.receiveC <- req
- return nil
-}
+ processReceivedChunksCount.Inc(1)
-func (d *Delivery) processReceivedChunks() {
-R:
- for req := range d.receiveC {
- processReceivedChunksCount.Inc(1)
-
- if len(req.SData) > cp.DefaultSize+8 {
- log.Warn("received chunk is bigger than expected", "len", len(req.SData))
- continue R
- }
-
- // this should be has locally
- chunk, err := d.db.Get(context.TODO(), req.Addr)
- if err == nil {
- continue R
- }
- if err != storage.ErrFetching {
- log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk)
- continue R
- }
- select {
- case <-chunk.ReqC:
- log.Error("someone else delivered?", "hash", chunk.Addr.Hex())
- continue R
- default:
- }
-
- chunk.SData = req.SData
- d.db.Put(context.TODO(), chunk)
-
- go func(req *ChunkDeliveryMsg) {
- err := chunk.WaitToStore()
+ go func() {
+ req.peer = sp
+ err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
+ if err != nil {
if err == storage.ErrChunkInvalid {
+ // we removed this log because it spams the logs
+ // TODO: Enable this log line
+ // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
req.peer.Drop(err)
}
- }(req)
- }
+ }
+ }()
+ return nil
}
// RequestFromPeers sends a chunk retrieve request to
-func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
- var success bool
- var err error
+func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
+ var sp *Peer
+ spID := req.Source
- d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool {
- spId := p.ID()
- for _, p := range peersToSkip {
- if p == spId {
- log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId)
+ if spID != nil {
+ sp = d.getPeer(*spID)
+ if sp == nil {
+ return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
+ }
+ } else {
+ d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
+ id := p.ID()
+ // TODO: skip light nodes that do not accept retrieve requests
+ if req.SkipPeer(id.String()) {
+ log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
}
- }
- sp := d.getPeer(spId)
+ sp = d.getPeer(id)
+ if sp == nil {
+ log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
+ return true
+ }
+ spID = &id
+ return false
+ })
if sp == nil {
- log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
- return true
+ return nil, nil, errors.New("no peer found")
}
- err = sp.SendPriority(ctx, &RetrieveRequestMsg{
- Addr: hash,
- SkipCheck: skipCheck,
- }, Top)
- if err != nil {
- return true
- }
- requestFromPeersEachCount.Inc(1)
- success = true
- return false
- })
- if success {
- return nil
}
- return errors.New("no peer found")
+
+ err := sp.SendPriority(ctx, &RetrieveRequestMsg{
+ Addr: req.Addr,
+ SkipCheck: req.SkipCheck,
+ }, Top)
+ if err != nil {
+ return nil, nil, err
+ }
+ requestFromPeersEachCount.Inc(1)
+
+ return spID, sp.quit, nil
}
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 972cc859a..ece54d4ee 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) {
peerID := tester.IDs[0]
- streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true)
+ ctx := context.Background()
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ streamer.delivery.RequestFromPeers(ctx, req)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{
Code: 5,
Msg: &RetrieveRequestMsg{
- Addr: chunk.Addr[:],
+ Addr: chunk.Address()[:],
},
Peer: peerID,
},
@@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
})
hash := storage.Address(hash0[:])
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = hash
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk := storage.NewChunk(hash, hash)
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
hash = storage.Address(hash1[:])
- chunk = storage.NewChunk(hash, nil)
- chunk.SData = hash1[:]
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk = storage.NewChunk(hash, hash1[:])
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
chunkKey := hash0[:]
chunkData := hash1[:]
- chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey)
-
- if !created {
- t.Fatal("chunk already exists")
- }
- select {
- case <-chunk.ReqC:
- t.Fatal("chunk is already received")
- default:
- }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
},
},
p2ptest.Exchange{
- Label: "ChunkDeliveryRequest message",
+ Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
{
Code: 6,
@@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
-
- timeout := time.NewTimer(1 * time.Second)
-
- select {
- case <-timeout.C:
- t.Fatal("timeout receiving chunk")
- case <-chunk.ReqC:
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ // wait for the chunk to get stored
+ storedChunk, err := localStore.Get(ctx, chunkKey)
+ for err != nil {
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
+ default:
+ }
+ storedChunk, err = localStore.Get(ctx, chunkKey)
+ time.Sleep(50 * time.Millisecond)
}
- storedChunk, err := localStore.Get(context.TODO(), chunkKey)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
- if !bytes.Equal(storedChunk.SData, chunkData) {
+ if !bytes.Equal(storedChunk.Data(), chunkData) {
t.Fatal("Retrieved chunk has different data than original")
}
@@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
@@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
@@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
DoSync: true,
SyncUpdateDelay: 0,
})
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index f4294134b..452aaca76 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -38,13 +38,18 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
-func TestIntervals(t *testing.T) {
+func TestIntervalsLive(t *testing.T) {
testIntervals(t, true, nil, false)
- testIntervals(t, false, NewRange(9, 26), false)
- testIntervals(t, true, NewRange(9, 26), false)
-
testIntervals(t, true, nil, true)
+}
+
+func TestIntervalsHistory(t *testing.T) {
+ testIntervals(t, false, NewRange(9, 26), false)
testIntervals(t, false, NewRange(9, 26), true)
+}
+
+func TestIntervalsLiveAndHistory(t *testing.T) {
+ testIntervals(t, true, NewRange(9, 26), false)
testIntervals(t, true, NewRange(9, 26), true)
}
@@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return newTestExternalClient(db), nil
+ return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
@@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
t.Fatal(err)
}
- ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
@@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- log.Error("WaitKademlia error: %v", "err", err)
- return err
- }
-
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
@@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
+ err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
+ if err != nil {
+ return err
+ }
+
go func() {
for d := range disconnections {
if d.Error != nil {
@@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var liveHashesChan chan []byte
liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
- log.Error("Subscription error: %v", "err", err)
+ log.Error("get hashes", "err", err)
return
}
i := externalStreamSessionAt
@@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var historyHashesChan chan []byte
historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
+ log.Error("get hashes", "err", err)
return
}
@@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
}()
- err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
- if err != nil {
- return err
- }
if err := <-liveErrC; err != nil {
return err
}
@@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
- db *storage.DBAPI
+ store storage.SyncChunkStore
enableNotificationsC chan struct{}
}
-func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
+func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
- db: db,
+ store: store,
enableNotificationsC: make(chan struct{}),
}
}
-func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
- if chunk.ReqC == nil {
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
+ wait := c.store.FetchFunc(ctx, storage.Address(hash))
+ if wait == nil {
return nil
}
- c.hashes <- hash
- //NOTE: This was failing on go1.9.x with a deadlock.
- //Sometimes this function would just block
- //It is commented now, but it may be well worth after the chunk refactor
- //to re-enable this and see if the problem has been addressed
- /*
- return func() {
- return chunk.WaitToStore()
+ select {
+ case c.hashes <- hash:
+ case <-ctx.Done():
+ log.Warn("testExternalClient NeedData context", "err", ctx.Err())
+ return func(_ context.Context) error {
+ return ctx.Err()
}
- */
- return nil
+ }
+ return wait
}
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index a19f63589..2e1a81e82 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -18,9 +18,7 @@ package stream
import (
"context"
- "errors"
"fmt"
- "sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
@@ -31,6 +29,8 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
+var syncBatchTimeout = 30 * time.Second
+
// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
@@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
}
@@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if err != nil {
return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err)
}
- wg := sync.WaitGroup{}
+
+ ctr := 0
+ errC := make(chan error)
+ ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
+
+ ctx = context.WithValue(ctx, "source", p.ID().String())
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
+ ctr++
want.Set(i/HashSize, true)
- wg.Add(1)
// create request and wait until the chunk data arrives and is stored
- go func(w func()) {
- w()
- wg.Done()
+ go func(w func(context.Context) error) {
+ select {
+ case errC <- w(ctx):
+ case <-ctx.Done():
+ }
}(wait)
}
}
- // done := make(chan bool)
- // go func() {
- // wg.Wait()
- // close(done)
- // }()
- // go func() {
- // select {
- // case <-done:
- // s.next <- s.batchDone(p, req, hashes)
- // case <-time.After(1 * time.Second):
- // p.Drop(errors.New("timeout waiting for batch to be delivered"))
- // }
- // }()
+
go func() {
- wg.Wait()
+ defer cancel()
+ for i := 0; i < ctr; i++ {
+ select {
+ case err := <-errC:
+ if err != nil {
+ log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ return
+ }
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
+ return
+ case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ }
+ }
select {
case c.next <- c.batchDone(p, req, hashes):
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
}
}()
// only send wantedKeysMsg if all missing chunks of the previous batch arrived
@@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
c.sessionAt = req.From
}
from, to := c.nextBatch(req.To + 1)
- log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
+ log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID())
if from == to {
return nil
}
@@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
To: to,
}
go func() {
+ log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
- case <-time.After(120 * time.Second):
- log.Warn("handleOfferedHashesMsg timeout, so dropping peer")
- p.Drop(errors.New("handle offered hashes timeout"))
- return
case err := <-c.next:
if err != nil {
- log.Warn("c.next dropping peer", "err", err)
+ log.Warn("c.next error dropping peer", "err", err)
p.Drop(err)
return
}
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
- log.Warn("SendPriority err, so dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendPriority error", "err", err)
}
}()
return nil
@@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
@@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = data
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
- }
+ chunk := storage.NewChunk(hash, data)
if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 80b9ab711..1466a7a9c 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -33,8 +33,6 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
-var sendTimeout = 30 * time.Second
-
type notFoundError struct {
t string
s Stream
@@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
- p.Send(wmsg.Context, wmsg.Msg)
+ err := p.Send(wmsg.Context, wmsg.Msg)
+ if err != nil {
+ log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ }
})
+
+ // basic monitoring for pq contention
+ go func(pq *pq.PriorityQueue) {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ var len_maxi int
+ var cap_maxi int
+ for k := range pq.Queues {
+ if len_maxi < len(pq.Queues[k]) {
+ len_maxi = len(pq.Queues[k])
+ }
+
+ if cap_maxi < cap(pq.Queues[k]) {
+ cap_maxi = cap(pq.Queues[k])
+ }
+ }
+
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
+ case <-p.quit:
+ return
+ }
+ }
+ }(p.pq)
+
go func() {
<-p.quit
cancel()
@@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
+func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
@@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
defer sp.Finish()
msg := &ChunkDeliveryMsg{
- Addr: chunk.Addr,
- SData: chunk.SData,
+ Addr: chunk.Address(),
+ SData: chunk.Data(),
}
return p.SendPriority(ctx, msg, priority)
}
@@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
- defer cancel()
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
}
- return p.pq.Push(cctx, wmsg, int(priority))
+ err := p.pq.Push(wmsg, int(priority))
+ if err == pq.ErrContention {
+ log.Warn("dropping peer on priority queue contention", "peer", p.ID())
+ p.Drop(err)
+ }
+ return err
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 4ff947b21..19eaad34e 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
+
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
@@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
+
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 0,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucketKeyFileStore = simulation.BucketKey("filestore")
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 313019d6a..7cd09099c 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"os"
+ "runtime"
"sync"
"testing"
"time"
@@ -39,15 +40,20 @@ import (
mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
-const testMinProxBinSize = 2
const MaxTimeout = 600
type synctestConfig struct {
- addrs [][]byte
- hashes []storage.Address
- idToChunksMap map[discover.NodeID][]int
- chunksToNodesMap map[string][]int
- addrToIDMap map[string]discover.NodeID
+ addrs [][]byte
+ hashes []storage.Address
+ idToChunksMap map[discover.NodeID][]int
+ //chunksToNodesMap map[string][]int
+ addrToIDMap map[string]discover.NodeID
+}
+
+// Tests in this file should not request chunks from peers.
+// This function will panic indicating that there is a problem if request has been made.
+func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
+ panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
}
//This test is a syncing test for nodes.
@@ -58,6 +64,9 @@ type synctestConfig struct {
//they are expected to store based on the syncing protocol.
//Number of chunks and nodes can be provided via commandline too.
func TestSyncingViaGlobalSync(t *testing.T) {
+ if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
+ t.Skip("Flaky on mac on travis")
+ }
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
@@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}
func TestSyncingViaDirectSubscribe(t *testing.T) {
+ if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
+ t.Skip("Flaky on mac on travis")
+ }
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
- err := testSyncingViaDirectSubscribe(*chunks, *nodes)
+ err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
@@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) {
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
- err := testSyncingViaDirectSubscribe(chnk, n)
+ err := testSyncingViaDirectSubscribe(t, chnk, n)
if err != nil {
t.Fatal(err)
}
@@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
@@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
t.Fatal(err)
}
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal("unexpected disconnect")
+ cancelSimRun()
+ }
+ }()
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
conf.hashes = append(conf.hashes, hashes...)
mapKeysToNodes(conf)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- return err
- }
-
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
@@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
}()
}
for !allSuccess {
+ allSuccess = true
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
@@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
- allSuccess = localSuccess
+ if !localSuccess {
+ allSuccess = false
+ break
+ }
}
}
if !allSuccess {
@@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
if result.Error != nil {
t.Fatal(result.Error)
}
+ log.Info("Simulation ended")
}
/*
@@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
-func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
+func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
@@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
})
defer sim.Close()
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun()
conf := &synctestConfig{}
@@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return err
}
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
+ }
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal("unexpected disconnect")
+ cancelSimRun()
+ }
+ }()
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
// or until the timeout is reached.
allSuccess := false
for !allSuccess {
+ allSuccess = true
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
@@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
- allSuccess = localSuccess
+ if !localSuccess {
+ allSuccess = false
+ break
+ }
}
}
if !allSuccess {
@@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return result.Error
}
- log.Info("Simulation terminated")
+ log.Info("Simulation ended")
return nil
}
@@ -462,10 +527,9 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
//iterate over each bin and solicit needed subscription to bins
kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool {
//identify begin and start index of the bin(s) we want to subscribe to
- histRange := &Range{}
subCnt++
- err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top)
+ err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
if err != nil {
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
return false
@@ -478,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
- kmap := make(map[string][]int)
nodemap := make(map[string][]int)
//build a pot for chunk hashes
np := pot.NewPot(nil, 0)
@@ -487,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) {
indexmap[string(a)] = i
np, _, _ = pot.Add(np, a, pof)
}
+
+ var kadMinProxSize = 2
+
+ ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs)
+
//for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes
log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
for i := 0; i < len(conf.hashes); i++ {
- pl := 256 //highest possible proximity
- var nns []int
+ var a []byte
np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
- a := val.([]byte)
- if pl < 256 && pl != po {
- return false
- }
- if pl == 256 || pl == po {
- log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)]))
- nns = append(nns, indexmap[string(a)])
- nodemap[string(a)] = append(nodemap[string(a)], i)
- }
- if pl == 256 && len(nns) >= testMinProxBinSize {
- //maxProxBinSize has been reached at this po, so save it
- //we will add all other nodes at the same po
- pl = po
- }
- return true
+ // take the first address
+ a = val.([]byte)
+ return false
})
- kmap[string(conf.hashes[i])] = nns
+
+ nns := ppmap[common.Bytes2Hex(a)].NNSet
+ nns = append(nns, a)
+
+ for _, p := range nns {
+ nodemap[string(p)] = append(nodemap[string(p)], i)
+ }
}
for addr, chunks := range nodemap {
//this selects which chunks are expected to be found with the given node
conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
}
log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
- conf.chunksToNodesMap = kmap
}
//upload a file(chunks) to a single local node store
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index deffdfc3f..1f1f34b7b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,10 +32,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -43,8 +41,8 @@ const (
Mid
High
Top
- PriorityQueue // number of queues
- PriorityQueueCap = 32 // queue capacity
+ PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
+ PriorityQueueCap = 128 // queue capacity
HashSize = 32
)
@@ -73,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
- return NewSwarmChunkServer(delivery.db), nil
+ return NewSwarmChunkServer(delivery.chunkStore), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
- RegisterSwarmSyncerServer(streamer, db)
- RegisterSwarmSyncerClient(streamer, db)
+ RegisterSwarmSyncerServer(streamer, syncChunkStore)
+ RegisterSwarmSyncerClient(streamer, syncChunkStore)
if options.DoSync {
// latestIntC function ensures that
@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
return peer.Send(context.TODO(), msg)
}
-func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "registry.retrieve")
- defer sp.Finish()
-
- return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
-}
-
func (r *Registry) NodeInfo() interface{} {
return nil
}
@@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData(context.Context, []byte) func()
+ NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 7523860c9..06e96b9a9 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -80,15 +80,17 @@ func newTestClient(t string) *testClient {
}
}
-func (self *testClient) NeedData(ctx context.Context, hash []byte) func() {
+func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
- return func() {
+ return func(context.Context) error {
<-self.wait0
+ return nil
}
} else if bytes.Equal(hash, hash2[:]) {
- return func() {
+ return func(context.Context) error {
<-self.wait2
+ return nil
}
}
return nil
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index d7febe4a3..e9811a678 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -28,7 +28,6 @@ import (
)
const (
- // BatchSize = 2
BatchSize = 128
)
@@ -38,35 +37,37 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
- db *storage.DBAPI
+ store storage.SyncChunkStore
sessionAt uint64
start uint64
+ live bool
quit chan struct{}
}
// NewSwarmSyncerServer is contructor for SwarmSyncerServer
-func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
- sessionAt := db.CurrentBucketStorageIndex(po)
+func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+ sessionAt := syncChunkStore.BinIndex(po)
var start uint64
if live {
start = sessionAt
}
return &SwarmSyncerServer{
po: po,
- db: db,
+ store: syncChunkStore,
sessionAt: sessionAt,
start: start,
+ live: live,
quit: make(chan struct{}),
}, nil
}
-func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(live, po, db)
+ return NewSwarmSyncerServer(live, po, syncChunkStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
@@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() {
close(s.quit)
}
-// GetSection retrieves the actual chunk from localstore
+// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.store.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// GetBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
- if from == 0 {
- from = s.start
- }
- if to <= from || from >= s.sessionAt {
- to = math.MaxUint64
+ if s.live {
+ if from == 0 {
+ from = s.start
+ }
+ if to <= from || from >= s.sessionAt {
+ to = math.MaxUint64
+ }
+ } else {
+ if (to < from && to != 0) || from > s.sessionAt {
+ return nil, 0, 0, nil, nil
+ }
+ if to == 0 || to > s.sessionAt {
+ to = s.sessionAt
+ }
}
+
var ticker *time.Ticker
defer func() {
if ticker != nil {
@@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
}
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
- err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
- batch = append(batch, addr[:]...)
+ err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
+ batch = append(batch, key[:]...)
i++
to = idx
return i < BatchSize
@@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
wait = true
}
- log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
+ log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
return batch, from, to, nil, nil
}
@@ -146,28 +155,26 @@ type SwarmSyncerClient struct {
sessionReader storage.LazySectionReader
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
- db *storage.DBAPI
+ store storage.SyncChunkStore
// chunker storage.Chunker
- currentRoot storage.Address
- requestFunc func(chunk *storage.Chunk)
- end, start uint64
- peer *Peer
- ignoreExistingRequest bool
- stream Stream
+ currentRoot storage.Address
+ requestFunc func(chunk *storage.Chunk)
+ end, start uint64
+ peer *Peer
+ stream Stream
}
// NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
- db: db,
- peer: p,
- ignoreExistingRequest: ignoreExistingRequest,
- stream: stream,
+ store: store,
+ peer: p,
+ stream: stream,
}, nil
}
// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
+// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
// retrieveC := make(storage.Chunk, chunksCap)
// RunChunkRequestor(p, retrieveC)
// storeC := make(storage.Chunk, chunksCap)
@@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
+ return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
}
// NeedData
-func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
- chunk, _ := s.db.GetOrCreateRequest(ctx, key)
- // TODO: we may want to request from this peer anyway even if the request exists
-
- // ignoreExistingRequest is temporary commented out until its functionality is verified.
- // For now, this optimization can be disabled.
- if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
- return nil
- }
- // create request and wait until the chunk data arrives and is stored
- return func() {
- chunk.WaitToStore()
- }
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
+ return s.store.FetchFunc(ctx, key)
}
// BatchDone
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f72aa3444..469d520f8 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
- bucket.Store(bucketKeyDB, db)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
bucket.Store(bucketKeyDelivery, delivery)
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
@@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
- db := item.(*storage.DBAPI)
- db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
+ netStore := item.(*storage.NetStore)
+ netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
@@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
- db := item.(*storage.DBAPI)
- chunk, err := db.Get(ctx, key)
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
- continue
+ db := item.(*storage.NetStore)
+ _, err := db.Get(ctx, key)
+ if err == nil {
+ found++
}
- // needed for leveldb not to be closed?
- // chunk.WaitToStore()
- found++
}
}
log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)