From 8d04154691ef497f18816ac720f58b650d25e1e2 Mon Sep 17 00:00:00 2001
From: gluk256 <gluk256@gmail.com>
Date: Fri, 22 Mar 2019 14:20:17 +0400
Subject: p2p/simulations: wait until all connections are recreated when
 uploading snapshot (#19312)

* swarm/network/simulation: test cases refactored

* swarm/pss: minor refactoring

* swarm/simulation: UploadSnapshot updated

* swarm/network: style fix

* swarm/pss: bugfix
---
 swarm/network/simulation/kademlia.go            |  2 +-
 swarm/network/simulation/kademlia_test.go       |  2 +-
 swarm/network/simulation/node.go                | 27 ++++++------------
 swarm/network/simulation/node_test.go           |  6 ++--
 swarm/network/stream/snapshot_retrieval_test.go | 25 ++++++++---------
 swarm/network/stream/snapshot_sync_test.go      | 12 +++-----
 swarm/network/stream/streamer_test.go           |  7 +++--
 swarm/pss/prox_test.go                          | 37 ++++---------------------
 8 files changed, 39 insertions(+), 79 deletions(-)

diff --git a/swarm/network/simulation/kademlia.go b/swarm/network/simulation/kademlia.go
index 4b880aa0c..00e870a07 100644
--- a/swarm/network/simulation/kademlia.go
+++ b/swarm/network/simulation/kademlia.go
@@ -103,7 +103,7 @@ func (s *Simulation) kademlias() (ks map[enode.ID]*network.Kademlia) {
 // in the snapshot are registered in the kademlia.
 // It differs from WaitTillHealthy, which waits only until all the kademlias are
 // healthy (it might happen even before all the connections are established).
-func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap simulations.Snapshot) error {
+func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap *simulations.Snapshot) error {
 	expected := getSnapshotConnections(snap.Conns)
 	ticker := time.NewTicker(150 * time.Millisecond)
 	defer ticker.Stop()
diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go
index 9cbc39da5..0ac1e7803 100644
--- a/swarm/network/simulation/kademlia_test.go
+++ b/swarm/network/simulation/kademlia_test.go
@@ -182,7 +182,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = controlSim.WaitTillSnapshotRecreated(ctx, *snap)
+	err = controlSim.WaitTillSnapshotRecreated(ctx, snap)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go
index 2d618a29d..e24dab21b 100644
--- a/swarm/network/simulation/node.go
+++ b/swarm/network/simulation/node.go
@@ -17,6 +17,7 @@
 package simulation
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"io/ioutil"
@@ -24,7 +25,6 @@ import (
 	"os"
 	"time"
 
-	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
@@ -217,30 +217,24 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i
 // UploadSnapshot uploads a snapshot to the simulation
 // This method tries to open the json file provided, applies the config to all nodes
 // and then loads the snapshot into the Simulation network
-func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error {
+func (s *Simulation) UploadSnapshot(ctx context.Context, snapshotFile string, opts ...AddNodeOption) error {
 	f, err := os.Open(snapshotFile)
 	if err != nil {
 		return err
 	}
-	defer func() {
-		err := f.Close()
-		if err != nil {
-			log.Error("Error closing snapshot file", "err", err)
-		}
-	}()
+	defer f.Close()
+
 	jsonbyte, err := ioutil.ReadAll(f)
 	if err != nil {
 		return err
 	}
 	var snap simulations.Snapshot
-	err = json.Unmarshal(jsonbyte, &snap)
-	if err != nil {
+	if err := json.Unmarshal(jsonbyte, &snap); err != nil {
 		return err
 	}
 
 	//the snapshot probably has the property EnableMsgEvents not set
-	//just in case, set it to true!
-	//(we need this to wait for messages before uploading)
+	//set it to true (we need this to wait for messages before uploading)
 	for i := range snap.Nodes {
 		snap.Nodes[i].Node.Config.EnableMsgEvents = true
 		snap.Nodes[i].Node.Config.Services = s.serviceNames
@@ -249,15 +243,10 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption)
 		}
 	}
 
-	log.Info("Waiting for p2p connections to be established...")
-
-	//now we can load the snapshot
-	err = s.Net.Load(&snap)
-	if err != nil {
+	if err := s.Net.Load(&snap); err != nil {
 		return err
 	}
-	log.Info("Snapshot loaded")
-	return nil
+	return s.WaitTillSnapshotRecreated(ctx, &snap)
 }
 
 // StartNode starts a node by NodeID.
diff --git a/swarm/network/simulation/node_test.go b/swarm/network/simulation/node_test.go
index bae5afb26..e1e20a0f1 100644
--- a/swarm/network/simulation/node_test.go
+++ b/swarm/network/simulation/node_test.go
@@ -289,6 +289,7 @@ func TestUploadSnapshot(t *testing.T) {
 				HiveParams:   hp,
 			}
 			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+			b.Store(BucketKeyKademlia, kad)
 			return network.NewBzz(config, kad, nil, nil, nil), nil, nil
 		},
 	})
@@ -296,12 +297,13 @@ func TestUploadSnapshot(t *testing.T) {
 
 	nodeCount := 16
 	log.Debug("Uploading snapshot")
-	err := s.UploadSnapshot(fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	err := s.UploadSnapshot(ctx, fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
 	if err != nil {
 		t.Fatalf("Error uploading snapshot to simulation network: %v", err)
 	}
 
-	ctx := context.Background()
 	log.Debug("Starting simulation...")
 	s.Run(ctx, func(ctx context.Context, sim *Simulation) error {
 		log.Debug("Checking")
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 2fdf8e9e3..2957999f8 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -22,8 +22,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ethereum/go-ethereum/swarm/testutil"
-
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
@@ -31,6 +29,7 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
+	"github.com/ethereum/go-ethereum/swarm/testutil"
 )
 
 //constants for random file generation
@@ -155,14 +154,15 @@ func runFileRetrievalTest(nodeCount int) error {
 	//array where the generated chunk hashes will be stored
 	conf.hashes = make([]storage.Address, 0)
 
-	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+	ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancelSimRun()
+
+	filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+	err := sim.UploadSnapshot(ctx, filename)
 	if err != nil {
 		return err
 	}
 
-	ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
-	defer cancelSimRun()
-
 	log.Info("Starting simulation")
 
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
@@ -188,9 +188,6 @@ func runFileRetrievalTest(nodeCount int) error {
 		if err != nil {
 			return err
 		}
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
 
 		log.Info("network healthy, start file checks")
 
@@ -253,12 +250,15 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
 	//array where the generated chunk hashes will be stored
 	conf.hashes = make([]storage.Address, 0)
 
-	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	defer cancel()
+
+	filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+	err := sim.UploadSnapshot(ctx, filename)
 	if err != nil {
 		return err
 	}
 
-	ctx := context.Background()
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
 		nodeIDs := sim.UpNodeIDs()
 		for _, n := range nodeIDs {
@@ -283,9 +283,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
 		if err != nil {
 			return err
 		}
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
 
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 9737ec0a5..ce1e69db2 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -147,20 +147,16 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 	//array where the generated chunk hashes will be stored
 	conf.hashes = make([]storage.Address, 0)
 
-	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
+	ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
 	defer cancelSimRun()
 
-	if _, err := sim.WaitTillHealthy(ctx); err != nil {
+	filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+	err := sim.UploadSnapshot(ctx, filename)
+	if err != nil {
 		t.Fatal(err)
 	}
 
 	result := runSim(conf, ctx, sim, chunkCount)
-
 	if result.Error != nil {
 		t.Fatal(result.Error)
 	}
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 83719af8a..bdd3087bb 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -1257,9 +1257,10 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 		simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
 	)
 
-	// upload a snapshot
-	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
-	if err != nil {
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	defer cancel()
+	filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+	if err := sim.UploadSnapshot(ctx, filename); err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/swarm/pss/prox_test.go b/swarm/pss/prox_test.go
index 1c8538d50..0b60ec39a 100644
--- a/swarm/pss/prox_test.go
+++ b/swarm/pss/prox_test.go
@@ -3,11 +3,8 @@ package pss
 import (
 	"context"
 	"encoding/binary"
-	"encoding/json"
 	"errors"
 	"fmt"
-	"io/ioutil"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -20,7 +17,6 @@ import (
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
-	"github.com/ethereum/go-ethereum/p2p/simulations"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/ethereum/go-ethereum/swarm/network"
@@ -105,24 +101,6 @@ func getCmdParams(t *testing.T) (int, int) {
 	return int(msgCount), int(nodeCount)
 }
 
-func readSnapshot(t *testing.T, nodeCount int) simulations.Snapshot {
-	f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount))
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer f.Close()
-	jsonbyte, err := ioutil.ReadAll(f)
-	if err != nil {
-		t.Fatal(err)
-	}
-	var snap simulations.Snapshot
-	err = json.Unmarshal(jsonbyte, &snap)
-	if err != nil {
-		t.Fatal(err)
-	}
-	return snap
-}
-
 func newTestData() *testData {
 	return &testData{
 		kademlias:    make(map[enode.ID]*network.Kademlia),
@@ -235,16 +213,12 @@ func testProxNetwork(t *testing.T) {
 	services := newProxServices(tstdata, true, handlerContextFuncs, tstdata.kademlias)
 	tstdata.sim = simulation.New(services)
 	defer tstdata.sim.Close()
-	err := tstdata.sim.UploadSnapshot(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount))
-	if err != nil {
-		t.Fatal(err)
-	}
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
+	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
 	defer cancel()
-	snap := readSnapshot(t, nodeCount)
-	err = tstdata.sim.WaitTillSnapshotRecreated(ctx, snap)
+	filename := fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)
+	err := tstdata.sim.UploadSnapshot(ctx, filename)
 	if err != nil {
-		t.Fatalf("failed to recreate snapshot: %s", err)
+		t.Fatal(err)
 	}
 	tstdata.init(msgCount) // initialize the test data
 	wrapper := func(c context.Context, _ *simulation.Simulation) error {
@@ -426,7 +400,6 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T
 			if err != nil {
 				return nil, nil, err
 			}
-			b.Store(simulation.BucketKeyKademlia, pskad)
 
 			// register the handlers we've been passed
 			var deregisters []func()
@@ -448,6 +421,8 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T
 				Public:    false,
 			})
 
+			b.Store(simulation.BucketKeyKademlia, pskad)
+
 			// return Pss and cleanups
 			return ps, func() {
 				// run the handler deregister functions in reverse order
-- 
cgit v1.2.3