diff options
author | Viktor TrĂ³n <viktor.tron@gmail.com> | 2018-10-12 22:26:16 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-12 22:26:16 +0800 |
commit | 6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c (patch) | |
tree | 52f31f4bc541ed6bf9643bef11ceefd75e831d0e /swarm/network/stream/stream.go | |
parent | dc3c3fb1e177c5d01ae3ca63717130eea924271e (diff) | |
download | dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.tar dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.tar.gz dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.tar.bz2 dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.tar.lz dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.tar.xz dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.tar.zst dexon-6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c.zip |
swarm/network/stream: generalise setting of next batch (#17818)
* swarm/network/stream: generalize SetNextBatch and add Server SessionIndex
* swarm/network/stream: fix a typo in comment
* swarm/network/stream: remove live argument from NewSwarmSyncerServer
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 30 |
1 files changed, 29 insertions, 1 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 1eda06c6a..3861cfcf6 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -375,7 +375,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer sp.close() if r.doRetrieve { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top) + err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err } @@ -500,10 +500,38 @@ type server struct { stream Stream priority uint8 currentBatch []byte + sessionIndex uint64 +} + +// setNextBatch adjusts passed interval based on session index and whether +// stream is live or history. It calls Server SetNextBatch with adjusted +// interval and returns batch hashes and their interval. +func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + if s.stream.Live { + if from == 0 { + from = s.sessionIndex + } + if to <= from || from >= s.sessionIndex { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionIndex { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionIndex { + to = s.sessionIndex + } + } + return s.SetNextBatch(from, to) } // Server interface for outgoing peer Streamer type Server interface { + // SessionIndex is called when a server is initialized + // to get the current cursor state of the stream data. + // Based on this index, live and history stream intervals + // will be adjusted before calling SetNextBatch. + SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() |