aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network')
-rw-r--r--swarm/network/hive_test.go6
-rw-r--r--swarm/network/kademlia.go20
-rw-r--r--swarm/network/kademlia_test.go58
-rw-r--r--swarm/network/protocol_test.go16
-rw-r--r--swarm/network/simulation/bucket_test.go4
-rw-r--r--swarm/network/stream/delivery.go5
-rw-r--r--swarm/network/stream/delivery_test.go180
-rw-r--r--swarm/network/stream/intervals_test.go2
-rw-r--r--swarm/network/stream/lightnode_test.go12
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go5
-rw-r--r--swarm/network/stream/snapshot_sync_test.go8
-rw-r--r--swarm/network/stream/stream.go57
-rw-r--r--swarm/network/stream/streamer_test.go2
-rw-r--r--swarm/network/stream/syncer_test.go5
14 files changed, 299 insertions, 81 deletions
diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go
index 059c3dc96..56adc5a8e 100644
--- a/swarm/network/hive_test.go
+++ b/swarm/network/hive_test.go
@@ -70,6 +70,9 @@ func TestHiveStatePersistance(t *testing.T) {
defer os.RemoveAll(dir)
store, err := state.NewDBStore(dir) //start the hive with an empty dbstore
+ if err != nil {
+ t.Fatal(err)
+ }
params := NewHiveParams()
s, pp := newHiveTester(t, params, 5, store)
@@ -90,6 +93,9 @@ func TestHiveStatePersistance(t *testing.T) {
store.Close()
persistedStore, err := state.NewDBStore(dir) //start the hive with an empty dbstore
+ if err != nil {
+ t.Fatal(err)
+ }
s1, pp := newHiveTester(t, params, 1, persistedStore)
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index 55a0c6f13..cd94741be 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -261,7 +261,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
// found among live peers, do nothing
return v
})
- if ins {
+ if ins && !p.BzzPeer.LightNode {
a := newEntry(p.BzzAddr)
a.conn = p
// insert new online peer into addrs
@@ -329,14 +329,18 @@ func (k *Kademlia) Off(p *Peer) {
k.lock.Lock()
defer k.lock.Unlock()
var del bool
- k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val {
- // v cannot be nil, must check otherwise we overwrite entry
- if v == nil {
- panic(fmt.Sprintf("connected peer not found %v", p))
- }
+ if !p.BzzPeer.LightNode {
+ k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val {
+ // v cannot be nil, must check otherwise we overwrite entry
+ if v == nil {
+ panic(fmt.Sprintf("connected peer not found %v", p))
+ }
+ del = true
+ return newEntry(p.BzzAddr)
+ })
+ } else {
del = true
- return newEntry(p.BzzAddr)
- })
+ }
if del {
k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(_ pot.Val) pot.Val {
diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go
index 903c8dbda..d2e051f45 100644
--- a/swarm/network/kademlia_test.go
+++ b/swarm/network/kademlia_test.go
@@ -46,19 +46,19 @@ func newTestKademlia(b string) *Kademlia {
return NewKademlia(base, params)
}
-func newTestKadPeer(k *Kademlia, s string) *Peer {
- return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k)
+func newTestKadPeer(k *Kademlia, s string, lightNode bool) *Peer {
+ return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s), LightNode: lightNode}, k)
}
func On(k *Kademlia, ons ...string) {
for _, s := range ons {
- k.On(newTestKadPeer(k, s))
+ k.On(newTestKadPeer(k, s, false))
}
}
func Off(k *Kademlia, offs ...string) {
for _, s := range offs {
- k.Off(newTestKadPeer(k, s))
+ k.Off(newTestKadPeer(k, s, false))
}
}
@@ -254,6 +254,56 @@ func TestSuggestPeerFindPeers(t *testing.T) {
}
+// a node should stay in the address book if it's removed from the kademlia
+func TestOffEffectingAddressBookNormalNode(t *testing.T) {
+ k := newTestKademlia("00000000")
+ // peer added to kademlia
+ k.On(newTestKadPeer(k, "01000000", false))
+ // peer should be in the address book
+ if k.addrs.Size() != 1 {
+ t.Fatal("known peer addresses should contain 1 entry")
+ }
+ // peer should be among live connections
+ if k.conns.Size() != 1 {
+ t.Fatal("live peers should contain 1 entry")
+ }
+ // remove peer from kademlia
+ k.Off(newTestKadPeer(k, "01000000", false))
+ // peer should be in the address book
+ if k.addrs.Size() != 1 {
+ t.Fatal("known peer addresses should contain 1 entry")
+ }
+ // peer should not be among live connections
+ if k.conns.Size() != 0 {
+ t.Fatal("live peers should contain 0 entry")
+ }
+}
+
+// a light node should not be in the address book
+func TestOffEffectingAddressBookLightNode(t *testing.T) {
+ k := newTestKademlia("00000000")
+ // light node peer added to kademlia
+ k.On(newTestKadPeer(k, "01000000", true))
+ // peer should not be in the address book
+ if k.addrs.Size() != 0 {
+ t.Fatal("known peer addresses should contain 0 entry")
+ }
+ // peer should be among live connections
+ if k.conns.Size() != 1 {
+ t.Fatal("live peers should contain 1 entry")
+ }
+ // remove peer from kademlia
+ k.Off(newTestKadPeer(k, "01000000", true))
+ // peer should not be in the address book
+ if k.addrs.Size() != 0 {
+ t.Fatal("known peer addresses should contain 0 entry")
+ }
+ // peer should not be among live connections
+ if k.conns.Size() != 0 {
+ t.Fatal("live peers should contain 0 entry")
+ }
+}
+
func TestSuggestPeerRetries(t *testing.T) {
k := newTestKademlia("00000000")
k.RetryInterval = int64(300 * time.Millisecond) // cycle
diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go
index 4b83c7a27..f0d266628 100644
--- a/swarm/network/protocol_test.go
+++ b/swarm/network/protocol_test.go
@@ -50,10 +50,6 @@ type testStore struct {
values map[string][]byte
}
-func newTestStore() *testStore {
- return &testStore{values: make(map[string][]byte)}
-}
-
func (t *testStore) Load(key string) ([]byte, error) {
t.Lock()
defer t.Unlock()
@@ -157,17 +153,7 @@ func newBzzHandshakeTester(t *testing.T, n int, addr *BzzAddr, lightNode bool) *
// should test handshakes in one exchange? parallelisation
func (s *bzzTester) testHandshake(lhs, rhs *HandshakeMsg, disconnects ...*p2ptest.Disconnect) error {
- var peers []enode.ID
- id := rhs.Addr.ID()
- if len(disconnects) > 0 {
- for _, d := range disconnects {
- peers = append(peers, d.Peer)
- }
- } else {
- peers = []enode.ID{id}
- }
-
- if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, id)...); err != nil {
+ if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, rhs.Addr.ID())...); err != nil {
return err
}
diff --git a/swarm/network/simulation/bucket_test.go b/swarm/network/simulation/bucket_test.go
index 461d99825..69df19bfe 100644
--- a/swarm/network/simulation/bucket_test.go
+++ b/swarm/network/simulation/bucket_test.go
@@ -94,7 +94,7 @@ func TestServiceBucket(t *testing.T) {
t.Fatalf("expected %q, got %q", customValue, s)
}
- v, ok = sim.NodeItem(id2, customKey)
+ _, ok = sim.NodeItem(id2, customKey)
if ok {
t.Fatal("bucket item should not be found")
}
@@ -119,7 +119,7 @@ func TestServiceBucket(t *testing.T) {
t.Fatalf("expected %q, got %q", testValue+id1.String(), s)
}
- v, ok = items[id2]
+ _, ok = items[id2]
if ok {
t.Errorf("node 2 item should not be found")
}
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 9092ffe3e..0109fbdef 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -245,7 +245,10 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
} else {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
id := p.ID()
- // TODO: skip light nodes that do not accept retrieve requests
+ if p.LightNode {
+ // skip light nodes
+ return true
+ }
if req.SkipPeer(id.String()) {
log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 949645558..c77682e0e 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -29,17 +29,25 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
+ pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
+//Tests initializing a retrieve request
func TestStreamerRetrieveRequest(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
+ regOpts := &RegistryOptions{
+ Retrieval: RetrievalClientOnly,
+ Syncing: SyncingDisabled,
+ }
+ tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -55,10 +63,21 @@ func TestStreamerRetrieveRequest(t *testing.T) {
)
streamer.delivery.RequestFromPeers(ctx, req)
+ stream := NewStream(swarmChunkServerStreamName, "", true)
+
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Expects: []p2ptest.Expect{
- {
+ { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ { //expect a retrieve request message for the given hash
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: hash0[:],
@@ -74,9 +93,12 @@ func TestStreamerRetrieveRequest(t *testing.T) {
}
}
+//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
+//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingDisabled, //do no syncing
})
defer teardown()
if err != nil {
@@ -89,16 +111,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
peer := streamer.getPeer(node.ID())
+ stream := NewStream(swarmChunkServerStreamName, "", true)
+ //simulate pre-subscription to RETRIEVE_REQUEST stream on peer
peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
- Stream: NewStream(swarmChunkServerStreamName, "", true),
+ Stream: stream,
History: nil,
Priority: Top,
})
+ //test the exchange
err = tester.TestExchanges(p2ptest.Exchange{
+ Expects: []p2ptest.Expect{
+ { //first expect a subscription to the RETRIEVE_REQUEST stream
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
- {
+ { //then the actual RETRIEVE_REQUEST....
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: chunk.Address()[:],
@@ -107,7 +144,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
},
},
Expects: []p2ptest.Expect{
- {
+ { //to which the peer responds with offered hashes
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: nil,
@@ -120,7 +157,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
},
})
- expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
+ //should fail with a timeout as the peer we are requesting
+ //the chunk from does not have the chunk
+ expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
if err == nil || err.Error() != expectedError {
t.Fatalf("Expected error %v, got %v", expectedError, err)
}
@@ -130,7 +169,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
@@ -138,6 +178,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
node := tester.Nodes[0]
+
peer := streamer.getPeer(node.ID())
stream := NewStream(swarmChunkServerStreamName, "", true)
@@ -156,6 +197,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
err = tester.TestExchanges(p2ptest.Exchange{
+ Expects: []p2ptest.Expect{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
{
@@ -224,9 +277,90 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
}
+// if there is one peer in the Kademlia, RequestFromPeers should return it
+func TestRequestFromPeers(t *testing.T) {
+ dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ addr := network.RandomAddr()
+ to := network.NewKademlia(addr.OAddr, network.NewKadParams())
+ delivery := NewDelivery(to, nil)
+ protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+ peer := network.NewPeer(&network.BzzPeer{
+ BzzAddr: network.RandomAddr(),
+ LightNode: false,
+ Peer: protocolsPeer,
+ }, to)
+ to.On(peer)
+ r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
+
+ // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
+ sp := &Peer{
+ Peer: protocolsPeer,
+ pq: pq.New(int(PriorityQueue), PriorityQueueCap),
+ streamer: r,
+ }
+ r.setPeer(sp)
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ ctx := context.Background()
+ id, _, err := delivery.RequestFromPeers(ctx, req)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ if *id != dummyPeerID {
+ t.Fatalf("Expected an id, got %v", id)
+ }
+}
+
+// RequestFromPeers should not return light nodes
+func TestRequestFromPeersWithLightNode(t *testing.T) {
+ dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ addr := network.RandomAddr()
+ to := network.NewKademlia(addr.OAddr, network.NewKadParams())
+ delivery := NewDelivery(to, nil)
+
+ protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+ // setting up a lightnode
+ peer := network.NewPeer(&network.BzzPeer{
+ BzzAddr: network.RandomAddr(),
+ LightNode: true,
+ Peer: protocolsPeer,
+ }, to)
+ to.On(peer)
+ r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
+ // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
+ sp := &Peer{
+ Peer: protocolsPeer,
+ pq: pq.New(int(PriorityQueue), PriorityQueueCap),
+ streamer: r,
+ }
+ r.setPeer(sp)
+
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+
+ ctx := context.Background()
+ // making a request which should return with "no peer found"
+ _, _, err := delivery.RequestFromPeers(ctx, req)
+
+ expectedError := "no peer found"
+ if err.Error() != expectedError {
+ t.Fatalf("expected '%v', got %v", expectedError, err)
+ }
+}
+
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
@@ -241,6 +375,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
node := tester.Nodes[0]
+ //subscribe to custom stream
stream := NewStream("foo", "", true)
err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
if err != nil {
@@ -253,7 +388,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Expects: []p2ptest.Expect{
- {
+ { //first expect subscription to the custom stream...
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
@@ -267,7 +402,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
p2ptest.Exchange{
Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
- {
+ { //...then trigger a chunk delivery for the given chunk from peer in order for
+ //local node to get the chunk delivered
Code: 6,
Msg: &ChunkDeliveryMsg{
Addr: chunkKey,
@@ -342,8 +478,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- DoServeRetrieve: true,
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalEnabled,
})
bucket.Store(bucketKeyRegistry, r)
@@ -408,20 +545,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return err
}
- //each of the nodes (except pivot node) subscribes to the stream of the next node
- for j, node := range nodeIDs[0 : nodes-1] {
- sid := nodeIDs[j+1]
- item, ok := sim.NodeItem(node, bucketKeyRegistry)
- if !ok {
- return fmt.Errorf("No registry")
- }
- registry := item.(*Registry)
- err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top)
- if err != nil {
- return err
- }
- }
-
//get the pivot node's filestore
item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore)
if !ok {
@@ -530,7 +653,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
- DoSync: true,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
})
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 3164193b3..0c95fabb7 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingRegisterOnly,
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 0d3bc7f54..65cde2411 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -25,7 +25,8 @@ import (
// when it is serving Retrieve requests.
func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
registryOptions := &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalClientOnly,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
@@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
// requests are disabled
func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
registryOptions := &RegistryOptions{
- DoServeRetrieve: false,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
@@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
// when syncing is enabled.
func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
registryOptions := &RegistryOptions{
- DoSync: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingRegisterOnly,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
@@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
// when syncing is disabled.
func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
registryOptions := &RegistryOptions{
- DoSync: false,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index b81cfc0ca..ad1519341 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoSync: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
- DoRetrieve: true,
- DoServeRetrieve: true,
})
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 8d89f28cb..2ddbed936 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoSync: true,
- DoServeRetrieve: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
})
@@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoServeRetrieve: true,
- DoSync: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingRegisterOnly,
})
bucket.Store(bucketKeyRegistry, r)
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 0ac374def..695ff0c50 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -47,6 +47,31 @@ const (
HashSize = 32
)
+//Enumerate options for syncing and retrieval
+type SyncingOption int
+type RetrievalOption int
+
+//Syncing options
+const (
+ //Syncing disabled
+ SyncingDisabled SyncingOption = iota
+ //Register the client and the server but not subscribe
+ SyncingRegisterOnly
+ //Both client and server funcs are registered, subscribe sent automatically
+ SyncingAutoSubscribe
+)
+
+const (
+ //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
+ RetrievalDisabled RetrievalOption = iota
+ //Only the client side of the retrieve request is registered.
+ //(light nodes do not serve retrieve requests)
+ //once the client is registered, subscription to retrieve request stream is always sent
+ RetrievalClientOnly
+ //Both client and server funcs are registered, subscribe sent automatically
+ RetrievalEnabled
+)
+
// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
addr enode.ID
@@ -60,16 +85,15 @@ type Registry struct {
peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
- doRetrieve bool
+ autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int
}
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags.
- DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true.
- DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag.
+ Syncing SyncingOption //Defines syncing behavior
+ Retrieval RetrievalOption //Defines retrieval behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
@@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
+ //check if retriaval has been disabled
+ retrieval := options.Retrieval != RetrievalDisabled
+
streamer := &Registry{
addr: localID,
skipCheck: options.SkipCheck,
@@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
peers: make(map[enode.ID]*Peer),
delivery: delivery,
intervalsStore: intervalsStore,
- doRetrieve: options.DoRetrieve,
+ autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
}
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- if options.DoServeRetrieve {
+ //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
+ if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live {
return nil, errors.New("only live retrieval requests supported")
@@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
})
}
- streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
- })
+ //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
+ if options.Retrieval != RetrievalDisabled {
+ streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
+ })
+ }
- if options.DoSync {
+ //If syncing is not disabled, the syncing functions are registered (both client and server)
+ if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore)
}
- if options.DoSync {
+ //if syncing is set to automatically subscribe to the syncing stream, start the subscription process
+ if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop
// - the latest int value is delivered to the loop after the processing is done
@@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer close(sp.quit)
defer sp.close()
- if r.doRetrieve {
+ if r.autoRetrieval && !p.LightNode {
err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil {
return err
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index e7f79e7a1..16c74d3b3 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
defer teardown()
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f2be3bef9..b0e35b0db 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -62,6 +62,9 @@ func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network
params.Init(datadir)
params.BaseKey = addr.Over()
lstore, err = storage.NewLocalStore(params, mockStore)
+ if err != nil {
+ return nil, "", err
+ }
return lstore, datadir, nil
}
@@ -114,6 +117,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
})