aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
authorholisticode <holistic.computing@gmail.com>2018-11-07 07:04:18 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2018-11-07 07:04:18 +0800
commit79c7a69ac8066cc28ceee2ebaab3d0221a8adf57 (patch)
treeafbba15675b0b8a8153523bb661499673d4e43da /swarm/network/stream/stream.go
parent53eb4e0b0fffdc105fbe9f5eed671b96de6e2ba1 (diff)
downloaddexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar
dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.gz
dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.bz2
dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.lz
dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.xz
dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.zst
dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.zip
swarm: Better syncing and retrieval option definition (#17986)
* swarm: Better syncing and retrieval option definition * swarm/network/stream: better comments * swarm/network/stream: addressed PR comments
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go57
1 files changed, 45 insertions, 12 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 0ac374def..695ff0c50 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -47,6 +47,31 @@ const (
HashSize = 32
)
+//Enumerate options for syncing and retrieval
+type SyncingOption int
+type RetrievalOption int
+
+//Syncing options
+const (
+ //Syncing disabled
+ SyncingDisabled SyncingOption = iota
+ //Register the client and the server but not subscribe
+ SyncingRegisterOnly
+ //Both client and server funcs are registered, subscribe sent automatically
+ SyncingAutoSubscribe
+)
+
+const (
+ //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
+ RetrievalDisabled RetrievalOption = iota
+ //Only the client side of the retrieve request is registered.
+ //(light nodes do not serve retrieve requests)
+ //once the client is registered, subscription to retrieve request stream is always sent
+ RetrievalClientOnly
+ //Both client and server funcs are registered, subscribe sent automatically
+ RetrievalEnabled
+)
+
// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
addr enode.ID
@@ -60,16 +85,15 @@ type Registry struct {
peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
- doRetrieve bool
+ autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int
}
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags.
- DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true.
- DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag.
+ Syncing SyncingOption //Defines syncing behavior
+ Retrieval RetrievalOption //Defines retrieval behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
@@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
+ //check if retriaval has been disabled
+ retrieval := options.Retrieval != RetrievalDisabled
+
streamer := &Registry{
addr: localID,
skipCheck: options.SkipCheck,
@@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
peers: make(map[enode.ID]*Peer),
delivery: delivery,
intervalsStore: intervalsStore,
- doRetrieve: options.DoRetrieve,
+ autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
}
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- if options.DoServeRetrieve {
+ //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
+ if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live {
return nil, errors.New("only live retrieval requests supported")
@@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
})
}
- streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
- })
+ //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
+ if options.Retrieval != RetrievalDisabled {
+ streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
+ })
+ }
- if options.DoSync {
+ //If syncing is not disabled, the syncing functions are registered (both client and server)
+ if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore)
}
- if options.DoSync {
+ //if syncing is set to automatically subscribe to the syncing stream, start the subscription process
+ if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop
// - the latest int value is delivered to the loop after the processing is done
@@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer close(sp.quit)
defer sp.close()
- if r.doRetrieve {
+ if r.autoRetrieval && !p.LightNode {
err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil {
return err