diff options
author | holisticode <holistic.computing@gmail.com> | 2018-11-07 07:04:18 +0800 |
---|---|---|
committer | Viktor TrĂ³n <viktor.tron@gmail.com> | 2018-11-07 07:04:18 +0800 |
commit | 79c7a69ac8066cc28ceee2ebaab3d0221a8adf57 (patch) | |
tree | afbba15675b0b8a8153523bb661499673d4e43da /swarm/network/stream/stream.go | |
parent | 53eb4e0b0fffdc105fbe9f5eed671b96de6e2ba1 (diff) | |
download | dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.gz dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.bz2 dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.lz dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.xz dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.tar.zst dexon-79c7a69ac8066cc28ceee2ebaab3d0221a8adf57.zip |
swarm: Better syncing and retrieval option definition (#17986)
* swarm: Better syncing and retrieval option definition
* swarm/network/stream: better comments
* swarm/network/stream: addressed PR comments
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 57 |
1 files changed, 45 insertions, 12 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0ac374def..695ff0c50 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -47,6 +47,31 @@ const ( HashSize = 32 ) +//Enumerate options for syncing and retrieval +type SyncingOption int +type RetrievalOption int + +//Syncing options +const ( + //Syncing disabled + SyncingDisabled SyncingOption = iota + //Register the client and the server but not subscribe + SyncingRegisterOnly + //Both client and server funcs are registered, subscribe sent automatically + SyncingAutoSubscribe +) + +const ( + //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) + RetrievalDisabled RetrievalOption = iota + //Only the client side of the retrieve request is registered. + //(light nodes do not serve retrieve requests) + //once the client is registered, subscription to retrieve request stream is always sent + RetrievalClientOnly + //Both client and server funcs are registered, subscribe sent automatically + RetrievalEnabled +) + // Registry registry for outgoing and incoming streamer constructors type Registry struct { addr enode.ID @@ -60,16 +85,15 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - doRetrieve bool + autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags. - DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true. - DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag. + Syncing SyncingOption //Defines syncing behavior + Retrieval RetrievalOption //Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } @@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } + //check if retriaval has been disabled + retrieval := options.Retrieval != RetrievalDisabled + streamer := &Registry{ addr: localID, skipCheck: options.SkipCheck, @@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, - doRetrieve: options.DoRetrieve, + autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - if options.DoServeRetrieve { + //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) + if options.Retrieval == RetrievalEnabled { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { if !live { return nil, errors.New("only live retrieval requests supported") @@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy }) } - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) + //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) + if options.Retrieval != RetrievalDisabled { + streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) + }) + } - if options.DoSync { + //If syncing is not disabled, the syncing functions are registered (both client and server) + if options.Syncing != SyncingDisabled { RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore) } - if options.DoSync { + //if syncing is set to automatically subscribe to the syncing stream, start the subscription process + if options.Syncing == SyncingAutoSubscribe { // latestIntC function ensures that // - receiving from the in chan is not blocked by processing inside the for loop // - the latest int value is delivered to the loop after the processing is done @@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer close(sp.quit) defer sp.close() - if r.doRetrieve { + if r.autoRetrieval && !p.LightNode { err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err |