path: root/swarm/network/stream/snapshot_sync_test.go
diff options
authorholisticode <holistic.computing@gmail.com>2019-01-11 22:08:09 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2019-01-11 22:08:09 +0800
commit88168ff5c57b1a9c944d02e93e6e49368ccc968f (patch)
tree150f46d66bccf780044469af65ee3c70d395a320 /swarm/network/stream/snapshot_sync_test.go
parentd5cad488be0069d768b358b2267cd5432b0f9a43 (diff)
Stream subscriptions (#18355)
* swarm/network: eachBin now starts at kaddepth for nn * swarm/network: fix Kademlia.EachBin * swarm/network: fix kademlia.EachBin * swarm/network: correct EachBin implementation according to requirements * swarm/network: less addresses simplified tests * swarm: calc kad depth outside loop in EachBin test * swarm/network: removed printResults * swarm/network: cleanup imports * swarm/network: remove kademlia.EachBin; fix RequestSubscriptions and add unit test * swarm/network/stream: address PR comments * swarm/network/stream: package-wide subscriptionFunc * swarm/network/stream: refactor to kad.EachConn
Diffstat (limited to 'swarm/network/stream/snapshot_sync_test.go')
1 files changed, 0 insertions, 266 deletions
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 4e4497ccd..6af19c12a 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -106,43 +106,6 @@ func TestSyncingViaGlobalSync(t *testing.T) {
-func TestSyncingViaDirectSubscribe(t *testing.T) {
- if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
- t.Skip("Flaky on mac on travis")
- }
- //if nodes/chunks have been provided via commandline,
- //run the tests with these values
- if *nodes != 0 && *chunks != 0 {
- log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
- err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
- if err != nil {
- t.Fatal(err)
- }
- } else {
- var nodeCnt []int
- var chnkCnt []int
- //if the `longrunning` flag has been provided
- //run more test combinations
- if *longrunning {
- chnkCnt = []int{1, 8, 32, 256, 1024}
- nodeCnt = []int{32, 16}
- } else {
- //default test
- chnkCnt = []int{4, 32}
- nodeCnt = []int{32, 16}
- }
- for _, chnk := range chnkCnt {
- for _, n := range nodeCnt {
- log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
- err := testSyncingViaDirectSubscribe(t, chnk, n)
- if err != nil {
- t.Fatal(err)
- }
- }
- }
- }
var simServiceMap = map[string]simulation.ServiceFunc{
"streamer": streamerFunc,
@@ -323,235 +286,6 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
-The test generates the given number of chunks
-For every chunk generated, the nearest node addresses
-are identified, we verify that the nodes closer to the
-chunk addresses actually do have the chunks in their local stores.
-The test loads a snapshot file to construct the swarm network,
-assuming that the snapshot file identifies a healthy
-kademlia network. The snapshot should have 'streamer' in its service list.
-func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingRegisterOnly,
- }, nil)
- bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
- return r, cleanup, nil
- },
- })
- defer sim.Close()
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
- defer cancelSimRun()
- conf := &synctestConfig{}
- //map of discover ID to indexes of chunks expected at that ID
- conf.idToChunksMap = make(map[enode.ID][]int)
- //map of overlay address to discover ID
- conf.addrToIDMap = make(map[string]enode.ID)
- //array where the generated chunk hashes will be stored
- conf.hashes = make([]storage.Address, 0)
- err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
- if err != nil {
- return err
- }
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
- nodeIDs := sim.UpNodeIDs()
- for _, n := range nodeIDs {
- //get the kademlia overlay address from this ID
- a := n.Bytes()
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- //the proximity calculation is on overlay addr,
- //the p2p/simulations check func triggers on enode.ID,
- //so we need to know which overlay addr maps to which nodeID
- conf.addrToIDMap[string(a)] = n
- }
- var subscriptionCount int
- filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
- eventC := sim.PeerEvents(ctx, nodeIDs, filter)
- for j, node := range nodeIDs {
- log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
- //start syncing!
- item, ok := sim.NodeItem(node, bucketKeyRegistry)
- if !ok {
- return fmt.Errorf("No registry")
- }
- registry := item.(*Registry)
- var cnt int
- cnt, err = startSyncing(registry, conf)
- if err != nil {
- return err
- }
- //increment the number of subscriptions we need to wait for
- //by the count returned from startSyncing (SYNC subscriptions)
- subscriptionCount += cnt
- }
- for e := range eventC {
- if e.Error != nil {
- return e.Error
- }
- subscriptionCount--
- if subscriptionCount == 0 {
- break
- }
- }
- //select a random node for upload
- node := sim.Net.GetRandomUpNode()
- item, ok := sim.NodeItem(node.ID(), bucketKeyStore)
- if !ok {
- return fmt.Errorf("No localstore")
- }
- lstore := item.(*storage.LocalStore)
- hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
- if err != nil {
- return err
- }
- conf.hashes = append(conf.hashes, hashes...)
- mapKeysToNodes(conf)
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
- var globalStore mock.GlobalStorer
- if *useMockStore {
- globalStore = mockmem.NewGlobalStore()
- }
- // File retrieval check is repeated until all uploaded files are retrieved from all nodes
- // or until the timeout is reached.
- for {
- for _, id := range nodeIDs {
- //for each expected chunk, check if it is in the local store
- localChunks := conf.idToChunksMap[id]
- for _, ch := range localChunks {
- //get the real chunk by the index in the index array
- chunk := conf.hashes[ch]
- log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
- //check if the expected chunk is indeed in the localstore
- var err error
- if *useMockStore {
- //use the globalStore if the mockStore should be used; in that case,
- //the complete localStore stack is bypassed for getting the chunk
- _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
- } else {
- //use the actual localstore
- item, ok := sim.NodeItem(id, bucketKeyStore)
- if !ok {
- return fmt.Errorf("Error accessing localstore")
- }
- lstore := item.(*storage.LocalStore)
- _, err = lstore.Get(ctx, chunk)
- }
- if err != nil {
- log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
- // Do not get crazy with logging the warn message
- time.Sleep(500 * time.Millisecond)
- continue REPEAT
- }
- log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
- }
- }
- return nil
- }
- })
- if result.Error != nil {
- return result.Error
- }
- if yes, ok := disconnected.Load().(bool); ok && yes {
- t.Fatal("disconnect events received")
- }
- log.Info("Simulation ended")
- return nil
-//the server func to start syncing
-//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
-//the kademlia's `EachBin` function.
-//returns the number of subscriptions requested
-func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
- var err error
- kad := r.delivery.kad
- subCnt := 0
- //iterate over each bin and solicit needed subscription to bins
- kad.EachBin(r.addr[:], pof, 0, func(conn *network.Peer, po int) bool {
- //identify begin and start index of the bin(s) we want to subscribe to
- subCnt++
- err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
- if err != nil {
- log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
- return false
- }
- return true
- })
- return subCnt, nil
//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
nodemap := make(map[string][]int)