diff options
Diffstat (limited to 'swarm/network/stream/syncer.go')
-rw-r--r-- | swarm/network/stream/syncer.go | 124 |
1 files changed, 75 insertions, 49 deletions
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 5f03dcff7..c573da5d2 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -21,8 +21,7 @@ import ( "strconv" "time" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -36,12 +35,12 @@ const ( // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { po uint8 - store storage.SyncChunkStore + store chunk.FetchStore quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ po: po, store: syncChunkStore, @@ -49,7 +48,7 @@ func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*Swa }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { +func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) { streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { @@ -69,76 +68,103 @@ func (s *SwarmSyncerServer) Close() { // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.store.Get(ctx, storage.Address(key)) + ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key)) if err != nil { return nil, err } - return chunk.Data(), nil + return ch.Data(), nil } // SessionIndex returns current storage bin (po) index. func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { - return s.store.BinIndex(s.po), nil + return s.store.LastPullSubscriptionBinID(s.po) } -// GetBatch retrieves the next batch of hashes from the dbstore +// SetNextBatch retrieves the next batch of hashes from the localstore. +// It expects a range of bin IDs, both ends inclusive in syncing, and returns +// concatenated byte slice of chunk addresses and bin IDs of the first and +// the last one in that slice. The batch may have up to BatchSize number of +// chunk addresses. If at least one chunk is added to the batch and no new chunks +// are added in batchTimeout period, the batch will be returned. This function +// will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - var batch []byte - i := 0 - - var ticker *time.Ticker + descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to) + defer stop() + + const batchTimeout = 2 * time.Second + + var ( + batch []byte + batchSize int + batchStartID *uint64 + batchEndID uint64 + timer *time.Timer + timerC <-chan time.Time + ) defer func() { - if ticker != nil { - ticker.Stop() + if timer != nil { + timer.Stop() } }() - var wait bool - for { - if wait { - if ticker == nil { - ticker = time.NewTicker(1000 * time.Millisecond) + + for iterate := true; iterate; { + select { + case d, ok := <-descriptors: + if !ok { + iterate = false + break } - select { - case <-ticker.C: - case <-s.quit: - return nil, 0, 0, nil, nil + batch = append(batch, d.Address[:]...) + // This is the most naive approach to label the chunk as synced + // allowing it to be garbage collected. A proper way requires + // validating that the chunk is successfully stored by the peer. + err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address) + if err != nil { + return nil, 0, 0, nil, err } - } - - metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) - err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { - select { - case <-s.quit: - return false - default: + batchSize++ + if batchStartID == nil { + // set batch start id only if + // this is the first iteration + batchStartID = &d.BinID } - batch = append(batch, key[:]...) - i++ - to = idx - return i < BatchSize - }) - if err != nil { - return nil, 0, 0, nil, err - } - if len(batch) > 0 { - break + batchEndID = d.BinID + if batchSize >= BatchSize { + iterate = false + } + if timer == nil { + timer = time.NewTimer(batchTimeout) + } else { + if !timer.Stop() { + <-timer.C + } + timer.Reset(batchTimeout) + } + timerC = timer.C + case <-timerC: + // return batch if new chunks are not + // received after some time + iterate = false + case <-s.quit: + iterate = false } - wait = true } - - log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) - return batch, from, to, nil, nil + if batchStartID == nil { + // if batch start id is not set, return 0 + batchStartID = new(uint64) + } + return batch, *batchStartID, batchEndID, nil, nil } // SwarmSyncerClient type SwarmSyncerClient struct { - store storage.SyncChunkStore + store chunk.FetchStore peer *Peer stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ store: store, peer: p, @@ -184,7 +210,7 @@ func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { +func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) }) |