aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgluk256 <gluk256@gmail.com>2019-03-22 18:20:17 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2019-03-22 18:20:17 +0800
commit8d04154691ef497f18816ac720f58b650d25e1e2 (patch)
tree724d259eaa5c92f6b17e72f65d723fb604af842a
parent09924cbcaab5106951fb67648315131bb4024ac5 (diff)
downloadgo-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.tar
go-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.tar.gz
go-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.tar.bz2
go-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.tar.lz
go-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.tar.xz
go-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.tar.zst
go-tangerine-8d04154691ef497f18816ac720f58b650d25e1e2.zip
p2p/simulations: wait until all connections are recreated when uploading snapshot (#19312)
* swarm/network/simulation: test cases refactored * swarm/pss: minor refactoring * swarm/simulation: UploadSnapshot updated * swarm/network: style fix * swarm/pss: bugfix
-rw-r--r--swarm/network/simulation/kademlia.go2
-rw-r--r--swarm/network/simulation/kademlia_test.go2
-rw-r--r--swarm/network/simulation/node.go27
-rw-r--r--swarm/network/simulation/node_test.go6
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go25
-rw-r--r--swarm/network/stream/snapshot_sync_test.go12
-rw-r--r--swarm/network/stream/streamer_test.go7
-rw-r--r--swarm/pss/prox_test.go37
8 files changed, 39 insertions, 79 deletions
diff --git a/swarm/network/simulation/kademlia.go b/swarm/network/simulation/kademlia.go
index 4b880aa0c..00e870a07 100644
--- a/swarm/network/simulation/kademlia.go
+++ b/swarm/network/simulation/kademlia.go
@@ -103,7 +103,7 @@ func (s *Simulation) kademlias() (ks map[enode.ID]*network.Kademlia) {
// in the snapshot are registered in the kademlia.
// It differs from WaitTillHealthy, which waits only until all the kademlias are
// healthy (it might happen even before all the connections are established).
-func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap simulations.Snapshot) error {
+func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap *simulations.Snapshot) error {
expected := getSnapshotConnections(snap.Conns)
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go
index 9cbc39da5..0ac1e7803 100644
--- a/swarm/network/simulation/kademlia_test.go
+++ b/swarm/network/simulation/kademlia_test.go
@@ -182,7 +182,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- err = controlSim.WaitTillSnapshotRecreated(ctx, *snap)
+ err = controlSim.WaitTillSnapshotRecreated(ctx, snap)
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go
index 2d618a29d..e24dab21b 100644
--- a/swarm/network/simulation/node.go
+++ b/swarm/network/simulation/node.go
@@ -17,6 +17,7 @@
package simulation
import (
+ "context"
"encoding/json"
"errors"
"io/ioutil"
@@ -24,7 +25,6 @@ import (
"os"
"time"
- "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
@@ -217,30 +217,24 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i
// UploadSnapshot uploads a snapshot to the simulation
// This method tries to open the json file provided, applies the config to all nodes
// and then loads the snapshot into the Simulation network
-func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error {
+func (s *Simulation) UploadSnapshot(ctx context.Context, snapshotFile string, opts ...AddNodeOption) error {
f, err := os.Open(snapshotFile)
if err != nil {
return err
}
- defer func() {
- err := f.Close()
- if err != nil {
- log.Error("Error closing snapshot file", "err", err)
- }
- }()
+ defer f.Close()
+
jsonbyte, err := ioutil.ReadAll(f)
if err != nil {
return err
}
var snap simulations.Snapshot
- err = json.Unmarshal(jsonbyte, &snap)
- if err != nil {
+ if err := json.Unmarshal(jsonbyte, &snap); err != nil {
return err
}
//the snapshot probably has the property EnableMsgEvents not set
- //just in case, set it to true!
- //(we need this to wait for messages before uploading)
+ //set it to true (we need this to wait for messages before uploading)
for i := range snap.Nodes {
snap.Nodes[i].Node.Config.EnableMsgEvents = true
snap.Nodes[i].Node.Config.Services = s.serviceNames
@@ -249,15 +243,10 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption)
}
}
- log.Info("Waiting for p2p connections to be established...")
-
- //now we can load the snapshot
- err = s.Net.Load(&snap)
- if err != nil {
+ if err := s.Net.Load(&snap); err != nil {
return err
}
- log.Info("Snapshot loaded")
- return nil
+ return s.WaitTillSnapshotRecreated(ctx, &snap)
}
// StartNode starts a node by NodeID.
diff --git a/swarm/network/simulation/node_test.go b/swarm/network/simulation/node_test.go
index bae5afb26..e1e20a0f1 100644
--- a/swarm/network/simulation/node_test.go
+++ b/swarm/network/simulation/node_test.go
@@ -289,6 +289,7 @@ func TestUploadSnapshot(t *testing.T) {
HiveParams: hp,
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ b.Store(BucketKeyKademlia, kad)
return network.NewBzz(config, kad, nil, nil, nil), nil, nil
},
})
@@ -296,12 +297,13 @@ func TestUploadSnapshot(t *testing.T) {
nodeCount := 16
log.Debug("Uploading snapshot")
- err := s.UploadSnapshot(fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+ err := s.UploadSnapshot(ctx, fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
if err != nil {
t.Fatalf("Error uploading snapshot to simulation network: %v", err)
}
- ctx := context.Background()
log.Debug("Starting simulation...")
s.Run(ctx, func(ctx context.Context, sim *Simulation) error {
log.Debug("Checking")
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 2fdf8e9e3..2957999f8 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -22,8 +22,6 @@ import (
"testing"
"time"
- "github.com/ethereum/go-ethereum/swarm/testutil"
-
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
@@ -31,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/testutil"
)
//constants for random file generation
@@ -155,14 +154,15 @@ func runFileRetrievalTest(nodeCount int) error {
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
- err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
+ defer cancelSimRun()
+
+ filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+ err := sim.UploadSnapshot(ctx, filename)
if err != nil {
return err
}
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
- defer cancelSimRun()
-
log.Info("Starting simulation")
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
@@ -188,9 +188,6 @@ func runFileRetrievalTest(nodeCount int) error {
if err != nil {
return err
}
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
log.Info("network healthy, start file checks")
@@ -253,12 +250,15 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
- err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+ defer cancel()
+
+ filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+ err := sim.UploadSnapshot(ctx, filename)
if err != nil {
return err
}
- ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -283,9 +283,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
if err != nil {
return err
}
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 9737ec0a5..ce1e69db2 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -147,20 +147,16 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
- err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
- if err != nil {
- t.Fatal(err)
- }
-
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelSimRun()
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
+ filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+ err := sim.UploadSnapshot(ctx, filename)
+ if err != nil {
t.Fatal(err)
}
result := runSim(conf, ctx, sim, chunkCount)
-
if result.Error != nil {
t.Fatal(result.Error)
}
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 83719af8a..bdd3087bb 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -1257,9 +1257,10 @@ func TestGetSubscriptionsRPC(t *testing.T) {
simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
)
- // upload a snapshot
- err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
- if err != nil {
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+ defer cancel()
+ filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+ if err := sim.UploadSnapshot(ctx, filename); err != nil {
t.Fatal(err)
}
diff --git a/swarm/pss/prox_test.go b/swarm/pss/prox_test.go
index 1c8538d50..0b60ec39a 100644
--- a/swarm/pss/prox_test.go
+++ b/swarm/pss/prox_test.go
@@ -3,11 +3,8 @@ package pss
import (
"context"
"encoding/binary"
- "encoding/json"
"errors"
"fmt"
- "io/ioutil"
- "os"
"strconv"
"strings"
"sync"
@@ -20,7 +17,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
@@ -105,24 +101,6 @@ func getCmdParams(t *testing.T) (int, int) {
return int(msgCount), int(nodeCount)
}
-func readSnapshot(t *testing.T, nodeCount int) simulations.Snapshot {
- f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount))
- if err != nil {
- t.Fatal(err)
- }
- defer f.Close()
- jsonbyte, err := ioutil.ReadAll(f)
- if err != nil {
- t.Fatal(err)
- }
- var snap simulations.Snapshot
- err = json.Unmarshal(jsonbyte, &snap)
- if err != nil {
- t.Fatal(err)
- }
- return snap
-}
-
func newTestData() *testData {
return &testData{
kademlias: make(map[enode.ID]*network.Kademlia),
@@ -235,16 +213,12 @@ func testProxNetwork(t *testing.T) {
services := newProxServices(tstdata, true, handlerContextFuncs, tstdata.kademlias)
tstdata.sim = simulation.New(services)
defer tstdata.sim.Close()
- err := tstdata.sim.UploadSnapshot(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount))
- if err != nil {
- t.Fatal(err)
- }
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()
- snap := readSnapshot(t, nodeCount)
- err = tstdata.sim.WaitTillSnapshotRecreated(ctx, snap)
+ filename := fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)
+ err := tstdata.sim.UploadSnapshot(ctx, filename)
if err != nil {
- t.Fatalf("failed to recreate snapshot: %s", err)
+ t.Fatal(err)
}
tstdata.init(msgCount) // initialize the test data
wrapper := func(c context.Context, _ *simulation.Simulation) error {
@@ -426,7 +400,6 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T
if err != nil {
return nil, nil, err
}
- b.Store(simulation.BucketKeyKademlia, pskad)
// register the handlers we've been passed
var deregisters []func()
@@ -448,6 +421,8 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T
Public: false,
})
+ b.Store(simulation.BucketKeyKademlia, pskad)
+
// return Pss and cleanups
return ps, func() {
// run the handler deregister functions in reverse order