diff options
Diffstat (limited to 'swarm/network/stream/intervals_test.go')
-rw-r--r-- | swarm/network/stream/intervals_test.go | 506 |
1 files changed, 288 insertions, 218 deletions
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index d996cdc7e..f4294134b 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -22,52 +22,22 @@ import ( "encoding/binary" "fmt" "io" + "os" "sync" "testing" "time" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) -var ( - externalStreamName = "externalStream" - externalStreamSessionAt uint64 = 50 - externalStreamMaxKeys uint64 = 100 -) - -func newIntervalsStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { - id := ctx.Config.ID - addr := toAddr(id) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - store := stores[id].(*storage.LocalStore) - db := storage.NewDBAPI(store) - delivery := NewDelivery(kad, db) - deliveries[id] = delivery - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: defaultSkipCheck, - }) - - r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { - return newTestExternalClient(db), nil - }) - r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { - return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil - }) - - go func() { - waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id)) - }() - return &TestExternalRegistry{r}, nil -} - func TestIntervals(t *testing.T) { testIntervals(t, true, nil, false) testIntervals(t, false, NewRange(9, 26), false) @@ -81,237 +51,337 @@ func TestIntervals(t *testing.T) { func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { nodes := 2 chunkCount := dataChunkCount + externalStreamName := "externalStream" + externalStreamSessionAt := uint64(50) + externalStreamMaxKeys := uint64(100) - defer setDefaultSkipCheck(defaultSkipCheck) - defaultSkipCheck = skipCheck - - toAddr = network.NewAddrFromNodeID - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: 1, - ToAddr: toAddr, - Services: services, - DefaultService: "intervalsStreamer", - } + sim := simulation.New(map[string]simulation.ServiceFunc{ + "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() - if err != nil { - t.Fatal(err) - } + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + store.Close() + os.RemoveAll(datadir) + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) - stores = make(map[discover.NodeID]storage.ChunkStore) - deliveries = make(map[discover.NodeID]*Delivery) - for i, id := range sim.IDs { - stores[id] = sim.Stores[i] - } + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + }) + bucket.Store(bucketKeyRegistry, r) - peerCount = func(id discover.NodeID) int { - return 1 - } + r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { + return newTestExternalClient(db), nil + }) + r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { + return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil + }) - fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams()) - size := chunkCount * chunkSize - ctx := context.TODO() - _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) - if err != nil { - t.Fatal(err) - } - err = wait(ctx) + fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { t.Fatal(err) } - errc := make(chan error, 1) - waitPeerErrC = make(chan error) - quitC := make(chan struct{}) - defer close(quitC) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() - action := func(ctx context.Context) error { - i := 0 - for err := range waitPeerErrC { - if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) - } - i++ - if i == nodes { - break - } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + storer := nodeIDs[0] + checker := nodeIDs[1] + + item, ok := sim.NodeItem(storer, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") } + fileStore := item.(*storage.FileStore) - id := sim.IDs[1] + size := chunkCount * chunkSize + _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + if err != nil { + log.Error("Store error: %v", "err", err) + t.Fatal(err) + } + err = wait(ctx) + if err != nil { + log.Error("Wait error: %v", "err", err) + t.Fatal(err) + } - err := sim.CallClient(id, func(client *rpc.Client) error { + item, ok = sim.NodeItem(checker, bucketKeyRegistry) + if !ok { + return fmt.Errorf("No registry") + } + registry := item.(*Registry) - sid := sim.IDs[0] + liveErrC := make(chan error) + historyErrC := make(chan error) - doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC) - if err != nil { - return err + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + log.Error("WaitKademlia error: %v", "err", err) + return err + } + + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal(d.Error) + } } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() + }() + + go func() { + if !live { + close(liveErrC) + return + } + + var err error + defer func() { + liveErrC <- err }() - ctx, cancel := context.WithTimeout(ctx, 100*time.Second) - defer cancel() - err = client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(externalStreamName, "", live), history, Top) + // live stream + var liveHashesChan chan []byte + liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true)) if err != nil { - return err + log.Error("Subscription error: %v", "err", err) + return } + i := externalStreamSessionAt - liveErrC := make(chan error) - historyErrC := make(chan error) + // we have subscribed, enable notifications + err = enableNotifications(registry, storer, NewStream(externalStreamName, "", true)) + if err != nil { + return + } - go func() { - if !live { - close(liveErrC) + for { + select { + case hash := <-liveHashesChan: + h := binary.BigEndian.Uint64(hash) + if h != i { + err = fmt.Errorf("expected live hash %d, got %d", i, h) + return + } + i++ + if i > externalStreamMaxKeys { + return + } + case <-ctx.Done(): return } + } + }() - var err error - defer func() { - liveErrC <- err - }() + go func() { + if live && history == nil { + close(historyErrC) + return + } - // live stream - liveHashesChan := make(chan []byte) - liveSubscription, err := client.Subscribe(ctx, "stream", liveHashesChan, "getHashes", sid, NewStream(externalStreamName, "", true)) - if err != nil { - return - } - defer liveSubscription.Unsubscribe() + var err error + defer func() { + historyErrC <- err + }() - i := externalStreamSessionAt + // history stream + var historyHashesChan chan []byte + historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false)) + if err != nil { + return + } - // we have subscribed, enable notifications - err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", true)) - if err != nil { - return + var i uint64 + historyTo := externalStreamMaxKeys + if history != nil { + i = history.From + if history.To != 0 { + historyTo = history.To } + } - for { - select { - case hash := <-liveHashesChan: - h := binary.BigEndian.Uint64(hash) - if h != i { - err = fmt.Errorf("expected live hash %d, got %d", i, h) - return - } - i++ - if i > externalStreamMaxKeys { - return - } - case err = <-liveSubscription.Err(): + // we have subscribed, enable notifications + err = enableNotifications(registry, storer, NewStream(externalStreamName, "", false)) + if err != nil { + return + } + + for { + select { + case hash := <-historyHashesChan: + h := binary.BigEndian.Uint64(hash) + if h != i { + err = fmt.Errorf("expected history hash %d, got %d", i, h) return - case <-ctx.Done(): + } + i++ + if i > historyTo { return } - } - }() - - go func() { - if live && history == nil { - close(historyErrC) + case <-ctx.Done(): return } + } + }() - var err error - defer func() { - historyErrC <- err - }() + err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) + if err != nil { + return err + } + if err := <-liveErrC; err != nil { + return err + } + if err := <-historyErrC; err != nil { + return err + } - // history stream - historyHashesChan := make(chan []byte) - historySubscription, err := client.Subscribe(ctx, "stream", historyHashesChan, "getHashes", sid, NewStream(externalStreamName, "", false)) - if err != nil { - return - } - defer historySubscription.Unsubscribe() - - var i uint64 - historyTo := externalStreamMaxKeys - if history != nil { - i = history.From - if history.To != 0 { - historyTo = history.To - } - } + return nil + }) - // we have subscribed, enable notifications - err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", false)) - if err != nil { - return - } + if result.Error != nil { + t.Fatal(result.Error) + } +} - for { - select { - case hash := <-historyHashesChan: - h := binary.BigEndian.Uint64(hash) - if h != i { - err = fmt.Errorf("expected history hash %d, got %d", i, h) - return - } - i++ - if i > historyTo { - return - } - case err = <-historySubscription.Err(): - return - case <-ctx.Done(): - return - } - } - }() +func getHashes(ctx context.Context, r *Registry, peerID discover.NodeID, s Stream) (chan []byte, error) { + peer := r.getPeer(peerID) - if err := <-liveErrC; err != nil { - return err - } - if err := <-historyErrC; err != nil { - return err - } + client, err := peer.getClient(ctx, s) + if err != nil { + return nil, err + } + + c := client.Client.(*testExternalClient) + + return c.hashes, nil +} + +func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error { + peer := r.getPeer(peerID) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - return nil - }) + client, err := peer.getClient(ctx, s) + if err != nil { return err } - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case err := <-errc: - return false, err - case <-ctx.Done(): - return false, ctx.Err() - default: + + close(client.Client.(*testExternalClient).enableNotificationsC) + + return nil +} + +type testExternalClient struct { + hashes chan []byte + db *storage.DBAPI + enableNotificationsC chan struct{} +} + +func newTestExternalClient(db *storage.DBAPI) *testExternalClient { + return &testExternalClient{ + hashes: make(chan []byte), + db: db, + enableNotificationsC: make(chan struct{}), + } +} + +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { + chunk, _ := c.db.GetOrCreateRequest(ctx, hash) + if chunk.ReqC == nil { + return nil + } + c.hashes <- hash + //NOTE: This was failing on go1.9.x with a deadlock. + //Sometimes this function would just block + //It is commented now, but it may be well worth after the chunk refactor + //to re-enable this and see if the problem has been addressed + /* + return func() { + return chunk.WaitToStore() } - return true, nil + */ + return nil +} + +func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { + return nil +} + +func (c *testExternalClient) Close() {} + +const testExternalServerBatchSize = 10 + +type testExternalServer struct { + t string + keyFunc func(key []byte, index uint64) + sessionAt uint64 + maxKeys uint64 +} + +func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer { + if keyFunc == nil { + keyFunc = binary.BigEndian.PutUint64 } + return &testExternalServer{ + t: t, + keyFunc: keyFunc, + sessionAt: sessionAt, + maxKeys: maxKeys, + } +} - conf.Step = &simulations.Step{ - Action: action, - Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]), - Expect: &simulations.Expectation{ - Nodes: sim.IDs[1:1], - Check: check, - }, +func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + if from == 0 && to == 0 { + from = s.sessionAt + to = s.sessionAt + testExternalServerBatchSize } - startedAt := time.Now() - timeout := 300 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - result, err := sim.Run(ctx, conf) - finishedAt := time.Now() - if err != nil { - t.Fatalf("Setting up simulation failed: %v", err) + if to-from > testExternalServerBatchSize { + to = from + testExternalServerBatchSize - 1 } - if result.Error != nil { - t.Fatalf("Simulation failed: %s", result.Error) + if from >= s.maxKeys && to > s.maxKeys { + return nil, 0, 0, nil, io.EOF + } + if to > s.maxKeys { + to = s.maxKeys + } + b := make([]byte, HashSize*(to-from+1)) + for i := from; i <= to; i++ { + s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) } - streamTesting.CheckResult(t, result, startedAt, finishedAt) + return b, from, to, nil, nil } + +func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { + return make([]byte, 4096), nil +} + +func (s *testExternalServer) Close() {} |