From 6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tr=C3=B3n?= Date: Fri, 12 Oct 2018 16:26:16 +0200 Subject: swarm/network/stream: generalise setting of next batch (#17818) * swarm/network/stream: generalize SetNextBatch and add Server SessionIndex * swarm/network/stream: fix a typo in comment * swarm/network/stream: remove live argument from NewSwarmSyncerServer --- swarm/network/stream/syncer.go | 52 ++++++++++++------------------------------ 1 file changed, 15 insertions(+), 37 deletions(-) (limited to 'swarm/network/stream/syncer.go') diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 38b3078d2..4bfbac8b0 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -18,7 +18,6 @@ package stream import ( "context" - "math" "strconv" "time" @@ -36,38 +35,27 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - store storage.SyncChunkStore - sessionAt uint64 - start uint64 - live bool - quit chan struct{} + po uint8 + store storage.SyncChunkStore + quit chan struct{} } -// NewSwarmSyncerServer is contructor for SwarmSyncerServer -func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { - sessionAt := syncChunkStore.BinIndex(po) - var start uint64 - if live { - start = sessionAt - } +// NewSwarmSyncerServer is constructor for SwarmSyncerServer +func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - store: syncChunkStore, - sessionAt: sessionAt, - start: start, - live: live, - quit: make(chan struct{}), + po: po, + store: syncChunkStore, + quit: make(chan struct{}), }, nil } func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { - streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { + streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(live, po, syncChunkStore) + return NewSwarmSyncerServer(po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -88,25 +76,15 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er return chunk.Data(), nil } +// SessionIndex returns current storage bin (po) index. +func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { + return s.store.BinIndex(s.po), nil +} + // GetBatch retrieves the next batch of hashes from the dbstore func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 - if s.live { - if from == 0 { - from = s.start - } - if to <= from || from >= s.sessionAt { - to = math.MaxUint64 - } - } else { - if (to < from && to != 0) || from > s.sessionAt { - return nil, 0, 0, nil, nil - } - if to == 0 || to > s.sessionAt { - to = s.sessionAt - } - } var ticker *time.Ticker defer func() { -- cgit v1.2.3