aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/netstore_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/netstore_test.go')
-rw-r--r--swarm/storage/netstore_test.go639
1 files changed, 577 insertions, 62 deletions
diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go
index 7babbf5e0..f08968f0e 100644
--- a/swarm/storage/netstore_test.go
+++ b/swarm/storage/netstore_test.go
@@ -17,107 +17,622 @@
package storage
import (
+ "bytes"
"context"
- "encoding/hex"
- "errors"
+ "crypto/rand"
"io/ioutil"
+ "sync"
"testing"
"time"
- "github.com/ethereum/go-ethereum/swarm/network"
-)
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
-var (
- errUnknown = errors.New("unknown error")
+ "github.com/ethereum/go-ethereum/common"
)
-type mockRetrieve struct {
- requests map[string]int
+var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+type mockNetFetcher struct {
+ peers *sync.Map
+ sources []*discover.NodeID
+ peersPerRequest [][]Address
+ requestCalled bool
+ offerCalled bool
+ quit <-chan struct{}
+ ctx context.Context
+}
+
+func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) {
+ m.offerCalled = true
+ m.sources = append(m.sources, source)
+}
+
+func (m *mockNetFetcher) Request(ctx context.Context) {
+ m.requestCalled = true
+ var peers []Address
+ m.peers.Range(func(key interface{}, _ interface{}) bool {
+ peers = append(peers, common.FromHex(key.(string)))
+ return true
+ })
+ m.peersPerRequest = append(m.peersPerRequest, peers)
+}
+
+type mockNetFetchFuncFactory struct {
+ fetcher *mockNetFetcher
+}
+
+func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers *sync.Map) NetFetcher {
+ m.fetcher.peers = peers
+ m.fetcher.quit = ctx.Done()
+ m.fetcher.ctx = ctx
+ return m.fetcher
+}
+
+func mustNewNetStore(t *testing.T) *NetStore {
+ netStore, _ := mustNewNetStoreWithFetcher(t)
+ return netStore
}
-func NewMockRetrieve() *mockRetrieve {
- return &mockRetrieve{requests: make(map[string]int)}
+func mustNewNetStoreWithFetcher(t *testing.T) (*NetStore, *mockNetFetcher) {
+ t.Helper()
+
+ datadir, err := ioutil.TempDir("", "netstore")
+ if err != nil {
+ t.Fatal(err)
+ }
+ naddr := make([]byte, 32)
+ params := NewDefaultLocalStoreParams()
+ params.Init(datadir)
+ params.BaseKey = naddr
+ localStore, err := NewTestLocalStoreForAddr(params)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ fetcher := &mockNetFetcher{}
+ mockNetFetchFuncFactory := &mockNetFetchFuncFactory{
+ fetcher: fetcher,
+ }
+ netStore, err := NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return netStore, fetcher
}
-func newDummyChunk(addr Address) *Chunk {
- chunk := NewChunk(addr, make(chan bool))
- chunk.SData = []byte{3, 4, 5}
- chunk.Size = 3
+// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put.
+// After the Put there should no active fetchers, and the context created for the fetcher should
+// be cancelled.
+func TestNetStoreGetAndPut(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+
+ c := make(chan struct{}) // this channel ensures that the gouroutine with the Put does not run earlier than the Get
+ go func() {
+ <-c // wait for the Get to be called
+ time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
+
+ // check if netStore created a fetcher in the Get call for the unavailable chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatal("Expected netStore to use a fetcher for the Get call")
+ }
+
+ err := netStore.Put(ctx, chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ }()
+
+ close(c)
+ recChunk, err := netStore.Get(ctx, chunk.Address()) // this is blocked until the Put above is done
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ // the retrieved chunk should be the same as what we Put
+ if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) {
+ t.Fatalf("Different chunk received than what was put")
+ }
+ // the chunk is already available locally, so there should be no active fetchers waiting for it
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
+ }
+
+ // A fetcher was created when the Get was called (and the chunk was not available). The chunk
+ // was delivered with the Put call, so the fetcher should be cancelled now.
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
- return chunk
}
-func (m *mockRetrieve) retrieve(ctx context.Context, chunk *Chunk) error {
- hkey := hex.EncodeToString(chunk.Addr)
- m.requests[hkey] += 1
+// TestNetStoreGetAndPut tests calling NetStore.Put and then NetStore.Get.
+// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore,
+// there is no need to create fetchers.
+func TestNetStoreGetAfterPut(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
- // on second call return error
- if m.requests[hkey] == 2 {
- return errUnknown
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ // First we Put the chunk, so the chunk will be available locally
+ err := netStore.Put(ctx, chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
}
- // on third call return data
- if m.requests[hkey] == 3 {
- *chunk = *newDummyChunk(chunk.Addr)
+ // Get should retrieve the chunk from LocalStore, without creating fetcher
+ recChunk, err := netStore.Get(ctx, chunk.Address())
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ // the retrieved chunk should be the same as what we Put
+ if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) {
+ t.Fatalf("Different chunk received than what was put")
+ }
+ // no fetcher offer or request should be created for a locally available chunk
+ if fetcher.offerCalled || fetcher.requestCalled {
+ t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called")
+ }
+ // no fetchers should be created for a locally available chunk
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to not have fetcher")
+ }
+
+}
+
+// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout
+func TestNetStoreGetTimeout(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ c := make(chan struct{}) // this channel ensures that the gouroutine does not run earlier than the Get
+ go func() {
+ <-c // wait for the Get to be called
+ time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
+
+ // check if netStore created a fetcher in the Get call for the unavailable chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatal("Expected netStore to use a fetcher for the Get call")
+ }
+ }()
+
+ close(c)
+ // We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will
+ // be a timeout
+ _, err := netStore.Get(ctx, chunk.Address())
+
+ // Check if the timeout happened
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected context.DeadLineExceeded err got %v", err)
+ }
+
+ // A fetcher was created, check if it has been removed after timeout
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after timeout")
+ }
+
+ // Check if the fetcher context has been cancelled after the timeout
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks
+// the errors
+func TestNetStoreGetCancel(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+
+ c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get
+ go func() {
+ <-c // wait for the Get to be called
+ time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
+ // check if netStore created a fetcher in the Get call for the unavailable chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatal("Expected netStore to use a fetcher for the Get call")
+ }
+ cancel()
+ }()
+
+ close(c)
+ // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery
+ _, err := netStore.Get(ctx, chunk.Address())
+
+ // After the context is cancelled above Get should return with an error
+ if err != context.Canceled {
+ t.Fatalf("Expected context.Canceled err got %v", err)
+ }
+
+ // A fetcher was created, check if it has been removed after cancel
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after cancel")
+ }
+
+ // Check if the fetcher context has been cancelled after the request context cancel
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. The chunk is
+// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher
+// for the chunk retrieval
+func TestNetStoreMultipleGetAndPut(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+
+ go func() {
+ // sleep to make sure Put is called after all the Get
+ time.Sleep(500 * time.Millisecond)
+ // check if netStore created exactly one fetcher for all Get calls
+ if netStore.fetchers.Len() != 1 {
+ t.Fatal("Expected netStore to use one fetcher for all Get calls")
+ }
+ err := netStore.Put(ctx, chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ }()
+
+ // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above.
+ getWG := sync.WaitGroup{}
+ for i := 0; i < 4; i++ {
+ getWG.Add(1)
go func() {
- time.Sleep(100 * time.Millisecond)
- close(chunk.ReqC)
+ defer getWG.Done()
+ recChunk, err := netStore.Get(ctx, chunk.Address())
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) {
+ t.Fatalf("Different chunk received than what was put")
+ }
}()
+ }
+
+ finishedC := make(chan struct{})
+ go func() {
+ getWG.Wait()
+ close(finishedC)
+ }()
+
+ // The Get calls should return after Put, so no timeout expected
+ select {
+ case <-finishedC:
+ case <-time.After(1 * time.Second):
+ t.Fatalf("Timeout waiting for Get calls to return")
+ }
+
+ // A fetcher was created, check if it has been removed after cancel
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
+ }
- return nil
+ // A fetcher was created, check if it has been removed after delivery
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
}
- return nil
}
-func TestNetstoreFailedRequest(t *testing.T) {
- searchTimeout = 300 * time.Millisecond
+// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout
+func TestNetStoreFetchFuncTimeout(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
- // setup
- addr := network.RandomAddr() // tested peers peer address
+ chunk := GenerateRandomChunk(ch.DefaultSize)
- // temp datadir
- datadir, err := ioutil.TempDir("", "netstore")
+ ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+ defer cancel()
+
+ // FetchFunc is called for an unavaible chunk, so the returned wait function should not be nil
+ wait := netStore.FetchFunc(ctx, chunk.Address())
+ if wait == nil {
+ t.Fatal("Expected wait function to be not nil")
+ }
+
+ // There should an active fetcher for the chunk after the FetchFunc call
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // wait function should timeout because we don't deliver the chunk with a Put
+ err := wait(ctx)
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected context.DeadLineExceeded err got %v", err)
+ }
+
+ // the fetcher should be removed after timeout
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after timeout")
+ }
+
+ // the fetcher context should be cancelled after timeout
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk
+func TestNetStoreFetchFuncAfterPut(t *testing.T) {
+ netStore := mustNewNetStore(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+
+ // We deliver the created the chunk with a Put
+ err := netStore.Put(ctx, chunk)
if err != nil {
- t.Fatal(err)
+ t.Fatalf("Expected no err got %v", err)
}
- params := NewDefaultLocalStoreParams()
- params.Init(datadir)
- params.BaseKey = addr.Over()
- localStore, err := NewTestLocalStoreForAddr(params)
+
+ // FetchFunc should return nil, because the chunk is available locally, no need to fetch it
+ wait := netStore.FetchFunc(ctx, chunk.Address())
+ if wait != nil {
+ t.Fatal("Expected wait to be nil")
+ }
+
+ // No fetchers should be created at all
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to not have fetcher")
+ }
+}
+
+// TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk
+func TestNetStoreGetCallsRequest(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+ defer cancel()
+
+ // We call get for a not available chunk, it will timeout because the chunk is not delivered
+ _, err := netStore.Get(ctx, chunk.Address())
+
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected context.DeadlineExceeded err got %v", err)
+ }
+
+ // NetStore should call NetFetcher.Request and wait for the chunk
+ if !fetcher.requestCalled {
+ t.Fatal("Expected NetFetcher.Request to be called")
+ }
+}
+
+// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk
+// in case of a source peer provided in the context.
+func TestNetStoreGetCallsOffer(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ // If a source peer is added to the context, NetStore will handle it as an offer
+ ctx := context.WithValue(context.Background(), "source", sourcePeerID.String())
+ ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+ defer cancel()
+
+ // We call get for a not available chunk, it will timeout because the chunk is not delivered
+ chunk, err := netStore.Get(ctx, chunk.Address())
+
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err)
+ }
+
+ // NetStore should call NetFetcher.Offer with the source peer
+ if !fetcher.offerCalled {
+ t.Fatal("Expected NetFetcher.Request to be called")
+ }
+
+ if len(fetcher.sources) != 1 {
+ t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources))
+ }
+
+ if fetcher.sources[0].String() != sourcePeerID.String() {
+ t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0])
+ }
+
+}
+
+// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context.
+// There is no Put call, so the Get calls timeout
+func TestNetStoreFetcherCountPeers(t *testing.T) {
+
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ addr := randomAddr()
+ peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()}
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ errC := make(chan error)
+ nrGets := 3
+
+ // Call Get 3 times with a peer in context
+ for i := 0; i < nrGets; i++ {
+ peer := peers[i]
+ go func() {
+ ctx := context.WithValue(ctx, "peer", peer)
+ _, err := netStore.Get(ctx, addr)
+ errC <- err
+ }()
+ }
+
+ // All 3 Get calls should timeout
+ for i := 0; i < nrGets; i++ {
+ err := <-errC
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err)
+ }
+ }
+
+ // fetcher should be closed after timeout
+ select {
+ case <-fetcher.quit:
+ case <-time.After(3 * time.Second):
+ t.Fatalf("mockNetFetcher not closed after timeout")
+ }
+
+ // All 3 peers should be given to NetFetcher after the 3 Get calls
+ if len(fetcher.peersPerRequest) != nrGets {
+ t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest))
+ }
+
+ for i, peers := range fetcher.peersPerRequest {
+ if len(peers) < i+1 {
+ t.Fatalf("Expected at least %v got %v", i+1, len(peers))
+ }
+ }
+}
+
+// TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times,
+// and checks there is still exactly one fetcher for one chunk. Afthe chunk is delivered, it checks
+// if the fetcher is closed.
+func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
+
+ chunk := GenerateRandomChunk(ch.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ // FetchFunc should return a non-nil wait function, because the chunk is not available
+ wait := netStore.FetchFunc(ctx, chunk.Address())
+ if wait == nil {
+ t.Fatal("Expected wait function to be not nil")
+ }
+
+ // There should be exactly one fetcher for the chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // Call wait three times parallelly
+ wg := sync.WaitGroup{}
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ go func() {
+ err := wait(ctx)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ wg.Done()
+ }()
+ }
+
+ // sleep a little so the wait functions are called above
+ time.Sleep(100 * time.Millisecond)
+
+ // there should be still only one fetcher, because all wait calls are for the same chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatal("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // Deliver the chunk with a Put
+ err := netStore.Put(ctx, chunk)
if err != nil {
- t.Fatal(err)
+ t.Fatalf("Expected no err got %v", err)
+ }
+
+ // wait until all wait calls return (because the chunk is delivered)
+ wg.Wait()
+
+ // There should be no more fetchers for the delivered chunk
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
}
- r := NewMockRetrieve()
- netStore := NewNetStore(localStore, r.retrieve)
+ // The context for the fetcher should be cancelled after delivery
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreFetcherLifeCycleWithTimeout is similar to TestNetStoreFetchFuncCalledMultipleTimes,
+// the only difference is that we don't deilver the chunk, just wait for timeout
+func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) {
+ netStore, fetcher := mustNewNetStoreWithFetcher(t)
- key := Address{}
+ chunk := GenerateRandomChunk(ch.DefaultSize)
- // first call is done by the retry on ErrChunkNotFound, no need to do it here
- // _, err = netStore.Get(key)
- // if err == nil || err != ErrChunkNotFound {
- // t.Fatalf("expected to get ErrChunkNotFound, but got: %s", err)
- // }
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
- // second call
- _, err = netStore.Get(context.TODO(), key)
- if got := r.requests[hex.EncodeToString(key)]; got != 2 {
- t.Fatalf("expected to have called retrieve two times, but got: %v", got)
+ // FetchFunc should return a non-nil wait function, because the chunk is not available
+ wait := netStore.FetchFunc(ctx, chunk.Address())
+ if wait == nil {
+ t.Fatal("Expected wait function to be not nil")
}
- if err != errUnknown {
- t.Fatalf("expected to get an unknown error, but got: %s", err)
+
+ // There should be exactly one fetcher for the chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
}
- // third call
- chunk, err := netStore.Get(context.TODO(), key)
- if got := r.requests[hex.EncodeToString(key)]; got != 3 {
- t.Fatalf("expected to have called retrieve three times, but got: %v", got)
+ // Call wait three times parallelly
+ wg := sync.WaitGroup{}
+ for i := 0; i < 3; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer rcancel()
+ err := wait(rctx)
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected err %v got %v", context.DeadlineExceeded, err)
+ }
+ }()
}
- if err != nil || chunk == nil {
- t.Fatalf("expected to get a chunk but got: %v, %s", chunk, err)
+
+ // wait until all wait calls timeout
+ wg.Wait()
+
+ // There should be no more fetchers after timeout
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
}
- if len(chunk.SData) != 3 {
- t.Fatalf("expected to get a chunk with size 3, but got: %v", chunk.SData)
+
+ // The context for the fetcher should be cancelled after timeout
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
}
}
+
+func randomAddr() Address {
+ addr := make([]byte, 32)
+ rand.Read(addr)
+ return Address(addr)
+}