author     Anton Evangelatov <anton.evangelatov@gmail.com>   2019-04-11 16:26:52 +0800
committer  Anton Evangelatov <anton.evangelatov@gmail.com>   2019-05-10 18:26:30 +0800
commit     993b145f25845e50e8af41ffb1116eaee381d693 (patch)
tree       47a88eec27f66b7237512c862d7ab2f8e9f314d3 /swarm/network
parent     996755c4a832afce8629a771cab8879c88c98355 (diff)
swarm/storage/localstore: fix export db.Put signature
cmd/swarm/swarm-smoke: improve smoke tests (#1337)
swarm/network: remove dead code (#1339)
swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342)
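The three messages above land together in this tree: the stand-alone RETRIEVE_REQUEST stream server is removed, and a single *storage.NetStore now backs both delivery and syncing. For orientation, a minimal sketch of the resulting wiring; the helper function itself is hypothetical and NetStore construction is elided, but NewDelivery, NewRegistry and the option names are exactly as they appear in the hunks below:

    package stream

    import (
        "github.com/ethereum/go-ethereum/p2p/enode"
        "github.com/ethereum/go-ethereum/swarm/network"
        "github.com/ethereum/go-ethereum/swarm/state"
        "github.com/ethereum/go-ethereum/swarm/storage"
    )

    // newSyncingRegistry is a hypothetical helper (not in this diff) showing the
    // post-change wiring: one *storage.NetStore feeds both Delivery and Registry.
    func newSyncingRegistry(localID enode.ID, kad *network.Kademlia, netStore *storage.NetStore) *Registry {
        delivery := NewDelivery(kad, netStore) // was NewDelivery(kad, chunkStore chunk.FetchStore)
        return NewRegistry(localID, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
            Syncing: SyncingAutoSubscribe, // RegistryOptions.Retrieval no longer exists
        }, nil)
    }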
Diffstat (limited to 'swarm/network')
-rw-r--r--   swarm/network/hive.go                              2
-rw-r--r--   swarm/network/kademlia.go                         15
-rw-r--r--   swarm/network/kademlia_test.go                     2
-rw-r--r--   swarm/network/stream/delivery.go                 149
-rw-r--r--   swarm/network/stream/delivery_test.go            162
-rw-r--r--   swarm/network/stream/intervals_test.go            10
-rw-r--r--   swarm/network/stream/lightnode_test.go            89
-rw-r--r--   swarm/network/stream/messages.go                   4
-rw-r--r--   swarm/network/stream/peer.go                       2
-rw-r--r--   swarm/network/stream/snapshot_retrieval_test.go    1
-rw-r--r--   swarm/network/stream/snapshot_sync_test.go         1
-rw-r--r--   swarm/network/stream/stream.go                   124
-rw-r--r--   swarm/network/stream/streamer_test.go             21
-rw-r--r--   swarm/network/stream/syncer.go                    82
-rw-r--r--   swarm/network/stream/syncer_test.go                7
15 files changed, 161 insertions(+), 510 deletions(-)
diff --git a/swarm/network/hive.go b/swarm/network/hive.go
index 2eb521f1d..ad51b29c2 100644
--- a/swarm/network/hive.go
+++ b/swarm/network/hive.go
@@ -116,7 +116,7 @@ func (h *Hive) Stop() error {
 	log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
 	h.EachConn(nil, 255, func(p *Peer, _ int) bool {
 		log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
-		p.Drop(nil)
+		p.Drop()
 		return true
 	})
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index dd6de44fd..f553cb5f4 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/log"
 	"github.com/ethereum/go-ethereum/swarm/pot"
 	sv "github.com/ethereum/go-ethereum/swarm/version"
@@ -138,6 +139,9 @@ func (e *entry) Hex() string {
 func (k *Kademlia) Register(peers ...*BzzAddr) error {
 	k.lock.Lock()
 	defer k.lock.Unlock()
+
+	metrics.GetOrRegisterCounter("kad.register", nil).Inc(1)
+
 	var known, size int
 	for _, p := range peers {
 		log.Trace("kademlia trying to register", "addr", p)
@@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
 				return newEntry(p)
 			}
 
-			log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr)
-
 			return v
 		})
 		if found {
@@ -186,6 +188,9 @@
 func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
 	k.lock.Lock()
 	defer k.lock.Unlock()
+
+	metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1)
+
 	radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
 	// collect undersaturated bins in ascending order of number of connected peers
 	// and from shallow to deep (ascending order of PO)
@@ -297,6 +302,9 @@
 func (k *Kademlia) On(p *Peer) (uint8, bool) {
 	k.lock.Lock()
 	defer k.lock.Unlock()
+
+	metrics.GetOrRegisterCounter("kad.on", nil).Inc(1)
+
 	var ins bool
 	k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val {
 		// if not found live
@@ -320,7 +328,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
 			k.addrCountC <- k.addrs.Size()
 		}
 	}
-	log.Trace(k.string())
 	// calculate if depth of saturation changed
 	depth := uint8(k.saturation())
 	var changed bool
@@ -608,7 +615,7 @@ func (k *Kademlia) string() string {
 	if len(sv.GitCommit) > 0 {
 		rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit))
 	}
-	rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3]))
+	rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()))
 	rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
 	liverows := make([]string, k.MaxProxDisplay)
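The counters added to Register, SuggestPeer and On above use the go-ethereum metrics package. A small stand-alone sketch of that pattern (a hypothetical example, not part of the commit); GetOrRegisterCounter is idempotent, which is why the hot paths can call it inline on every invocation:

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/metrics"
    )

    func main() {
        // Collection is gated by this package-level flag in go-ethereum;
        // without it the registry hands back no-op counters.
        metrics.Enabled = true

        // The first call registers "kad.register" in the default registry,
        // subsequent calls return the same counter.
        metrics.GetOrRegisterCounter("kad.register", nil).Inc(1)
        metrics.GetOrRegisterCounter("kad.register", nil).Inc(1)

        c := metrics.GetOrRegisterCounter("kad.register", nil)
        fmt.Println(c.Count()) // 2
    }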
diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go
index b4663eee5..93b990138 100644
--- a/swarm/network/kademlia_test.go
+++ b/swarm/network/kademlia_test.go
@@ -541,7 +541,7 @@ func TestKademliaHiveString(t *testing.T) {
 	tk.Register("10000000", "10000001")
 	tk.MaxProxDisplay = 8
 	h := tk.String()
-	expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000  0                              |  2 8100 (0) 8000 (0)\n001  1 4000                         |  1 4000 (0)\n002  1 2000                         |  1 2000 (0)\n003  0                              |  0\n004  0                              |  0\n005  0                              |  0\n006  0                              |  0\n007  0                              |  0\n========================================================================="
+	expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 0000000000000000000000000000000000000000000000000000000000000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000  0                              |  2 8100 (0) 8000 (0)\n001  1 4000                         |  1 4000 (0)\n002  1 2000                         |  1 2000 (0)\n003  0                              |  0\n004  0                              |  0\n005  0                              |  0\n006  0                              |  0\n007  0                              |  0\n========================================================================="
 	if expH[104:] != h[104:] {
 		t.Fatalf("incorrect hive output. expected %v, got %v", expH, h)
 	}
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 059666723..aa2c817ea 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p/enode"
@@ -33,11 +34,6 @@ import (
 	olog "github.com/opentracing/opentracing-go/log"
 )
 
-const (
-	swarmChunkServerStreamName = "RETRIEVE_REQUEST"
-	deliveryCap                = 32
-)
-
 var (
 	processReceivedChunksCount    = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil)
 	handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
@@ -45,91 +41,23 @@ var (
 	requestFromPeersCount     = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil)
 	requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil)
+
+	lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
 )
 
 type Delivery struct {
-	chunkStore chunk.FetchStore
-	kad        *network.Kademlia
-	getPeer    func(enode.ID) *Peer
+	netStore *storage.NetStore
+	kad      *network.Kademlia
+	getPeer  func(enode.ID) *Peer
+	quit     chan struct{}
 }
 
-func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
+func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
 	return &Delivery{
-		chunkStore: chunkStore,
-		kad:        kad,
-	}
-}
-
-// SwarmChunkServer implements Server
-type SwarmChunkServer struct {
-	deliveryC  chan []byte
-	batchC     chan []byte
-	chunkStore storage.ChunkStore
-	currentLen uint64
-	quit       chan struct{}
-}
-
-// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
-	s := &SwarmChunkServer{
-		deliveryC:  make(chan []byte, deliveryCap),
-		batchC:     make(chan []byte),
-		chunkStore: chunkStore,
-		quit:       make(chan struct{}),
-	}
-	go s.processDeliveries()
-	return s
-}
-
-// processDeliveries handles delivered chunk hashes
-func (s *SwarmChunkServer) processDeliveries() {
-	var hashes []byte
-	var batchC chan []byte
-	for {
-		select {
-		case <-s.quit:
-			return
-		case hash := <-s.deliveryC:
-			hashes = append(hashes, hash...)
-			batchC = s.batchC
-		case batchC <- hashes:
-			hashes = nil
-			batchC = nil
-		}
-	}
-}
-
-// SessionIndex returns zero in all cases for SwarmChunkServer.
-func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
-	return 0, nil
-}
-
-// SetNextBatch
-func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
-	select {
-	case hashes = <-s.batchC:
-	case <-s.quit:
-		return
-	}
-
-	from = s.currentLen
-	s.currentLen += uint64(len(hashes))
-	to = s.currentLen
-	return
-}
-
-// Close needs to be called on a stream server
-func (s *SwarmChunkServer) Close() {
-	close(s.quit)
-}
-
-// GetData retrieves chunk data from db store
-func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-	ch, err := s.chunkStore.Get(ctx, chunk.ModeGetRequest, storage.Address(key))
-	if err != nil {
-		return nil, err
+		netStore: netStore,
+		kad:      kad,
+		quit:     make(chan struct{}),
 	}
-	return ch.Data(), nil
 }
 
 // RetrieveRequestMsg is the protocol msg for chunk retrieve requests
@@ -150,12 +78,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 
 	osp.LogFields(olog.String("ref", req.Addr.String()))
 
-	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
-	if err != nil {
-		return err
-	}
-	streamer := s.Server.(*SwarmChunkServer)
-
 	var cancel func()
 	// TODO: do something with this hardcoded timeout, maybe use TTL in the future
 	ctx = context.WithValue(ctx, "peer", sp.ID().String())
@@ -165,36 +87,26 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 	go func() {
 		select {
 		case <-ctx.Done():
-		case <-streamer.quit:
+		case <-d.quit:
 		}
 		cancel()
 	}()
 
 	go func() {
 		defer osp.Finish()
-		ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
+		ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
 		if err != nil {
 			retrieveChunkFail.Inc(1)
 			log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
 			return
 		}
-		if req.SkipCheck {
-			syncing := false
-			osp.LogFields(olog.Bool("skipCheck", true))
+		syncing := false
 
-			err = sp.Deliver(ctx, ch, s.priority, syncing)
-			if err != nil {
-				log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
-			}
-			osp.LogFields(olog.Bool("delivered", true))
-			return
-		}
-		osp.LogFields(olog.Bool("skipCheck", false))
-		select {
-		case streamer.deliveryC <- ch.Address()[:]:
-		case <-streamer.quit:
+		err = sp.Deliver(ctx, ch, Top, syncing)
+		if err != nil {
+			log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
 		}
-
+		osp.LogFields(olog.Bool("delivered", true))
 	}()
 
 	return nil
@@ -225,6 +137,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
 
 	processReceivedChunksCount.Inc(1)
 
+	// record the last time we received a chunk delivery message
+	lastReceivedChunksMsg.Update(time.Now().UnixNano())
+
 	var msg *ChunkDeliveryMsg
 	var mode chunk.ModePut
 	switch r := req.(type) {
@@ -244,31 +159,25 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
 	case *ChunkDeliveryMsgSyncing:
 		msg = (*ChunkDeliveryMsg)(r)
 		mode = chunk.ModePutSync
+	case *ChunkDeliveryMsg:
+		msg = r
+		mode = chunk.ModePutSync
 	}
 
-	// retrieve the span for the originating retrieverequest
-	spanID := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), msg.Addr)
-	span := tracing.ShiftSpanByKey(spanID)
-
 	log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())
 
 	go func() {
 		defer osp.Finish()
 
-		if span != nil {
-			span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
-			defer span.Finish()
-		}
-
 		msg.peer = sp
 		log.Trace("handle.chunk.delivery", "put", msg.Addr)
-		_, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
+		_, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
 		if err != nil {
 			if err == storage.ErrChunkInvalid {
 				// we removed this log because it spams the logs
 				// TODO: Enable this log line
 				// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
-				msg.peer.Drop(err)
+				msg.peer.Drop()
 			}
 		}
 		log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
@@ -276,6 +185,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
 	return nil
 }
 
+func (d *Delivery) Close() {
+	d.kad.CloseNeighbourhoodDepthC()
+	d.kad.CloseAddrCountC()
+	close(d.quit)
+}
+
 // RequestFromPeers sends a chunk retrieve request to a peer
 // The most eligible peer that hasn't already been sent to is chosen
 // TODO: define "eligible"
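delivery.go now also feeds a gauge, network.stream.received_chunks, with the timestamp of the last ChunkDelivery message. A sketch of how a monitor (for example the swarm-smoke checks mentioned in the commit message) could read it back; the stall threshold and the watchdog itself are illustrative, not from the commit:

    package main

    import (
        "fmt"
        "time"

        "github.com/ethereum/go-ethereum/metrics"
    )

    func main() {
        metrics.Enabled = true
        g := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

        // What handleChunkDeliveryMsg does on every delivery:
        g.Update(time.Now().UnixNano())

        // What a watchdog could do with the recorded timestamp:
        last := time.Unix(0, g.Value())
        if time.Since(last) > 5*time.Minute { // assumed threshold
            fmt.Println("chunk deliveries stalled since", last)
        } else {
            fmt.Println("last chunk delivery", time.Since(last), "ago")
        }
    }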
"put", msg.Addr) - _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) + _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) if err != nil { if err == storage.ErrChunkInvalid { // we removed this log because it spams the logs // TODO: Enable this log line // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, ) - msg.peer.Drop(err) + msg.peer.Drop() } } log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err) @@ -276,6 +185,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int return nil } +func (d *Delivery) Close() { + d.kad.CloseNeighbourhoodDepthC() + d.kad.CloseAddrCountC() + close(d.quit) +} + // RequestFromPeers sends a chunk retrieve request to a peer // The most eligible peer that hasn't already been sent to is chosen // TODO: define "eligible" diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 801e6d98a..4037243c1 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -41,64 +41,11 @@ import ( "github.com/ethereum/go-ethereum/swarm/testutil" ) -//Tests initializing a retrieve request -func TestStreamerRetrieveRequest(t *testing.T) { - regOpts := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, streamer, _, teardown, err := newStreamerTester(regOpts) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - ctx := context.Background() - req := network.NewRequest( - storage.Address(hash0[:]), - true, - &sync.Map{}, - ) - streamer.delivery.RequestFromPeers(ctx, req) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Expects: []p2ptest.Expect{ - { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - { //expect a retrieve request message for the given hash - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash0[:], - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } -} - //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) //Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, //do no syncing + tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, //do no syncing }) if err != nil { t.Fatal(err) @@ -109,30 +56,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { chunk := storage.NewChunk(storage.Address(hash0[:]), nil) - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - //simulate pre-subscription to RETRIEVE_REQUEST stream on peer - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { //first expect a subscription to the RETRIEVE_REQUEST stream - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: 
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 1f2cdcada..660954857 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -29,7 +29,6 @@ import (
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
-	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
@@ -67,7 +66,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 		}
 
 		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-			Retrieval: RetrievalDisabled,
 			Syncing:   SyncingRegisterOnly,
 			SkipCheck: skipCheck,
 		}, nil)
@@ -288,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {
 
 type testExternalClient struct {
 	hashes               chan []byte
-	store                chunk.FetchStore
+	netStore             *storage.NetStore
 	enableNotificationsC chan struct{}
 }
 
-func newTestExternalClient(store chunk.FetchStore) *testExternalClient {
+func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
 	return &testExternalClient{
 		hashes:               make(chan []byte),
-		store:                store,
+		netStore:             netStore,
 		enableNotificationsC: make(chan struct{}),
 	}
 }
 
 func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
-	wait := c.store.FetchFunc(ctx, storage.Address(hash))
+	wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
 	if wait == nil {
 		return nil
 	}
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 501660fab..eb4e73d47 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -22,94 +22,10 @@ import (
 )
 
 // This test checks the default behavior of the server, that is
-// when it is serving Retrieve requests.
-func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
-	registryOptions := &RegistryOptions{
-		Retrieval: RetrievalClientOnly,
-		Syncing:   SyncingDisabled,
-	}
-	tester, _, _, teardown, err := newStreamerTester(registryOptions)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer teardown()
-
-	node := tester.Nodes[0]
-
-	stream := NewStream(swarmChunkServerStreamName, "", false)
-
-	err = tester.TestExchanges(p2ptest.Exchange{
-		Label: "SubscribeMsg",
-		Triggers: []p2ptest.Trigger{
-			{
-				Code: 4,
-				Msg: &SubscribeMsg{
-					Stream: stream,
-				},
-				Peer: node.ID(),
-			},
-		},
-	})
-	if err != nil {
-		t.Fatalf("Got %v", err)
-	}
-
-	err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID()})
-	if err == nil || err.Error() != "timed out waiting for peers to disconnect" {
-		t.Fatalf("Expected no disconnect, got %v", err)
-	}
-}
-
-// This test checks the Lightnode behavior of server, when serving Retrieve
-// requests are disabled
-func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
-	registryOptions := &RegistryOptions{
-		Retrieval: RetrievalDisabled,
-		Syncing:   SyncingDisabled,
-	}
-	tester, _, _, teardown, err := newStreamerTester(registryOptions)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer teardown()
-
-	node := tester.Nodes[0]
-
-	stream := NewStream(swarmChunkServerStreamName, "", false)
-
-	err = tester.TestExchanges(
-		p2ptest.Exchange{
-			Label: "SubscribeMsg",
-			Triggers: []p2ptest.Trigger{
-				{
-					Code: 4,
-					Msg: &SubscribeMsg{
-						Stream: stream,
-					},
-					Peer: node.ID(),
-				},
-			},
-			Expects: []p2ptest.Expect{
-				{
-					Code: 7,
-					Msg: &SubscribeErrorMsg{
-						Error: "stream RETRIEVE_REQUEST not registered",
-					},
-					Peer: node.ID(),
-				},
-			},
-		})
-	if err != nil {
-		t.Fatalf("Got %v", err)
-	}
-}
-
-// This test checks the default behavior of the server, that is
 // when syncing is enabled.
 func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
 	registryOptions := &RegistryOptions{
-		Retrieval: RetrievalDisabled,
-		Syncing:   SyncingRegisterOnly,
+		Syncing: SyncingRegisterOnly,
 	}
 	tester, _, _, teardown, err := newStreamerTester(registryOptions)
 	if err != nil {
@@ -153,8 +69,7 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
 // when syncing is disabled.
 func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
 	registryOptions := &RegistryOptions{
-		Retrieval: RetrievalDisabled,
-		Syncing:   SyncingDisabled,
+		Syncing: SyncingDisabled,
 	}
 	tester, _, _, teardown, err := newStreamerTester(registryOptions)
 	if err != nil {
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..b60d2fcc9 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -247,7 +247,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
 		case err := <-errC:
 			if err != nil {
 				log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
-				p.Drop(err)
+				p.Drop()
 				return
 			}
 		case <-ctx.Done():
@@ -289,7 +289,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
 	case err := <-c.next:
 		if err != nil {
 			log.Warn("c.next error dropping peer", "err", err)
-			p.Drop(err)
+			p.Drop()
 			return
 		}
 	case <-c.quit:
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 152814bd4..98b237ce2 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -90,7 +90,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 		err := p.Send(wmsg.Context, wmsg.Msg)
 		if err != nil {
 			log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
-			p.Drop(err)
+			p.Drop()
 		}
 	})
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 2d5935276..e34f87951 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -119,7 +119,6 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
 		}
 
 		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-			Retrieval:       RetrievalEnabled,
 			Syncing:         SyncingAutoSubscribe,
 			SyncUpdateDelay: syncUpdateDelay,
 		}, nil)
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 605c9dbeb..fefdb7c9f 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -118,7 +118,6 @@ var simServiceMap = map[string]simulation.ServiceFunc{
 		store := state.NewInmemoryStore()
 
 		r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
-			Retrieval:       RetrievalDisabled,
 			Syncing:         SyncingAutoSubscribe,
 			SyncUpdateDelay: 3 * time.Second,
 		}, nil)
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 0d990da5c..10a8f7ec5 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -18,7 +18,6 @@ package stream
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"math"
 	"reflect"
@@ -30,11 +29,11 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/protocols"
 	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/log"
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
 	"github.com/ethereum/go-ethereum/swarm/state"
+	"github.com/ethereum/go-ethereum/swarm/storage"
 )
 
 const (
@@ -49,7 +48,6 @@ const (
 
 // Enumerate options for syncing and retrieval
 type SyncingOption int
-type RetrievalOption int
 
 // Syncing options
 const (
@@ -61,17 +59,6 @@ const (
 	SyncingAutoSubscribe
 )
 
-const (
-	// Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
-	RetrievalDisabled RetrievalOption = iota
-	// Only the client side of the retrieve request is registered.
-	// (light nodes do not serve retrieve requests)
-	// once the client is registered, subscription to retrieve request stream is always sent
-	RetrievalClientOnly
-	// Both client and server funcs are registered, subscribe sent automatically
-	RetrievalEnabled
-)
-
 // subscriptionFunc is used to determine what to do in order to perform subscriptions
 // usually we would start to really subscribe to nodes, but for tests other functionality may be needed
 // (see TestRequestPeerSubscriptions in streamer_test.go)
@@ -90,7 +77,6 @@ type Registry struct {
 	peers          map[enode.ID]*Peer
 	delivery       *Delivery
 	intervalsStore state.Store
-	autoRetrieval  bool // automatically subscribe to retrieve request stream
 	maxPeerServers int
 	spec           *protocols.Spec   //this protocol's spec
 	balance        protocols.Balance //implements protocols.Balance, for accounting
@@ -101,22 +87,19 @@ type Registry struct {
 // RegistryOptions holds optional values for NewRegistry constructor.
 type RegistryOptions struct {
 	SkipCheck       bool
-	Syncing         SyncingOption   // Defines syncing behavior
-	Retrieval       RetrievalOption // Defines retrieval behavior
+	Syncing         SyncingOption // Defines syncing behavior
 	SyncUpdateDelay time.Duration
 	MaxPeerServers  int // The limit of servers for each peer in registry
 }
 
 // NewRegistry is Streamer constructor
-func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.FetchStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
 	if options == nil {
 		options = &RegistryOptions{}
 	}
 	if options.SyncUpdateDelay <= 0 {
 		options.SyncUpdateDelay = 15 * time.Second
 	}
-	// check if retrieval has been disabled
-	retrieval := options.Retrieval != RetrievalDisabled
 
 	quit := make(chan struct{})
 
@@ -128,7 +111,6 @@
 		peers:          make(map[enode.ID]*Peer),
 		delivery:       delivery,
 		intervalsStore: intervalsStore,
-		autoRetrieval:  retrieval,
 		maxPeerServers: options.MaxPeerServers,
 		balance:        balance,
 		quit:           quit,
@@ -139,27 +121,10 @@
 	streamer.api = NewAPI(streamer)
 	delivery.getPeer = streamer.getPeer
 
-	// if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
-	if options.Retrieval == RetrievalEnabled {
-		streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
-			if !live {
-				return nil, errors.New("only live retrieval requests supported")
-			}
-			return NewSwarmChunkServer(delivery.chunkStore), nil
-		})
-	}
-
-	// if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
-	if options.Retrieval != RetrievalDisabled {
-		streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
-			return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
-		})
-	}
-
 	// If syncing is not disabled, the syncing functions are registered (both client and server)
 	if options.Syncing != SyncingDisabled {
-		RegisterSwarmSyncerServer(streamer, syncChunkStore)
-		RegisterSwarmSyncerClient(streamer, syncChunkStore)
+		RegisterSwarmSyncerServer(streamer, netStore)
+		RegisterSwarmSyncerClient(streamer, netStore)
 	}
 
 	// if syncing is set to automatically subscribe to the syncing stream, start the subscription process
@@ -381,7 +346,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
 	}
 	log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
 
-	return peer.SendPriority(context.TODO(), msg, priority)
+	return peer.Send(context.TODO(), msg)
 }
 
 func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -422,8 +387,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error {
 func (r *Registry) Close() error {
 	// Stop sending neighborhood depth change and address count
 	// change from Kademlia that were initiated in NewRegistry constructor.
-	r.delivery.kad.CloseNeighbourhoodDepthC()
-	r.delivery.kad.CloseAddrCountC()
+	r.delivery.Close()
 	close(r.quit)
 	return r.intervalsStore.Close()
 }
@@ -464,13 +428,6 @@ func (r *Registry) Run(p *network.BzzPeer) error {
 	defer close(sp.quit)
 	defer sp.close()
 
-	if r.autoRetrieval && !p.LightNode {
-		err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
-		if err != nil {
-			return err
-		}
-	}
-
 	return sp.Run(sp.HandleMsg)
 }
 
@@ -619,19 +576,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
 		return p.handleUnsubscribeMsg(msg)
 
 	case *OfferedHashesMsg:
-		return p.handleOfferedHashesMsg(ctx, msg)
+		go func() {
+			err := p.handleOfferedHashesMsg(ctx, msg)
+			if err != nil {
+				log.Error(err.Error())
+				p.Drop()
+			}
+		}()
+		return nil
 
 	case *TakeoverProofMsg:
-		return p.handleTakeoverProofMsg(ctx, msg)
+		go func() {
+			err := p.handleTakeoverProofMsg(ctx, msg)
+			if err != nil {
+				log.Error(err.Error())
+				p.Drop()
+			}
+		}()
+		return nil
 
 	case *WantedHashesMsg:
-		return p.handleWantedHashesMsg(ctx, msg)
+		go func() {
+			err := p.handleWantedHashesMsg(ctx, msg)
+			if err != nil {
+				log.Error(err.Error())
+				p.Drop()
+			}
+		}()
+		return nil
+
+	case *ChunkDeliveryMsgRetrieval:
+		// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+		go func() {
+			err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+			if err != nil {
+				log.Error(err.Error())
+				p.Drop()
+			}
+		}()
+		return nil
 
-	case *ChunkDeliveryMsgRetrieval, *ChunkDeliveryMsgSyncing:
-		return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
+	case *ChunkDeliveryMsgSyncing:
+		// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+		go func() {
+			err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+			if err != nil {
+				log.Error(err.Error())
+				p.Drop()
+			}
+		}()
+		return nil
 
 	case *RetrieveRequestMsg:
-		return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
+		go func() {
+			err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
+			if err != nil {
+				log.Error(err.Error())
+				p.Drop()
+			}
+		}()
+		return nil
 
 	case *RequestSubscriptionMsg:
 		return p.handleRequestSubscription(ctx, msg)
@@ -762,7 +766,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
 		return err
 	}
 
-	if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+	if err := p.Send(context.TODO(), tp); err != nil {
 		return err
 	}
 	if c.to > 0 && tp.Takeover.End >= c.to {
@@ -964,15 +968,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
 }
 
 /*
-GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
+GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
 It can be called via RPC.
 It returns a map of node IDs with an array of string representations of Stream objects.
 */
-func (api *API) GetPeerSubscriptions() map[string][]string {
-	//create the empty map
+func (api *API) GetPeerServerSubscriptions() map[string][]string {
 	pstreams := make(map[string][]string)
 
-	//iterate all streamer peers
 	api.streamer.peersMu.RLock()
 	defer api.streamer.peersMu.RUnlock()
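The HandleMsg change above is the same transformation applied five times: a potentially blocking handler moves into its own goroutine, the protocol read loop returns nil immediately, and a handler error now drops the peer instead of propagating up. A reduced, self-contained sketch of that shape (names here are illustrative, not from the diff):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    type examplePeer struct{ id string }

    // Drop stands in for protocols.Peer.Drop(), which after this commit
    // takes no error argument.
    func (p *examplePeer) Drop() { fmt.Println("dropping peer", p.id) }

    // handleAsync mirrors the new dispatch: never block the read loop,
    // convert handler errors into a peer drop.
    func handleAsync(ctx context.Context, p *examplePeer, handle func(context.Context) error) error {
        go func() {
            if err := handle(ctx); err != nil {
                fmt.Println(err.Error()) // stream.go logs with log.Error(err.Error())
                p.Drop()
            }
        }()
        return nil
    }

    func main() {
        p := &examplePeer{id: "enode-1"}
        _ = handleAsync(context.Background(), p, func(context.Context) error {
            return errors.New("invalid hashes length") // simulated handler failure
        })
        time.Sleep(100 * time.Millisecond) // give the goroutine time to run (example only)
    }

One consequence of this design, visible in the streamer_test.go hunk below, is that the remote peer now only observes a generic "subprotocol error" disconnect rather than the handler's specific error message.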
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index bdd3087bb..c7da05014 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -539,7 +539,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
+	expectedError := errors.New("subprotocol error")
 	if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
 		t.Fatal(err)
 	}
@@ -779,7 +779,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
 func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
 	var maxPeerServers = 6
 	tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
-		Retrieval:      RetrievalDisabled,
 		Syncing:        SyncingDisabled,
 		MaxPeerServers: maxPeerServers,
 	})
@@ -940,8 +939,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
 //`Price` interface implementation
 func TestHasPriceImplementation(t *testing.T) {
 	_, r, _, teardown, err := newStreamerTester(&RegistryOptions{
-		Retrieval: RetrievalDisabled,
-		Syncing:   SyncingDisabled,
+		Syncing: SyncingDisabled,
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -1123,8 +1121,8 @@ func TestRequestPeerSubscriptions(t *testing.T) {
 	}
 }
 
-// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
-func TestGetSubscriptions(t *testing.T) {
+// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function
+func TestGetServerSubscriptions(t *testing.T) {
 	// create an amount of dummy peers
 	testPeerCount := 8
 	// every peer will have this amount of dummy servers
@@ -1135,7 +1133,7 @@
 	r := &Registry{}
 	api := NewAPI(r)
 	// call once, at this point should be empty
-	regs := api.GetPeerSubscriptions()
+	regs := api.GetPeerServerSubscriptions()
 	if len(regs) != 0 {
 		t.Fatal("Expected subscription count to be 0, but it is not")
 	}
@@ -1159,7 +1157,7 @@
 	r.peers = peerMap
 
 	// call the subscriptions again
-	regs = api.GetPeerSubscriptions()
+	regs = api.GetPeerServerSubscriptions()
 	// count how many (fake) subscriptions there are
 	cnt := 0
 	for _, reg := range regs {
@@ -1175,11 +1173,11 @@
 }
 
 /*
-TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
+TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
 starts the simulation, waits for SyncUpdateDelay in order to kick off
 stream registration, then tests that there are subscriptions.
 */
-func TestGetSubscriptionsRPC(t *testing.T) {
+func TestGetServerSubscriptionsRPC(t *testing.T) {
 
 	if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" {
 		t.Skip("flaky with -race on Travis")
@@ -1226,7 +1224,6 @@
 			// configure so that sync registrations actually happen
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				Retrieval:       RetrievalEnabled,
 				Syncing:         SyncingAutoSubscribe, //enable sync registrations
 				SyncUpdateDelay: syncUpdateDelay,
 			}, nil)
@@ -1321,7 +1318,7 @@
 			//ask it for subscriptions
 			pstreams := make(map[string][]string)
-			err = client.Call(&pstreams, "stream_getPeerSubscriptions")
+			err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
 			if err != nil {
 				return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
 			}
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index c573da5d2..79b04a307 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -34,27 +34,27 @@ const (
 // * live request delivery with or without checkback
 // * (live/non-live historical) chunk syncing per proximity bin
 type SwarmSyncerServer struct {
-	po    uint8
-	store chunk.FetchStore
-	quit  chan struct{}
+	po       uint8
+	netStore *storage.NetStore
+	quit     chan struct{}
 }
 
 // NewSwarmSyncerServer is constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) {
+func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) {
 	return &SwarmSyncerServer{
-		po:    po,
-		store: syncChunkStore,
-		quit:  make(chan struct{}),
+		po:       po,
+		netStore: netStore,
+		quit:     make(chan struct{}),
 	}, nil
 }
 
-func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) {
+func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
 	streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
 		po, err := ParseSyncBinKey(t)
 		if err != nil {
 			return nil, err
 		}
-		return NewSwarmSyncerServer(po, syncChunkStore)
+		return NewSwarmSyncerServer(po, netStore)
 	})
 	// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
 	// 	return NewOutgoingProvableSwarmSyncer(po, db)
@@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() {
 
 // GetData retrieves the actual chunk from netstore
 func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-	ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
+	ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
 	if err != nil {
 		return nil, err
 	}
@@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er
 
 // SessionIndex returns current storage bin (po) index.
 func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
-	return s.store.LastPullSubscriptionBinID(s.po)
+	return s.netStore.LastPullSubscriptionBinID(s.po)
 }
 
 // SetNextBatch retrieves the next batch of hashes from the localstore.
@@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
 // are added in batchTimeout period, the batch will be returned. This function
 // will block until new chunks are received from localstore pull subscription.
 func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
-	descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to)
+	descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
 	defer stop()
 
 	const batchTimeout = 2 * time.Second
@@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
 			// This is the most naive approach to label the chunk as synced
 			// allowing it to be garbage collected. A proper way requires
 			// validating that the chunk is successfully stored by the peer.
-			err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address)
+			err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
 			if err != nil {
 				return nil, 0, 0, nil, err
 			}
@@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
 
 // SwarmSyncerClient
 type SwarmSyncerClient struct {
-	store  chunk.FetchStore
-	peer   *Peer
-	stream Stream
+	netStore *storage.NetStore
+	peer     *Peer
+	stream   Stream
 }
 
 // NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
 	return &SwarmSyncerClient{
-		store:  store,
-		peer:   p,
-		stream: stream,
+		netStore: netStore,
+		peer:     p,
+		stream:   stream,
 	}, nil
 }
 
-// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
-// 	retrieveC := make(storage.Chunk, chunksCap)
-// 	RunChunkRequestor(p, retrieveC)
-// 	storeC := make(storage.Chunk, chunksCap)
-// 	RunChunkStorer(store, storeC)
-// 	s := &SwarmSyncerClient{
-// 		po:            po,
-// 		priority:      priority,
-// 		sessionAt:     sessionAt,
-// 		start:         index,
-// 		end:           index,
-// 		nextC:         make(chan struct{}, 1),
-// 		intervals:     intervals,
-// 		sessionRoot:   sessionRoot,
-// 		sessionReader: chunker.Join(sessionRoot, retrieveC),
-// 		retrieveC:     retrieveC,
-// 		storeC:        storeC,
-// 	}
-// 	return s
-// }
-
-// // StartSyncing is called on the Peer to start the syncing process
-// // the idea is that it is called only after kademlia is close to healthy
-// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
-// 	lastPO := po
-// 	if nn {
-// 		lastPO = maxPO
-// 	}
-//
-// 	for i := po; i <= lastPO; i++ {
-// 		s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
-// 		s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
-// 	}
-// }
-
 // RegisterSwarmSyncerClient registers the client constructor function for
 // to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) {
+func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
 	streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
-		return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
+		return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
	})
 }
 
 // NeedData
 func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
-	return s.store.FetchFunc(ctx, key)
+	return s.netStore.FetchFunc(ctx, key)
 }
 
 // BatchDone
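SetNextBatch above drains a SubscribePull channel into batches of hashes and hands over a partial batch once batchTimeout (2s) has elapsed after the first descriptor. The batching shape in isolation, as a simplified sketch under assumed types (this is not the commit's code; the real function also tracks bin IDs and marks chunks synced):

    package main

    import (
        "fmt"
        "time"
    )

    // collectBatch accumulates items from a subscription channel and returns
    // either when the batch is full or when batchTimeout has elapsed after
    // the first item, mirroring SwarmSyncerServer.SetNextBatch.
    func collectBatch(items <-chan uint64, batchSize int, batchTimeout time.Duration) []uint64 {
        var batch []uint64
        var timer *time.Timer
        var timerC <-chan time.Time

        for {
            select {
            case it, ok := <-items:
                if !ok {
                    return batch // subscription closed
                }
                batch = append(batch, it)
                if len(batch) >= batchSize {
                    return batch // full batch, hand it over
                }
                if timer == nil { // start the flush timer on the first item
                    timer = time.NewTimer(batchTimeout)
                    timerC = timer.C
                }
            case <-timerC:
                return batch // partial batch after batchTimeout
            }
        }
    }

    func main() {
        items := make(chan uint64)
        go func() {
            for i := uint64(1); i <= 3; i++ {
                items <- i
            }
        }()
        fmt.Println(collectBatch(items, 128, 2*time.Second)) // [1 2 3] after ~2s
    }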
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index a8651f386..b787c7bb8 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -83,7 +83,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 		}
 
 		r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
-			Retrieval: RetrievalDisabled,
 			Syncing:   SyncingAutoSubscribe,
 			SkipCheck: skipCheck,
 		}, nil)
@@ -232,8 +231,7 @@ func TestSameVersionID(t *testing.T) {
 		}
 
 		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-			Retrieval: RetrievalDisabled,
-			Syncing:   SyncingAutoSubscribe,
+			Syncing: SyncingAutoSubscribe,
 		}, nil)
 		bucket.Store(bucketKeyRegistry, r)
@@ -296,8 +294,7 @@ func TestDifferentVersionID(t *testing.T) {
 		}
 
 		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-			Retrieval: RetrievalDisabled,
-			Syncing:   SyncingAutoSubscribe,
+			Syncing: SyncingAutoSubscribe,
 		}, nil)
 		bucket.Store(bucketKeyRegistry, r)