From 6566a0a3b82f5d24d478d3876d5fa2b1b0e8684c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Tr=C3=B3n?= Date: Fri, 12 Oct 2018 16:26:16 +0200 Subject: swarm/network/stream: generalise setting of next batch (#17818) * swarm/network/stream: generalize SetNextBatch and add Server SessionIndex * swarm/network/stream: fix a typo in comment * swarm/network/stream: remove live argument from NewSwarmSyncerServer --- swarm/network/stream/delivery.go | 7 ++++- swarm/network/stream/delivery_test.go | 6 ++-- swarm/network/stream/intervals_test.go | 16 +++-------- swarm/network/stream/peer.go | 13 ++++++--- swarm/network/stream/stream.go | 30 +++++++++++++++++++- swarm/network/stream/streamer_test.go | 42 ++++++++++++++------------- swarm/network/stream/syncer.go | 52 ++++++++++------------------------ 7 files changed, 89 insertions(+), 77 deletions(-) (limited to 'swarm/network') diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index c2adb1009..0429c4dff 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -96,6 +96,11 @@ func (s *SwarmChunkServer) processDeliveries() { } } +// SessionIndex returns zero in all cases for SwarmChunkServer. +func (s *SwarmChunkServer) SessionIndex() (uint64, error) { + return 0, nil +} + // SetNextBatch func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) { select { @@ -141,7 +146,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * "retrieve.request") defer osp.Finish() - s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false)) + s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) if err != nil { return err } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index b021b8771..c6ebae3f0 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -88,7 +88,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(node.ID()) peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: NewStream(swarmChunkServerStreamName, "", false), + Stream: NewStream(swarmChunkServerStreamName, "", true), History: nil, Priority: Top, }) @@ -136,7 +136,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { node := tester.Nodes[0] peer := streamer.getPeer(node.ID()) - stream := NewStream(swarmChunkServerStreamName, "", false) + stream := NewStream(swarmChunkServerStreamName, "", true) peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ Stream: stream, @@ -409,7 +409,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck return fmt.Errorf("No registry") } registry := item.(*Registry) - err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) + err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), NewRange(0, 0), Top) if err != nil { return err } diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 269259423..3164193b3 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -345,8 +345,6 @@ func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (* func (c *testExternalClient) Close() {} -const testExternalServerBatchSize = 10 - type testExternalServer struct { t string keyFunc func(key []byte, index uint64) @@ -366,17 +364,11 @@ func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key } } +func (s *testExternalServer) SessionIndex() (uint64, error) { + return s.sessionAt, nil +} + func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - if from == 0 && to == 0 { - from = s.sessionAt - to = s.sessionAt + testExternalServerBatchSize - } - if to-from > testExternalServerBatchSize { - to = from + testExternalServerBatchSize - 1 - } - if from >= s.maxKeys && to > s.maxKeys { - return nil, 0, 0, nil, io.EOF - } if to > s.maxKeys { to = s.maxKeys } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index ef6bbdf70..89d135ad5 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -166,7 +166,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { "send.offered.hashes") defer sp.Finish() - hashes, from, to, proof, err := s.SetNextBatch(f, t) + hashes, from, to, proof, err := s.setNextBatch(f, t) if err != nil { return err } @@ -214,10 +214,15 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) { return nil, ErrMaxPeerServers } + sessionIndex, err := o.SessionIndex() + if err != nil { + return nil, err + } os := &server{ - Server: o, - stream: s, - priority: priority, + Server: o, + stream: s, + priority: priority, + sessionIndex: sessionIndex, } p.servers[s] = os return os, nil diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 1eda06c6a..3861cfcf6 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -375,7 +375,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer sp.close() if r.doRetrieve { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top) + err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err } @@ -500,10 +500,38 @@ type server struct { stream Stream priority uint8 currentBatch []byte + sessionIndex uint64 +} + +// setNextBatch adjusts passed interval based on session index and whether +// stream is live or history. It calls Server SetNextBatch with adjusted +// interval and returns batch hashes and their interval. +func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + if s.stream.Live { + if from == 0 { + from = s.sessionIndex + } + if to <= from || from >= s.sessionIndex { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionIndex { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionIndex { + to = s.sessionIndex + } + } + return s.SetNextBatch(from, to) } // Server interface for outgoing peer Streamer type Server interface { + // SessionIndex is called when a server is initialized + // to get the current cursor state of the stream data. + // Based on this index, live and history stream intervals + // will be adjusted before calling SetNextBatch. + SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 5d91eecfd..e7f79e7a1 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -107,15 +107,21 @@ func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*Takeo func (self *testClient) Close() {} type testServer struct { - t string + t string + sessionIndex uint64 } -func newTestServer(t string) *testServer { +func newTestServer(t string, sessionIndex uint64) *testServer { return &testServer{ - t: t, + t: t, + sessionIndex: sessionIndex, } } +func (s *testServer) SessionIndex() (uint64, error) { + return s.sessionIndex, nil +} + func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { return make([]byte, HashSize), from + 1, to + 1, nil, nil } @@ -230,7 +236,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { stream := NewStream("foo", "", false) streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t), nil + return newTestServer(t, 10), nil }) node := tester.Nodes[0] @@ -297,7 +303,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { stream := NewStream("foo", "", true) streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t), nil + return newTestServer(t, 0), nil }) node := tester.Nodes[0] @@ -324,7 +330,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { }, Hashes: make([]byte, HashSize), From: 1, - To: 1, + To: 0, }, Peer: node.ID(), }, @@ -361,7 +367,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t), nil + return newTestServer(t, 0), nil }) stream := NewStream("bar", "", true) @@ -407,9 +413,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { stream := NewStream("foo", "", true) streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return &testServer{ - t: t, - }, nil + return newTestServer(t, 10), nil }) node := tester.Nodes[0] @@ -448,8 +452,8 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { HandoverProof: &HandoverProof{ Handover: &Handover{}, }, - From: 1, - To: 1, + From: 11, + To: 0, Hashes: make([]byte, HashSize), }, Peer: node.ID(), @@ -634,7 +638,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t), nil + return newTestServer(t, 10), nil }) node := tester.Nodes[0] @@ -694,8 +698,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { HandoverProof: &HandoverProof{ Handover: &Handover{}, }, - From: 1, - To: 1, + From: 11, + To: 0, Hashes: make([]byte, HashSize), }, Peer: node.ID(), @@ -769,7 +773,7 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) { } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t), nil + return newTestServer(t, 0), nil }) node := tester.Nodes[0] @@ -799,7 +803,7 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) { }, Hashes: make([]byte, HashSize), From: 1, - To: 1, + To: 0, }, Peer: node.ID(), }, @@ -843,7 +847,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t), nil + return newTestServer(t, 0), nil }) node := tester.Nodes[0] @@ -903,7 +907,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { }, Hashes: make([]byte, HashSize), From: 1, - To: 1, + To: 0, }, Peer: node.ID(), }, diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 38b3078d2..4bfbac8b0 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -18,7 +18,6 @@ package stream import ( "context" - "math" "strconv" "time" @@ -36,38 +35,27 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - store storage.SyncChunkStore - sessionAt uint64 - start uint64 - live bool - quit chan struct{} + po uint8 + store storage.SyncChunkStore + quit chan struct{} } -// NewSwarmSyncerServer is contructor for SwarmSyncerServer -func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { - sessionAt := syncChunkStore.BinIndex(po) - var start uint64 - if live { - start = sessionAt - } +// NewSwarmSyncerServer is constructor for SwarmSyncerServer +func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - store: syncChunkStore, - sessionAt: sessionAt, - start: start, - live: live, - quit: make(chan struct{}), + po: po, + store: syncChunkStore, + quit: make(chan struct{}), }, nil } func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { - streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { + streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(live, po, syncChunkStore) + return NewSwarmSyncerServer(po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -88,25 +76,15 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er return chunk.Data(), nil } +// SessionIndex returns current storage bin (po) index. +func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { + return s.store.BinIndex(s.po), nil +} + // GetBatch retrieves the next batch of hashes from the dbstore func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 - if s.live { - if from == 0 { - from = s.start - } - if to <= from || from >= s.sessionAt { - to = math.MaxUint64 - } - } else { - if (to < from && to != 0) || from > s.sessionAt { - return nil, 0, 0, nil, nil - } - if to == 0 || to > s.sessionAt { - to = s.sessionAt - } - } var ticker *time.Ticker defer func() { -- cgit v1.2.3