From 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e Mon Sep 17 00:00:00 2001 From: Balint Gabor Date: Thu, 13 Sep 2018 11:42:19 +0200 Subject: swarm: Chunk refactor (#17659) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Janos Guljas Co-authored-by: Balint Gabor Co-authored-by: Anton Evangelatov Co-authored-by: Viktor TrĂ³n --- swarm/storage/netstore_test.go | 639 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 577 insertions(+), 62 deletions(-) (limited to 'swarm/storage/netstore_test.go') diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 7babbf5e0..f08968f0e 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -17,107 +17,622 @@ package storage import ( + "bytes" "context" - "encoding/hex" - "errors" + "crypto/rand" "io/ioutil" + "sync" "testing" "time" - "github.com/ethereum/go-ethereum/swarm/network" -) + "github.com/ethereum/go-ethereum/p2p/discover" + ch "github.com/ethereum/go-ethereum/swarm/chunk" -var ( - errUnknown = errors.New("unknown error") + "github.com/ethereum/go-ethereum/common" ) -type mockRetrieve struct { - requests map[string]int +var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + +type mockNetFetcher struct { + peers *sync.Map + sources []*discover.NodeID + peersPerRequest [][]Address + requestCalled bool + offerCalled bool + quit <-chan struct{} + ctx context.Context +} + +func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) { + m.offerCalled = true + m.sources = append(m.sources, source) +} + +func (m *mockNetFetcher) Request(ctx context.Context) { + m.requestCalled = true + var peers []Address + m.peers.Range(func(key interface{}, _ interface{}) bool { + peers = append(peers, common.FromHex(key.(string))) + return true + }) + m.peersPerRequest = append(m.peersPerRequest, peers) +} + +type mockNetFetchFuncFactory struct { + fetcher *mockNetFetcher +} + +func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers *sync.Map) NetFetcher { + m.fetcher.peers = peers + m.fetcher.quit = ctx.Done() + m.fetcher.ctx = ctx + return m.fetcher +} + +func mustNewNetStore(t *testing.T) *NetStore { + netStore, _ := mustNewNetStoreWithFetcher(t) + return netStore } -func NewMockRetrieve() *mockRetrieve { - return &mockRetrieve{requests: make(map[string]int)} +func mustNewNetStoreWithFetcher(t *testing.T) (*NetStore, *mockNetFetcher) { + t.Helper() + + datadir, err := ioutil.TempDir("", "netstore") + if err != nil { + t.Fatal(err) + } + naddr := make([]byte, 32) + params := NewDefaultLocalStoreParams() + params.Init(datadir) + params.BaseKey = naddr + localStore, err := NewTestLocalStoreForAddr(params) + if err != nil { + t.Fatal(err) + } + + fetcher := &mockNetFetcher{} + mockNetFetchFuncFactory := &mockNetFetchFuncFactory{ + fetcher: fetcher, + } + netStore, err := NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher) + if err != nil { + t.Fatal(err) + } + return netStore, fetcher } -func newDummyChunk(addr Address) *Chunk { - chunk := NewChunk(addr, make(chan bool)) - chunk.SData = []byte{3, 4, 5} - chunk.Size = 3 +// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put. +// After the Put there should no active fetchers, and the context created for the fetcher should +// be cancelled. +func TestNetStoreGetAndPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the gouroutine with the Put does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + }() + + close(c) + recChunk, err := netStore.Get(ctx, chunk.Address()) // this is blocked until the Put above is done + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // the chunk is already available locally, so there should be no active fetchers waiting for it + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // A fetcher was created when the Get was called (and the chunk was not available). The chunk + // was delivered with the Put call, so the fetcher should be cancelled now. + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } - return chunk } -func (m *mockRetrieve) retrieve(ctx context.Context, chunk *Chunk) error { - hkey := hex.EncodeToString(chunk.Addr) - m.requests[hkey] += 1 +// TestNetStoreGetAndPut tests calling NetStore.Put and then NetStore.Get. +// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore, +// there is no need to create fetchers. +func TestNetStoreGetAfterPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) - // on second call return error - if m.requests[hkey] == 2 { - return errUnknown + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // First we Put the chunk, so the chunk will be available locally + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) } - // on third call return data - if m.requests[hkey] == 3 { - *chunk = *newDummyChunk(chunk.Addr) + // Get should retrieve the chunk from LocalStore, without creating fetcher + recChunk, err := netStore.Get(ctx, chunk.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // no fetcher offer or request should be created for a locally available chunk + if fetcher.offerCalled || fetcher.requestCalled { + t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called") + } + // no fetchers should be created for a locally available chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } + +} + +// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout +func TestNetStoreGetTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the gouroutine does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + }() + + close(c) + // We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will + // be a timeout + _, err := netStore.Get(ctx, chunk.Address()) + + // Check if the timeout happened + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // A fetcher was created, check if it has been removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // Check if the fetcher context has been cancelled after the timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks +// the errors +func TestNetStoreGetCancel(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + cancel() + }() + + close(c) + // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery + _, err := netStore.Get(ctx, chunk.Address()) + + // After the context is cancelled above Get should return with an error + if err != context.Canceled { + t.Fatalf("Expected context.Canceled err got %v", err) + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after cancel") + } + + // Check if the fetcher context has been cancelled after the request context cancel + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. The chunk is +// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher +// for the chunk retrieval +func TestNetStoreMultipleGetAndPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go func() { + // sleep to make sure Put is called after all the Get + time.Sleep(500 * time.Millisecond) + // check if netStore created exactly one fetcher for all Get calls + if netStore.fetchers.Len() != 1 { + t.Fatal("Expected netStore to use one fetcher for all Get calls") + } + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + }() + + // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above. + getWG := sync.WaitGroup{} + for i := 0; i < 4; i++ { + getWG.Add(1) go func() { - time.Sleep(100 * time.Millisecond) - close(chunk.ReqC) + defer getWG.Done() + recChunk, err := netStore.Get(ctx, chunk.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } }() + } + + finishedC := make(chan struct{}) + go func() { + getWG.Wait() + close(finishedC) + }() + + // The Get calls should return after Put, so no timeout expected + select { + case <-finishedC: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout waiting for Get calls to return") + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } - return nil + // A fetcher was created, check if it has been removed after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") } - return nil } -func TestNetstoreFailedRequest(t *testing.T) { - searchTimeout = 300 * time.Millisecond +// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout +func TestNetStoreFetchFuncTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) - // setup - addr := network.RandomAddr() // tested peers peer address + chunk := GenerateRandomChunk(ch.DefaultSize) - // temp datadir - datadir, err := ioutil.TempDir("", "netstore") + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // FetchFunc is called for an unavaible chunk, so the returned wait function should not be nil + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should an active fetcher for the chunk after the FetchFunc call + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // wait function should timeout because we don't deliver the chunk with a Put + err := wait(ctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // the fetcher should be removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // the fetcher context should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk +func TestNetStoreFetchFuncAfterPut(t *testing.T) { + netStore := mustNewNetStore(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // We deliver the created the chunk with a Put + err := netStore.Put(ctx, chunk) if err != nil { - t.Fatal(err) + t.Fatalf("Expected no err got %v", err) } - params := NewDefaultLocalStoreParams() - params.Init(datadir) - params.BaseKey = addr.Over() - localStore, err := NewTestLocalStoreForAddr(params) + + // FetchFunc should return nil, because the chunk is available locally, no need to fetch it + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait != nil { + t.Fatal("Expected wait to be nil") + } + + // No fetchers should be created at all + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } +} + +// TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk +func TestNetStoreGetCallsRequest(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + _, err := netStore.Get(ctx, chunk.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadlineExceeded err got %v", err) + } + + // NetStore should call NetFetcher.Request and wait for the chunk + if !fetcher.requestCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } +} + +// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk +// in case of a source peer provided in the context. +func TestNetStoreGetCallsOffer(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + // If a source peer is added to the context, NetStore will handle it as an offer + ctx := context.WithValue(context.Background(), "source", sourcePeerID.String()) + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + chunk, err := netStore.Get(ctx, chunk.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err) + } + + // NetStore should call NetFetcher.Offer with the source peer + if !fetcher.offerCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } + + if len(fetcher.sources) != 1 { + t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources)) + } + + if fetcher.sources[0].String() != sourcePeerID.String() { + t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0]) + } + +} + +// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context. +// There is no Put call, so the Get calls timeout +func TestNetStoreFetcherCountPeers(t *testing.T) { + + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + addr := randomAddr() + peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + errC := make(chan error) + nrGets := 3 + + // Call Get 3 times with a peer in context + for i := 0; i < nrGets; i++ { + peer := peers[i] + go func() { + ctx := context.WithValue(ctx, "peer", peer) + _, err := netStore.Get(ctx, addr) + errC <- err + }() + } + + // All 3 Get calls should timeout + for i := 0; i < nrGets; i++ { + err := <-errC + if err != context.DeadlineExceeded { + t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err) + } + } + + // fetcher should be closed after timeout + select { + case <-fetcher.quit: + case <-time.After(3 * time.Second): + t.Fatalf("mockNetFetcher not closed after timeout") + } + + // All 3 peers should be given to NetFetcher after the 3 Get calls + if len(fetcher.peersPerRequest) != nrGets { + t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest)) + } + + for i, peers := range fetcher.peersPerRequest { + if len(peers) < i+1 { + t.Fatalf("Expected at least %v got %v", i+1, len(peers)) + } + } +} + +// TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times, +// and checks there is still exactly one fetcher for one chunk. Afthe chunk is delivered, it checks +// if the fetcher is closed. +func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // Call wait three times parallelly + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + err := wait(ctx) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + wg.Done() + }() + } + + // sleep a little so the wait functions are called above + time.Sleep(100 * time.Millisecond) + + // there should be still only one fetcher, because all wait calls are for the same chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to have one fetcher for the requested chunk") + } + + // Deliver the chunk with a Put + err := netStore.Put(ctx, chunk) if err != nil { - t.Fatal(err) + t.Fatalf("Expected no err got %v", err) + } + + // wait until all wait calls return (because the chunk is delivered) + wg.Wait() + + // There should be no more fetchers for the delivered chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") } - r := NewMockRetrieve() - netStore := NewNetStore(localStore, r.retrieve) + // The context for the fetcher should be cancelled after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetcherLifeCycleWithTimeout is similar to TestNetStoreFetchFuncCalledMultipleTimes, +// the only difference is that we don't deilver the chunk, just wait for timeout +func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) - key := Address{} + chunk := GenerateRandomChunk(ch.DefaultSize) - // first call is done by the retry on ErrChunkNotFound, no need to do it here - // _, err = netStore.Get(key) - // if err == nil || err != ErrChunkNotFound { - // t.Fatalf("expected to get ErrChunkNotFound, but got: %s", err) - // } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() - // second call - _, err = netStore.Get(context.TODO(), key) - if got := r.requests[hex.EncodeToString(key)]; got != 2 { - t.Fatalf("expected to have called retrieve two times, but got: %v", got) + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") } - if err != errUnknown { - t.Fatalf("expected to get an unknown error, but got: %s", err) + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") } - // third call - chunk, err := netStore.Get(context.TODO(), key) - if got := r.requests[hex.EncodeToString(key)]; got != 3 { - t.Fatalf("expected to have called retrieve three times, but got: %v", got) + // Call wait three times parallelly + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer rcancel() + err := wait(rctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected err %v got %v", context.DeadlineExceeded, err) + } + }() } - if err != nil || chunk == nil { - t.Fatalf("expected to get a chunk but got: %v, %s", chunk, err) + + // wait until all wait calls timeout + wg.Wait() + + // There should be no more fetchers after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") } - if len(chunk.SData) != 3 { - t.Fatalf("expected to get a chunk with size 3, but got: %v", chunk.SData) + + // The context for the fetcher should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") } } + +func randomAddr() Address { + addr := make([]byte, 32) + rand.Read(addr) + return Address(addr) +} -- cgit v1.2.3