aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2019-04-11 16:26:52 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2019-05-10 18:26:30 +0800
commit993b145f25845e50e8af41ffb1116eaee381d693 (patch)
tree47a88eec27f66b7237512c862d7ab2f8e9f314d3 /swarm/network
parent996755c4a832afce8629a771cab8879c88c98355 (diff)
downloadgo-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar
go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.gz
go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.bz2
go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.lz
go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.xz
go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.tar.zst
go-tangerine-993b145f25845e50e8af41ffb1116eaee381d693.zip
swarm/storage/localstore: fix export db.Put signature
cmd/swarm/swarm-smoke: improve smoke tests (#1337) swarm/network: remove dead code (#1339) swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342)
Diffstat (limited to 'swarm/network')
-rw-r--r--swarm/network/hive.go2
-rw-r--r--swarm/network/kademlia.go15
-rw-r--r--swarm/network/kademlia_test.go2
-rw-r--r--swarm/network/stream/delivery.go149
-rw-r--r--swarm/network/stream/delivery_test.go162
-rw-r--r--swarm/network/stream/intervals_test.go10
-rw-r--r--swarm/network/stream/lightnode_test.go89
-rw-r--r--swarm/network/stream/messages.go4
-rw-r--r--swarm/network/stream/peer.go2
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go1
-rw-r--r--swarm/network/stream/snapshot_sync_test.go1
-rw-r--r--swarm/network/stream/stream.go124
-rw-r--r--swarm/network/stream/streamer_test.go21
-rw-r--r--swarm/network/stream/syncer.go82
-rw-r--r--swarm/network/stream/syncer_test.go7
15 files changed, 161 insertions, 510 deletions
diff --git a/swarm/network/hive.go b/swarm/network/hive.go
index 2eb521f1d..ad51b29c2 100644
--- a/swarm/network/hive.go
+++ b/swarm/network/hive.go
@@ -116,7 +116,7 @@ func (h *Hive) Stop() error {
log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
h.EachConn(nil, 255, func(p *Peer, _ int) bool {
log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
- p.Drop(nil)
+ p.Drop()
return true
})
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index dd6de44fd..f553cb5f4 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/pot"
sv "github.com/ethereum/go-ethereum/swarm/version"
@@ -138,6 +139,9 @@ func (e *entry) Hex() string {
func (k *Kademlia) Register(peers ...*BzzAddr) error {
k.lock.Lock()
defer k.lock.Unlock()
+
+ metrics.GetOrRegisterCounter("kad.register", nil).Inc(1)
+
var known, size int
for _, p := range peers {
log.Trace("kademlia trying to register", "addr", p)
@@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return newEntry(p)
}
- log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr)
-
return v
})
if found {
@@ -186,6 +188,9 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
defer k.lock.Unlock()
+
+ metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1)
+
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
// collect undersaturated bins in ascending order of number of connected peers
// and from shallow to deep (ascending order of PO)
@@ -297,6 +302,9 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.lock.Lock()
defer k.lock.Unlock()
+
+ metrics.GetOrRegisterCounter("kad.on", nil).Inc(1)
+
var ins bool
k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val {
// if not found live
@@ -320,7 +328,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.addrCountC <- k.addrs.Size()
}
}
- log.Trace(k.string())
// calculate if depth of saturation changed
depth := uint8(k.saturation())
var changed bool
@@ -608,7 +615,7 @@ func (k *Kademlia) string() string {
if len(sv.GitCommit) > 0 {
rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit))
}
- rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3]))
+ rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()))
rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
liverows := make([]string, k.MaxProxDisplay)
diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go
index b4663eee5..93b990138 100644
--- a/swarm/network/kademlia_test.go
+++ b/swarm/network/kademlia_test.go
@@ -541,7 +541,7 @@ func TestKademliaHiveString(t *testing.T) {
tk.Register("10000000", "10000001")
tk.MaxProxDisplay = 8
h := tk.String()
- expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n========================================================================="
+ expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 0000000000000000000000000000000000000000000000000000000000000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n========================================================================="
if expH[104:] != h[104:] {
t.Fatalf("incorrect hive output. expected %v, got %v", expH, h)
}
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 059666723..aa2c817ea 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
+ "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
@@ -33,11 +34,6 @@ import (
olog "github.com/opentracing/opentracing-go/log"
)
-const (
- swarmChunkServerStreamName = "RETRIEVE_REQUEST"
- deliveryCap = 32
-)
-
var (
processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil)
handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
@@ -45,91 +41,23 @@ var (
requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil)
requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil)
+
+ lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
)
type Delivery struct {
- chunkStore chunk.FetchStore
- kad *network.Kademlia
- getPeer func(enode.ID) *Peer
+ netStore *storage.NetStore
+ kad *network.Kademlia
+ getPeer func(enode.ID) *Peer
+ quit chan struct{}
}
-func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
+func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
return &Delivery{
- chunkStore: chunkStore,
- kad: kad,
- }
-}
-
-// SwarmChunkServer implements Server
-type SwarmChunkServer struct {
- deliveryC chan []byte
- batchC chan []byte
- chunkStore storage.ChunkStore
- currentLen uint64
- quit chan struct{}
-}
-
-// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
- s := &SwarmChunkServer{
- deliveryC: make(chan []byte, deliveryCap),
- batchC: make(chan []byte),
- chunkStore: chunkStore,
- quit: make(chan struct{}),
- }
- go s.processDeliveries()
- return s
-}
-
-// processDeliveries handles delivered chunk hashes
-func (s *SwarmChunkServer) processDeliveries() {
- var hashes []byte
- var batchC chan []byte
- for {
- select {
- case <-s.quit:
- return
- case hash := <-s.deliveryC:
- hashes = append(hashes, hash...)
- batchC = s.batchC
- case batchC <- hashes:
- hashes = nil
- batchC = nil
- }
- }
-}
-
-// SessionIndex returns zero in all cases for SwarmChunkServer.
-func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
- return 0, nil
-}
-
-// SetNextBatch
-func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
- select {
- case hashes = <-s.batchC:
- case <-s.quit:
- return
- }
-
- from = s.currentLen
- s.currentLen += uint64(len(hashes))
- to = s.currentLen
- return
-}
-
-// Close needs to be called on a stream server
-func (s *SwarmChunkServer) Close() {
- close(s.quit)
-}
-
-// GetData retrieves chunk data from db store
-func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- ch, err := s.chunkStore.Get(ctx, chunk.ModeGetRequest, storage.Address(key))
- if err != nil {
- return nil, err
+ netStore: netStore,
+ kad: kad,
+ quit: make(chan struct{}),
}
- return ch.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
@@ -150,12 +78,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
osp.LogFields(olog.String("ref", req.Addr.String()))
- s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
- if err != nil {
- return err
- }
- streamer := s.Server.(*SwarmChunkServer)
-
var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx = context.WithValue(ctx, "peer", sp.ID().String())
@@ -165,36 +87,26 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
go func() {
select {
case <-ctx.Done():
- case <-streamer.quit:
+ case <-d.quit:
}
cancel()
}()
go func() {
defer osp.Finish()
- ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
+ ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
return
}
- if req.SkipCheck {
- syncing := false
- osp.LogFields(olog.Bool("skipCheck", true))
+ syncing := false
- err = sp.Deliver(ctx, ch, s.priority, syncing)
- if err != nil {
- log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
- }
- osp.LogFields(olog.Bool("delivered", true))
- return
- }
- osp.LogFields(olog.Bool("skipCheck", false))
- select {
- case streamer.deliveryC <- ch.Address()[:]:
- case <-streamer.quit:
+ err = sp.Deliver(ctx, ch, Top, syncing)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
-
+ osp.LogFields(olog.Bool("delivered", true))
}()
return nil
@@ -225,6 +137,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
processReceivedChunksCount.Inc(1)
+ // record the last time we received a chunk delivery message
+ lastReceivedChunksMsg.Update(time.Now().UnixNano())
+
var msg *ChunkDeliveryMsg
var mode chunk.ModePut
switch r := req.(type) {
@@ -244,31 +159,25 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
case *ChunkDeliveryMsgSyncing:
msg = (*ChunkDeliveryMsg)(r)
mode = chunk.ModePutSync
+ case *ChunkDeliveryMsg:
+ msg = r
+ mode = chunk.ModePutSync
}
- // retrieve the span for the originating retrieverequest
- spanID := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), msg.Addr)
- span := tracing.ShiftSpanByKey(spanID)
-
log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())
go func() {
defer osp.Finish()
- if span != nil {
- span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
- defer span.Finish()
- }
-
msg.peer = sp
log.Trace("handle.chunk.delivery", "put", msg.Addr)
- _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
+ _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
// TODO: Enable this log line
// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
- msg.peer.Drop(err)
+ msg.peer.Drop()
}
}
log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
@@ -276,6 +185,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
return nil
}
+func (d *Delivery) Close() {
+ d.kad.CloseNeighbourhoodDepthC()
+ d.kad.CloseAddrCountC()
+ close(d.quit)
+}
+
// RequestFromPeers sends a chunk retrieve request to a peer
// The most eligible peer that hasn't already been sent to is chosen
// TODO: define "eligible"
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 801e6d98a..4037243c1 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -41,64 +41,11 @@ import (
"github.com/ethereum/go-ethereum/swarm/testutil"
)
-//Tests initializing a retrieve request
-func TestStreamerRetrieveRequest(t *testing.T) {
- regOpts := &RegistryOptions{
- Retrieval: RetrievalClientOnly,
- Syncing: SyncingDisabled,
- }
- tester, streamer, _, teardown, err := newStreamerTester(regOpts)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- node := tester.Nodes[0]
-
- ctx := context.Background()
- req := network.NewRequest(
- storage.Address(hash0[:]),
- true,
- &sync.Map{},
- )
- streamer.delivery.RequestFromPeers(ctx, req)
-
- stream := NewStream(swarmChunkServerStreamName, "", true)
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "RetrieveRequestMsg",
- Expects: []p2ptest.Expect{
- { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: nil,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- { //expect a retrieve request message for the given hash
- Code: 5,
- Msg: &RetrieveRequestMsg{
- Addr: hash0[:],
- SkipCheck: true,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-}
-
//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
- Retrieval: RetrievalEnabled,
- Syncing: SyncingDisabled, //do no syncing
+ tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{
+ Syncing: SyncingDisabled, //do no syncing
})
if err != nil {
t.Fatal(err)
@@ -109,30 +56,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
chunk := storage.NewChunk(storage.Address(hash0[:]), nil)
- peer := streamer.getPeer(node.ID())
-
- stream := NewStream(swarmChunkServerStreamName, "", true)
- //simulate pre-subscription to RETRIEVE_REQUEST stream on peer
- peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
- Stream: stream,
- History: nil,
- Priority: Top,
- })
-
//test the exchange
err = tester.TestExchanges(p2ptest.Exchange{
- Expects: []p2ptest.Expect{
- { //first expect a subscription to the RETRIEVE_REQUEST stream
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: nil,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
{ //then the actual RETRIEVE_REQUEST....
@@ -159,7 +84,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
//should fail with a timeout as the peer we are requesting
//the chunk from does not have the chunk
- expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
+ expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
if err == nil || err.Error() != expectedError {
t.Fatalf("Expected error %v, got %v", expectedError, err)
}
@@ -168,9 +93,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// upstream request server receives a retrieve Request and responds with
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
- Retrieval: RetrievalEnabled,
- Syncing: SyncingDisabled,
+ tester, _, localStore, teardown, err := newStreamerTester(&RegistryOptions{
+ Syncing: SyncingDisabled,
})
if err != nil {
t.Fatal(err)
@@ -179,36 +103,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
node := tester.Nodes[0]
- peer := streamer.getPeer(node.ID())
-
- stream := NewStream(swarmChunkServerStreamName, "", true)
-
- peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
- Stream: stream,
- History: nil,
- Priority: Top,
- })
-
- hash := storage.Address(hash0[:])
- ch := storage.NewChunk(hash, hash)
+ hash := storage.Address(hash1[:])
+ ch := storage.NewChunk(hash, hash1[:])
_, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
err = tester.TestExchanges(p2ptest.Exchange{
- Expects: []p2ptest.Expect{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: nil,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
{
@@ -221,51 +123,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
},
Expects: []p2ptest.Expect{
{
- Code: 1,
- Msg: &OfferedHashesMsg{
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: hash,
- From: 0,
- // TODO: why is this 32???
- To: 32,
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-
- hash = storage.Address(hash1[:])
- ch = storage.NewChunk(hash, hash1[:])
- _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
- if err != nil {
- t.Fatalf("Expected no err got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "RetrieveRequestMsg",
- Triggers: []p2ptest.Trigger{
- {
- Code: 5,
- Msg: &RetrieveRequestMsg{
- Addr: hash,
- SkipCheck: true,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
Code: 6,
Msg: &ChunkDeliveryMsg{
- Addr: hash,
- SData: hash,
+ Addr: ch.Address(),
+ SData: ch.Data(),
},
Peer: node.ID(),
},
@@ -359,8 +220,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingDisabled,
+ Syncing: SyncingDisabled,
})
if err != nil {
t.Fatal(err)
@@ -472,7 +332,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
- Retrieval: RetrievalEnabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)
@@ -623,7 +482,6 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
- Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
bucket.Store(bucketKeyRegistry, r)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 1f2cdcada..660954857 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -67,7 +66,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
SkipCheck: skipCheck,
}, nil)
@@ -288,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
- store chunk.FetchStore
+ netStore *storage.NetStore
enableNotificationsC chan struct{}
}
-func newTestExternalClient(store chunk.FetchStore) *testExternalClient {
+func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
- store: store,
+ netStore: netStore,
enableNotificationsC: make(chan struct{}),
}
}
func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
- wait := c.store.FetchFunc(ctx, storage.Address(hash))
+ wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
if wait == nil {
return nil
}
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 501660fab..eb4e73d47 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -22,94 +22,10 @@ import (
)
// This test checks the default behavior of the server, that is
-// when it is serving Retrieve requests.
-func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
- registryOptions := &RegistryOptions{
- Retrieval: RetrievalClientOnly,
- Syncing: SyncingDisabled,
- }
- tester, _, _, teardown, err := newStreamerTester(registryOptions)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- node := tester.Nodes[0]
-
- stream := NewStream(swarmChunkServerStreamName, "", false)
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "SubscribeMsg",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
- if err != nil {
- t.Fatalf("Got %v", err)
- }
-
- err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID()})
- if err == nil || err.Error() != "timed out waiting for peers to disconnect" {
- t.Fatalf("Expected no disconnect, got %v", err)
- }
-}
-
-// This test checks the Lightnode behavior of server, when serving Retrieve
-// requests are disabled
-func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
- registryOptions := &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingDisabled,
- }
- tester, _, _, teardown, err := newStreamerTester(registryOptions)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- node := tester.Nodes[0]
-
- stream := NewStream(swarmChunkServerStreamName, "", false)
-
- err = tester.TestExchanges(
- p2ptest.Exchange{
- Label: "SubscribeMsg",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 7,
- Msg: &SubscribeErrorMsg{
- Error: "stream RETRIEVE_REQUEST not registered",
- },
- Peer: node.ID(),
- },
- },
- })
- if err != nil {
- t.Fatalf("Got %v", err)
- }
-}
-
-// This test checks the default behavior of the server, that is
// when syncing is enabled.
func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
registryOptions := &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingRegisterOnly,
+ Syncing: SyncingRegisterOnly,
}
tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
@@ -153,8 +69,7 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
// when syncing is disabled.
func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
registryOptions := &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingDisabled,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..b60d2fcc9 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -247,7 +247,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
case err := <-errC:
if err != nil {
log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
- p.Drop(err)
+ p.Drop()
return
}
case <-ctx.Done():
@@ -289,7 +289,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
case err := <-c.next:
if err != nil {
log.Warn("c.next error dropping peer", "err", err)
- p.Drop(err)
+ p.Drop()
return
}
case <-c.quit:
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 152814bd4..98b237ce2 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -90,7 +90,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
- p.Drop(err)
+ p.Drop()
}
})
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 2d5935276..e34f87951 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -119,7 +119,6 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: syncUpdateDelay,
}, nil)
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 605c9dbeb..fefdb7c9f 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -118,7 +118,6 @@ var simServiceMap = map[string]simulation.ServiceFunc{
store := state.NewInmemoryStore()
r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
- Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
}, nil)
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 0d990da5c..10a8f7ec5 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -18,7 +18,6 @@ package stream
import (
"context"
- "errors"
"fmt"
"math"
"reflect"
@@ -30,11 +29,11 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/state"
+ "github.com/ethereum/go-ethereum/swarm/storage"
)
const (
@@ -49,7 +48,6 @@ const (
// Enumerate options for syncing and retrieval
type SyncingOption int
-type RetrievalOption int
// Syncing options
const (
@@ -61,17 +59,6 @@ const (
SyncingAutoSubscribe
)
-const (
- // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
- RetrievalDisabled RetrievalOption = iota
- // Only the client side of the retrieve request is registered.
- // (light nodes do not serve retrieve requests)
- // once the client is registered, subscription to retrieve request stream is always sent
- RetrievalClientOnly
- // Both client and server funcs are registered, subscribe sent automatically
- RetrievalEnabled
-)
-
// subscriptionFunc is used to determine what to do in order to perform subscriptions
// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
// (see TestRequestPeerSubscriptions in streamer_test.go)
@@ -90,7 +77,6 @@ type Registry struct {
peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
- autoRetrieval bool // automatically subscribe to retrieve request stream
maxPeerServers int
spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting
@@ -101,22 +87,19 @@ type Registry struct {
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- Syncing SyncingOption // Defines syncing behavior
- Retrieval RetrievalOption // Defines retrieval behavior
+ Syncing SyncingOption // Defines syncing behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
// NewRegistry is Streamer constructor
-func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.FetchStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
if options == nil {
options = &RegistryOptions{}
}
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
- // check if retrieval has been disabled
- retrieval := options.Retrieval != RetrievalDisabled
quit := make(chan struct{})
@@ -128,7 +111,6 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc
peers: make(map[enode.ID]*Peer),
delivery: delivery,
intervalsStore: intervalsStore,
- autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
balance: balance,
quit: quit,
@@ -139,27 +121,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
- if options.Retrieval == RetrievalEnabled {
- streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
- if !live {
- return nil, errors.New("only live retrieval requests supported")
- }
- return NewSwarmChunkServer(delivery.chunkStore), nil
- })
- }
-
- // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
- if options.Retrieval != RetrievalDisabled {
- streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
- })
- }
-
// If syncing is not disabled, the syncing functions are registered (both client and server)
if options.Syncing != SyncingDisabled {
- RegisterSwarmSyncerServer(streamer, syncChunkStore)
- RegisterSwarmSyncerClient(streamer, syncChunkStore)
+ RegisterSwarmSyncerServer(streamer, netStore)
+ RegisterSwarmSyncerClient(streamer, netStore)
}
// if syncing is set to automatically subscribe to the syncing stream, start the subscription process
@@ -381,7 +346,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority)
+ return peer.Send(context.TODO(), msg)
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -422,8 +387,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error {
func (r *Registry) Close() error {
// Stop sending neighborhood depth change and address count
// change from Kademlia that were initiated in NewRegistry constructor.
- r.delivery.kad.CloseNeighbourhoodDepthC()
- r.delivery.kad.CloseAddrCountC()
+ r.delivery.Close()
close(r.quit)
return r.intervalsStore.Close()
}
@@ -464,13 +428,6 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer close(sp.quit)
defer sp.close()
- if r.autoRetrieval && !p.LightNode {
- err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
- if err != nil {
- return err
- }
- }
-
return sp.Run(sp.HandleMsg)
}
@@ -619,19 +576,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
return p.handleUnsubscribeMsg(msg)
case *OfferedHashesMsg:
- return p.handleOfferedHashesMsg(ctx, msg)
+ go func() {
+ err := p.handleOfferedHashesMsg(ctx, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *TakeoverProofMsg:
- return p.handleTakeoverProofMsg(ctx, msg)
+ go func() {
+ err := p.handleTakeoverProofMsg(ctx, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *WantedHashesMsg:
- return p.handleWantedHashesMsg(ctx, msg)
+ go func() {
+ err := p.handleWantedHashesMsg(ctx, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
+
+ case *ChunkDeliveryMsgRetrieval:
+ // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ go func() {
+ err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
- case *ChunkDeliveryMsgRetrieval, *ChunkDeliveryMsgSyncing:
- return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
+ case *ChunkDeliveryMsgSyncing:
+ // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ go func() {
+ err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *RetrieveRequestMsg:
- return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
+ go func() {
+ err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *RequestSubscriptionMsg:
return p.handleRequestSubscription(ctx, msg)
@@ -762,7 +766,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+ if err := p.Send(context.TODO(), tp); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
@@ -964,15 +968,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
}
/*
-GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
+GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
It can be called via RPC.
It returns a map of node IDs with an array of string representations of Stream objects.
*/
-func (api *API) GetPeerSubscriptions() map[string][]string {
- //create the empty map
+func (api *API) GetPeerServerSubscriptions() map[string][]string {
pstreams := make(map[string][]string)
- //iterate all streamer peers
api.streamer.peersMu.RLock()
defer api.streamer.peersMu.RUnlock()
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index bdd3087bb..c7da05014 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -539,7 +539,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
t.Fatal(err)
}
- expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
+ expectedError := errors.New("subprotocol error")
if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
t.Fatal(err)
}
@@ -779,7 +779,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
- Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
@@ -940,8 +939,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
//`Price` interface implementation
func TestHasPriceImplementation(t *testing.T) {
_, r, _, teardown, err := newStreamerTester(&RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingDisabled,
+ Syncing: SyncingDisabled,
})
if err != nil {
t.Fatal(err)
@@ -1123,8 +1121,8 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
-// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
-func TestGetSubscriptions(t *testing.T) {
+// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function
+func TestGetServerSubscriptions(t *testing.T) {
// create an amount of dummy peers
testPeerCount := 8
// every peer will have this amount of dummy servers
@@ -1135,7 +1133,7 @@ func TestGetSubscriptions(t *testing.T) {
r := &Registry{}
api := NewAPI(r)
// call once, at this point should be empty
- regs := api.GetPeerSubscriptions()
+ regs := api.GetPeerServerSubscriptions()
if len(regs) != 0 {
t.Fatal("Expected subscription count to be 0, but it is not")
}
@@ -1159,7 +1157,7 @@ func TestGetSubscriptions(t *testing.T) {
r.peers = peerMap
// call the subscriptions again
- regs = api.GetPeerSubscriptions()
+ regs = api.GetPeerServerSubscriptions()
// count how many (fake) subscriptions there are
cnt := 0
for _, reg := range regs {
@@ -1175,11 +1173,11 @@ func TestGetSubscriptions(t *testing.T) {
}
/*
-TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
+TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
starts the simulation, waits for SyncUpdateDelay in order to kick off
stream registration, then tests that there are subscriptions.
*/
-func TestGetSubscriptionsRPC(t *testing.T) {
+func TestGetServerSubscriptionsRPC(t *testing.T) {
if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" {
t.Skip("flaky with -race on Travis")
@@ -1226,7 +1224,6 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// configure so that sync registrations actually happen
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe, //enable sync registrations
SyncUpdateDelay: syncUpdateDelay,
}, nil)
@@ -1321,7 +1318,7 @@ func TestGetSubscriptionsRPC(t *testing.T) {
//ask it for subscriptions
pstreams := make(map[string][]string)
- err = client.Call(&pstreams, "stream_getPeerSubscriptions")
+ err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
if err != nil {
return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
}
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index c573da5d2..79b04a307 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -34,27 +34,27 @@ const (
// * live request delivery with or without checkback
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
- po uint8
- store chunk.FetchStore
- quit chan struct{}
+ po uint8
+ netStore *storage.NetStore
+ quit chan struct{}
}
// NewSwarmSyncerServer is constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) {
+func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) {
return &SwarmSyncerServer{
- po: po,
- store: syncChunkStore,
- quit: make(chan struct{}),
+ po: po,
+ netStore: netStore,
+ quit: make(chan struct{}),
}, nil
}
-func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) {
+func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(po, syncChunkStore)
+ return NewSwarmSyncerServer(po, netStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
@@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() {
// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
+ ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
if err != nil {
return nil, err
}
@@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er
// SessionIndex returns current storage bin (po) index.
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
- return s.store.LastPullSubscriptionBinID(s.po)
+ return s.netStore.LastPullSubscriptionBinID(s.po)
}
// SetNextBatch retrieves the next batch of hashes from the localstore.
@@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to)
+ descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
defer stop()
const batchTimeout = 2 * time.Second
@@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// This is the most naive approach to label the chunk as synced
// allowing it to be garbage collected. A proper way requires
// validating that the chunk is successfully stored by the peer.
- err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address)
+ err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
if err != nil {
return nil, 0, 0, nil, err
}
@@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// SwarmSyncerClient
type SwarmSyncerClient struct {
- store chunk.FetchStore
- peer *Peer
- stream Stream
+ netStore *storage.NetStore
+ peer *Peer
+ stream Stream
}
// NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
- store: store,
- peer: p,
- stream: stream,
+ netStore: netStore,
+ peer: p,
+ stream: stream,
}, nil
}
-// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
-// retrieveC := make(storage.Chunk, chunksCap)
-// RunChunkRequestor(p, retrieveC)
-// storeC := make(storage.Chunk, chunksCap)
-// RunChunkStorer(store, storeC)
-// s := &SwarmSyncerClient{
-// po: po,
-// priority: priority,
-// sessionAt: sessionAt,
-// start: index,
-// end: index,
-// nextC: make(chan struct{}, 1),
-// intervals: intervals,
-// sessionRoot: sessionRoot,
-// sessionReader: chunker.Join(sessionRoot, retrieveC),
-// retrieveC: retrieveC,
-// storeC: storeC,
-// }
-// return s
-// }
-
-// // StartSyncing is called on the Peer to start the syncing process
-// // the idea is that it is called only after kademlia is close to healthy
-// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
-// lastPO := po
-// if nn {
-// lastPO = maxPO
-// }
-//
-// for i := po; i <= lastPO; i++ {
-// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
-// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
-// }
-// }
-
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) {
+func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
+ return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
})
}
// NeedData
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
- return s.store.FetchFunc(ctx, key)
+ return s.netStore.FetchFunc(ctx, key)
}
// BatchDone
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index a8651f386..b787c7bb8 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -83,7 +83,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
}
r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
- Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
}, nil)
@@ -232,8 +231,7 @@ func TestSameVersionID(t *testing.T) {
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingAutoSubscribe,
+ Syncing: SyncingAutoSubscribe,
}, nil)
bucket.Store(bucketKeyRegistry, r)
@@ -296,8 +294,7 @@ func TestDifferentVersionID(t *testing.T) {
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingAutoSubscribe,
+ Syncing: SyncingAutoSubscribe,
}, nil)
bucket.Store(bucketKeyRegistry, r)