aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorElad <theman@elad.im>2019-02-14 14:51:57 +0800
committerRafael Matias <rafael@skyle.net>2019-02-19 20:11:52 +0800
commit79cac793c013832457a89911cc477f345e46ced9 (patch)
treef37553372cec94f0c5627bb0d5b3de1f1fbd1699
parent5de6b6b529ea3fc0ad5cd9791d5f3ac44f497d65 (diff)
downloaddexon-79cac793c013832457a89911cc477f345e46ced9.tar
dexon-79cac793c013832457a89911cc477f345e46ced9.tar.gz
dexon-79cac793c013832457a89911cc477f345e46ced9.tar.bz2
dexon-79cac793c013832457a89911cc477f345e46ced9.tar.lz
dexon-79cac793c013832457a89911cc477f345e46ced9.tar.xz
dexon-79cac793c013832457a89911cc477f345e46ced9.tar.zst
dexon-79cac793c013832457a89911cc477f345e46ced9.zip
swarm/storage/netstore: add fetcher cancellation on shutdown (#19049)
swarm/network/stream: remove netstore internal wg swarm/network/stream: run individual tests with t.Run (cherry picked from commit 3ee09ba03511ad9a49e37c58f0c35b9c9771dd6f)
-rw-r--r--swarm/network/stream/delivery_test.go229
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go15
-rw-r--r--swarm/storage/netstore.go20
3 files changed, 144 insertions, 120 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index e5821df4f..49e4a423a 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -453,133 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
}
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
- if err != nil {
- return nil, nil, err
- }
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- Syncing: SyncingDisabled,
- Retrieval: RetrievalEnabled,
- }, nil)
- bucket.Store(bucketKeyRegistry, r)
+ t.Helper()
+ t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
- cleanup = func() {
- r.Close()
- clean()
- }
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalEnabled,
+ }, nil)
+ bucket.Store(bucketKeyRegistry, r)
- return r, cleanup, nil
- },
- })
- defer sim.Close()
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
- log.Info("Adding nodes to simulation")
- _, err := sim.AddNodesAndConnectChain(nodes)
- if err != nil {
- t.Fatal(err)
- }
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
- log.Info("Starting simulation")
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
- nodeIDs := sim.UpNodeIDs()
- //determine the pivot node to be the first node of the simulation
- pivot := nodeIDs[0]
-
- //distribute chunks of a random file into Stores of nodes 1 to nodes
- //we will do this by creating a file store with an underlying round-robin store:
- //the file store will create a hash for the uploaded file, but every chunk will be
- //distributed to different nodes via round-robin scheduling
- log.Debug("Writing file to round-robin file store")
- //to do this, we create an array for chunkstores (length minus one, the pivot node)
- stores := make([]storage.ChunkStore, len(nodeIDs)-1)
- //we then need to get all stores from the sim....
- lStores := sim.NodesItems(bucketKeyStore)
- i := 0
- //...iterate the buckets...
- for id, bucketVal := range lStores {
- //...and remove the one which is the pivot node
- if id == pivot {
- continue
- }
- //the other ones are added to the array...
- stores[i] = bucketVal.(storage.ChunkStore)
- i++
- }
- //...which then gets passed to the round-robin file store
- roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
- //now we can actually upload a (random) file to the round-robin store
- size := chunkCount * chunkSize
- log.Debug("Storing data to file store")
- fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
- // wait until all chunks stored
+ log.Info("Adding nodes to simulation")
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
- return err
- }
- err = wait(ctx)
- if err != nil {
- return err
+ t.Fatal(err)
}
- log.Debug("Waiting for kademlia")
- // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
+ log.Info("Starting simulation")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ nodeIDs := sim.UpNodeIDs()
+ //determine the pivot node to be the first node of the simulation
+ pivot := nodeIDs[0]
+
+ //distribute chunks of a random file into Stores of nodes 1 to nodes
+ //we will do this by creating a file store with an underlying round-robin store:
+ //the file store will create a hash for the uploaded file, but every chunk will be
+ //distributed to different nodes via round-robin scheduling
+ log.Debug("Writing file to round-robin file store")
+ //to do this, we create an array for chunkstores (length minus one, the pivot node)
+ stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+ //we then need to get all stores from the sim....
+ lStores := sim.NodesItems(bucketKeyStore)
+ i := 0
+ //...iterate the buckets...
+ for id, bucketVal := range lStores {
+ //...and remove the one which is the pivot node
+ if id == pivot {
+ continue
+ }
+ //the other ones are added to the array...
+ stores[i] = bucketVal.(storage.ChunkStore)
+ i++
+ }
+ //...which then gets passed to the round-robin file store
+ roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+ //now we can actually upload a (random) file to the round-robin store
+ size := chunkCount * chunkSize
+ log.Debug("Storing data to file store")
+ fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+ // wait until all chunks stored
+ if err != nil {
+ return err
+ }
+ err = wait(ctx)
+ if err != nil {
+ return err
+ }
- //get the pivot node's filestore
- item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- pivotFileStore := item.(*storage.FileStore)
- log.Debug("Starting retrieval routine")
- retErrC := make(chan error)
- go func() {
- // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
- // we must wait for the peer connections to have started before requesting
- n, err := readAll(pivotFileStore, fileHash)
- log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
- retErrC <- err
- }()
+ log.Debug("Waiting for kademlia")
+ // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+ if _, err := sim.WaitTillHealthy(ctx); err != nil {
+ return err
+ }
- disconnected := watchDisconnections(ctx, sim)
- defer func() {
- if err != nil && disconnected.bool() {
- err = errors.New("disconnect events received")
+ //get the pivot node's filestore
+ item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
}
- }()
+ pivotFileStore := item.(*storage.FileStore)
+ log.Debug("Starting retrieval routine")
+ retErrC := make(chan error)
+ go func() {
+ // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+ // we must wait for the peer connections to have started before requesting
+ n, err := readAll(pivotFileStore, fileHash)
+ log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+ retErrC <- err
+ }()
+
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
- //finally check that the pivot node gets all chunks via the root hash
- log.Debug("Check retrieval")
- success := true
- var total int64
- total, err = readAll(pivotFileStore, fileHash)
- if err != nil {
- return err
- }
- log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
- if err != nil || total != int64(size) {
- success = false
- }
+ //finally check that the pivot node gets all chunks via the root hash
+ log.Debug("Check retrieval")
+ success := true
+ var total int64
+ total, err = readAll(pivotFileStore, fileHash)
+ if err != nil {
+ return err
+ }
+ log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+ if err != nil || total != int64(size) {
+ success = false
+ }
- if !success {
- return fmt.Errorf("Test failed, chunks not available on all nodes")
- }
- if err := <-retErrC; err != nil {
- return fmt.Errorf("requesting chunks: %v", err)
+ if !success {
+ return fmt.Errorf("Test failed, chunks not available on all nodes")
+ }
+ if err := <-retErrC; err != nil {
+ return fmt.Errorf("requesting chunks: %v", err)
+ }
+ log.Debug("Test terminated successfully")
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
}
- log.Debug("Test terminated successfully")
- return nil
})
- if result.Error != nil {
- t.Fatal(result.Error)
- }
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 0f20f1be0..afb023ae2 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -74,7 +74,7 @@ func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
- err := runRetrievalTest(*chunks, *nodes)
+ err := runRetrievalTest(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
@@ -93,10 +93,12 @@ func TestRetrieval(t *testing.T) {
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
- err := runRetrievalTest(c, n)
- if err != nil {
- t.Fatal(err)
- }
+ t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
+ err := runRetrievalTest(t, c, n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
}
}
}
@@ -225,7 +227,8 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
-func runRetrievalTest(chunkCount int, nodeCount int) error {
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
+ t.Helper()
sim := simulation.New(retrievalSimServiceMap)
defer sim.Close()
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index a2595d9fa..202af2bf5 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont
func (n *NetStore) Close() {
close(n.closeC)
n.store.Close()
- // TODO: loop through fetchers to cancel them
+
+ wg := sync.WaitGroup{}
+ for _, key := range n.fetchers.Keys() {
+ if f, ok := n.fetchers.Get(key); ok {
+ if fetch, ok := f.(*fetcher); ok {
+ wg.Add(1)
+ go func(fetch *fetcher) {
+ defer wg.Done()
+ fetch.cancel()
+
+ select {
+ case <-fetch.deliveredC:
+ case <-fetch.cancelledC:
+ }
+ }(fetch)
+ }
+ }
+ }
+ wg.Wait()
}
// get attempts at retrieving the chunk from LocalStore