diff options
author | Anton Evangelatov <anton.evangelatov@gmail.com> | 2018-07-09 20:11:49 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-07-09 20:11:49 +0800 |
commit | b3711af05176f446fad5ee90e2be4bd09c4086a2 (patch) | |
tree | 036eb23e423c385c0be00e3f8d3d97dea7040f8c /swarm/network | |
parent | 30bdf817a0d0afb33f3635f1de877f9caf09be05 (diff) | |
download | dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.gz dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.bz2 dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.lz dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.xz dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.zst dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.zip |
swarm: ctx propagation; bmt fixes; pss generic notification framework (#17150)
* cmd/swarm: minor cli flag text adjustments
* swarm/api/http: sticky footer for swarm landing page using flex
* swarm/api/http: sticky footer for error pages and fix for multiple choices
* cmd/swarm, swarm/storage, swarm: fix mingw on windows test issues
* cmd/swarm: update description of swarm cmd
* swarm: added network ID test
* cmd/swarm: support for smoke tests on the production swarm cluster
* cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion
* swarm: propagate ctx to internal apis (#754)
* swarm/metrics: collect disk measurements
* swarm/bmt: fix io.Writer interface
* Write now tolerates arbitrary variable buffers
* added variable buffer tests
* Write loop and finalise optimisation
* refactor / rename
* add tests for empty input
* swarm/pss: (UPDATE) Generic notifications package (#744)
swarm/pss: Generic package for creating pss notification svcs
* swarm: Adding context to more functions
* swarm/api: change colour of landing page in templates
* swarm/api: change landing page to react to enter keypress
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/networkid_test.go | 266 | ||||
-rw-r--r-- | swarm/network/stream/common_test.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 16 | ||||
-rw-r--r-- | swarm/network/stream/intervals_test.go | 8 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 12 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 8 | ||||
-rw-r--r-- | swarm/network/stream/syncer_test.go | 7 |
7 files changed, 304 insertions, 15 deletions
diff --git a/swarm/network/networkid_test.go b/swarm/network/networkid_test.go new file mode 100644 index 000000000..05134b083 --- /dev/null +++ b/swarm/network/networkid_test.go @@ -0,0 +1,266 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "bytes" + "context" + "flag" + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +var ( + currentNetworkID int + cnt int + nodeMap map[int][]discover.NodeID + kademlias map[discover.NodeID]*Kademlia +) + +const ( + NumberOfNets = 4 + MaxTimeout = 6 +) + +func init() { + flag.Parse() + rand.Seed(time.Now().Unix()) +} + +/* +Run the network ID test. +The test creates one simulations.Network instance, +a number of nodes, then connects nodes with each other in this network. + +Each node gets a network ID assigned according to the number of networks. +Having more network IDs is just arbitrary in order to exclude +false positives. + +Nodes should only connect with other nodes with the same network ID. +After the setup phase, the test checks on each node if it has the +expected node connections (excluding those not sharing the network ID). +*/ +func TestNetworkID(t *testing.T) { + log.Debug("Start test") + //arbitrarily set the number of nodes. It could be any number + numNodes := 24 + //the nodeMap maps all nodes (slice value) with the same network ID (key) + nodeMap = make(map[int][]discover.NodeID) + //set up the network and connect nodes + net, err := setupNetwork(numNodes) + if err != nil { + t.Fatalf("Error setting up network: %v", err) + } + defer func() { + //shutdown the snapshot network + log.Trace("Shutting down network") + net.Shutdown() + }() + //let's sleep to ensure all nodes are connected + time.Sleep(1 * time.Second) + //for each group sharing the same network ID... + for _, netIDGroup := range nodeMap { + log.Trace("netIDGroup size", "size", len(netIDGroup)) + //...check that their size of the kademlia is of the expected size + //the assumption is that it should be the size of the group minus 1 (the node itself) + for _, node := range netIDGroup { + if kademlias[node].addrs.Size() != len(netIDGroup)-1 { + t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1) + } + kademlias[node].EachAddr(nil, 0, func(addr OverlayAddr, _ int, _ bool) bool { + found := false + for _, nd := range netIDGroup { + p := ToOverlayAddr(nd.Bytes()) + if bytes.Equal(p, addr.Address()) { + found = true + } + } + if !found { + t.Fatalf("Expected node not found for node %s", node.String()) + } + return true + }) + } + } + log.Info("Test terminated successfully") +} + +// setup simulated network with bzz/discovery and pss services. +// connects nodes in a circle +// if allowRaw is set, omission of builtin pss encryption is enabled (see PssParams) +func setupNetwork(numnodes int) (net *simulations.Network, err error) { + log.Debug("Setting up network") + quitC := make(chan struct{}) + errc := make(chan error) + nodes := make([]*simulations.Node, numnodes) + if numnodes < 16 { + return nil, fmt.Errorf("Minimum sixteen nodes in network") + } + adapter := adapters.NewSimAdapter(newServices()) + //create the network + net = simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + ID: "NetworkIdTestNet", + DefaultService: "bzz", + }) + log.Debug("Creating networks and nodes") + + var connCount int + + //create nodes and connect them to each other + for i := 0; i < numnodes; i++ { + log.Trace("iteration: ", "i", i) + nodeconf := adapters.RandomNodeConfig() + nodes[i], err = net.NewNodeWithConfig(nodeconf) + if err != nil { + return nil, fmt.Errorf("error creating node %d: %v", i, err) + } + err = net.Start(nodes[i].ID()) + if err != nil { + return nil, fmt.Errorf("error starting node %d: %v", i, err) + } + client, err := nodes[i].Client() + if err != nil { + return nil, fmt.Errorf("create node %d rpc client fail: %v", i, err) + } + //now setup and start event watching in order to know when we can upload + ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second) + defer watchCancel() + watchSubscriptionEvents(ctx, nodes[i].ID(), client, errc, quitC) + //on every iteration we connect to all previous ones + for k := i - 1; k >= 0; k-- { + connCount++ + log.Debug(fmt.Sprintf("Connecting node %d with node %d; connection count is %d", i, k, connCount)) + err = net.Connect(nodes[i].ID(), nodes[k].ID()) + if err != nil { + if !strings.Contains(err.Error(), "already connected") { + return nil, fmt.Errorf("error connecting nodes: %v", err) + } + } + } + } + //now wait until the number of expected subscriptions has been finished + //`watchSubscriptionEvents` will write with a `nil` value to errc + for err := range errc { + if err != nil { + return nil, err + } + //`nil` received, decrement count + connCount-- + log.Trace("count down", "cnt", connCount) + //all subscriptions received + if connCount == 0 { + close(quitC) + break + } + } + log.Debug("Network setup phase terminated") + return net, nil +} + +func newServices() adapters.Services { + kademlias = make(map[discover.NodeID]*Kademlia) + kademlia := func(id discover.NodeID) *Kademlia { + if k, ok := kademlias[id]; ok { + return k + } + addr := NewAddrFromNodeID(id) + params := NewKadParams() + params.MinProxBinSize = 2 + params.MaxBinSize = 3 + params.MinBinSize = 1 + params.MaxRetries = 1000 + params.RetryExponent = 2 + params.RetryInterval = 1000000 + kademlias[id] = NewKademlia(addr.Over(), params) + return kademlias[id] + } + return adapters.Services{ + "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) { + addr := NewAddrFromNodeID(ctx.Config.ID) + hp := NewHiveParams() + hp.Discovery = false + cnt++ + //assign the network ID + currentNetworkID = cnt % NumberOfNets + if ok := nodeMap[currentNetworkID]; ok == nil { + nodeMap[currentNetworkID] = make([]discover.NodeID, 0) + } + //add this node to the group sharing the same network ID + nodeMap[currentNetworkID] = append(nodeMap[currentNetworkID], ctx.Config.ID) + log.Debug("current network ID:", "id", currentNetworkID) + config := &BzzConfig{ + OverlayAddr: addr.Over(), + UnderlayAddr: addr.Under(), + HiveParams: hp, + NetworkID: uint64(currentNetworkID), + } + return NewBzz(config, kademlia(ctx.Config.ID), nil, nil, nil), nil + }, + } +} + +func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) { + events := make(chan *p2p.PeerEvent) + sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") + if err != nil { + log.Error(err.Error()) + errc <- fmt.Errorf("error getting peer events for node %v: %s", id, err) + return + } + go func() { + defer func() { + sub.Unsubscribe() + log.Trace("watch subscription events: unsubscribe", "id", id) + }() + + for { + select { + case <-quitC: + return + case <-ctx.Done(): + select { + case errc <- ctx.Err(): + case <-quitC: + } + return + case e := <-events: + if e.Type == p2p.PeerEventTypeAdd { + errc <- nil + } + case err := <-sub.Err(): + if err != nil { + select { + case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err): + case <-quitC: + } + return + } + } + } + }() +} diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 9d1f997f2..6a2c27401 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -250,7 +250,7 @@ func (r *TestRegistry) APIs() []rpc.API { } func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { - r, _ := fileStore.Retrieve(hash) + r, _ := fileStore.Retrieve(context.TODO(), hash) buf := make([]byte, 1024) var n int var total int64 diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index b03028c88..cd87557b1 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -345,9 +345,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck // here we distribute chunks of a random file into Stores of nodes 1 to nodes rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams()) size := chunkCount * chunkSize - fileHash, wait, err := rrFileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) + ctx := context.TODO() + fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) // wait until all chunks stored - wait() + if err != nil { + t.Fatal(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatal(err.Error()) } @@ -627,9 +631,13 @@ Loop: hashes := make([]storage.Address, chunkCount) for i := 0; i < chunkCount; i++ { // create actual size real chunks - hash, wait, err := remoteFileStore.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) + ctx := context.TODO() + hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) + if err != nil { + b.Fatalf("expected no error. got %v", err) + } // wait until all chunks stored - wait() + err = wait(ctx) if err != nil { b.Fatalf("expected no error. got %v", err) } diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 4e2721cb0..d996cdc7e 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -117,8 +117,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams()) size := chunkCount * chunkSize - _, wait, err := fileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) - wait() + ctx := context.TODO() + _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + if err != nil { + t.Fatal(err) + } + err = wait(ctx) if err != nil { t.Fatal(err) } diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 59c776c30..da5253e8a 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -410,7 +410,7 @@ func runFileRetrievalTest(nodeCount int) error { fileStore := registries[id].fileStore //check all chunks for i, hash := range conf.hashes { - reader, _ := fileStore.Retrieve(hash) + reader, _ := fileStore.Retrieve(context.TODO(), hash) //check that we can read the file size and that it corresponds to the generated file size if s, err := reader.Size(nil); err != nil || s != int64(len(randomFiles[i])) { allSuccess = false @@ -697,7 +697,7 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { fileStore := registries[id].fileStore //check all chunks for _, chnk := range conf.hashes { - reader, _ := fileStore.Retrieve(chnk) + reader, _ := fileStore.Retrieve(context.TODO(), chnk) //assuming that reading the Size of the chunk is enough to know we found it if s, err := reader.Size(nil); err != nil || s != chunkSize { allSuccess = false @@ -765,9 +765,13 @@ func uploadFilesToNodes(nodes []*simulations.Node) ([]storage.Address, []string, return nil, nil, err } //store it (upload it) on the FileStore - rk, wait, err := fileStore.Store(strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false) + ctx := context.TODO() + rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false) log.Debug("Uploaded random string file to node") - wait() + if err != nil { + return nil, nil, err + } + err = wait(ctx) if err != nil { return nil, nil, err } diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index ff1c39319..fd8863d43 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -581,8 +581,12 @@ func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage. fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams()) var rootAddrs []storage.Address for i := 0; i < chunkCount; i++ { - rk, wait, err := fileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) - wait() + ctx := context.TODO() + rk, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + if err != nil { + return nil, err + } + err = wait(ctx) if err != nil { return nil, err } diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 68e20841d..5fea7befe 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -202,9 +202,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck // here we distribute chunks of a random file into stores 1...nodes rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams()) size := chunkCount * chunkSize - _, wait, err := rrFileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false) + _, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + if err != nil { + t.Fatal(err.Error()) + } // need to wait cos we then immediately collect the relevant bin content - wait() + wait(ctx) if err != nil { t.Fatal(err.Error()) } |