Diffstat (limited to 'swarm/network/stream/syncer.go')
-rw-r--r--  swarm/network/stream/syncer.go | 209
1 file changed, 110 insertions(+), 99 deletions(-)
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index 5f03dcff7..9bde39550 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -22,6 +22,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -35,27 +36,29 @@ const (
// * live request delivery with or without checkback
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
- po uint8
- store storage.SyncChunkStore
- quit chan struct{}
+ correlateId string //used for logging
+ po uint8
+ netStore *storage.NetStore
+ quit chan struct{}
}
// NewSwarmSyncerServer is the constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error) {
return &SwarmSyncerServer{
- po: po,
- store: syncChunkStore,
- quit: make(chan struct{}),
+ correlateId: correlateId,
+ po: po,
+ netStore: netStore,
+ quit: make(chan struct{}),
}, nil
}
-func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
- streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
+func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
+ streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, _ bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(po, syncChunkStore)
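+ // correlateId ties log lines to one peer/bin pair, in the form
+ // "<enode ID>|<bin key>". Note that string(po) encodes the bin as a
+ // single rune rather than its decimal representation.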
+ return NewSwarmSyncerServer(po, netStore, p.ID().String()+"|"+string(po))
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
@@ -69,130 +72,138 @@ func (s *SwarmSyncerServer) Close() {
// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.store.Get(ctx, storage.Address(key))
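+ // ModeGetSync marks the retrieval as being performed on behalf of a
+ // syncing peer rather than a regular client request.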
+ ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
if err != nil {
return nil, err
}
- return chunk.Data(), nil
+ return ch.Data(), nil
}
// SessionIndex returns the current storage bin (po) index.
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
- return s.store.BinIndex(s.po), nil
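+ // The session index of a sync server is the highest bin ID that the
+ // localstore has assigned so far in this proximity order bin.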
+ return s.netStore.LastPullSubscriptionBinID(s.po)
}
-// GetBatch retrieves the next batch of hashes from the dbstore
+// SetNextBatch retrieves the next batch of hashes from the localstore.
+// It expects a range of bin IDs, both ends inclusive, and returns a
+// concatenated byte slice of chunk addresses together with the bin IDs of
+// the first and the last address in that slice. The batch may contain up
+// to BatchSize chunk addresses. If at least one chunk has been added to
+// the batch and no new chunks arrive within batchTimeout, the batch is
+// returned. This function blocks until new chunks are received from the
+// localstore pull subscription.
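+// A batch is a plain concatenation of fixed-size chunk addresses, so the
+// i-th address in it is batch[i*HashSize : (i+1)*HashSize].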
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- var batch []byte
- i := 0
-
- var ticker *time.Ticker
- defer func() {
- if ticker != nil {
- ticker.Stop()
+ //TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer?
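+ // The syncing protocol treats from as inclusive, while SubscribePull
+ // treats since as exclusive (its interval is (since, until]), so widen
+ // the start of the requested range by one.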
+ if from > 0 {
+ from--
+ }
+ batchStart := time.Now()
+ descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
+ defer stop()
+
+ const batchTimeout = 2 * time.Second
+
+ var (
+ batch []byte
+ batchSize int
+ batchStartID *uint64
+ batchEndID uint64
+ timer *time.Timer
+ timerC <-chan time.Time
+ )
+
+ defer func(start time.Time) {
+ metrics.GetOrRegisterResettingTimer("syncer.set-next-batch.total-time", nil).UpdateSince(start)
+ metrics.GetOrRegisterCounter("syncer.set-next-batch.batch-size", nil).Inc(int64(batchSize))
+ if timer != nil {
+ timer.Stop()
}
- }()
- var wait bool
- for {
- if wait {
- if ticker == nil {
- ticker = time.NewTicker(1000 * time.Millisecond)
+ }(batchStart)
+
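+ // Collect descriptors until the batch is full, the subscription channel
+ // closes, the idle timer fires, or the server quits.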
+ for iterate := true; iterate; {
+ select {
+ case d, ok := <-descriptors:
+ if !ok {
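+ // the subscription channel was closed; no more descriptors will arrive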
+ iterate = false
+ break
}
- select {
- case <-ticker.C:
- case <-s.quit:
- return nil, 0, 0, nil, nil
+ batch = append(batch, d.Address[:]...)
+ // This is the most naive approach to labeling the chunk as synced,
+ // allowing it to be garbage collected. A proper solution requires
+ // validating that the peer has successfully stored the chunk.
+ err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
+ if err != nil {
+ metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
+ log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
+ return nil, 0, 0, nil, err
}
- }
-
- metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
- err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
- select {
- case <-s.quit:
- return false
- default:
+ batchSize++
+ if batchStartID == nil {
+ // set batch start id only if
+ // this is the first iteration
+ batchStartID = &d.BinID
}
- batch = append(batch, key[:]...)
- i++
- to = idx
- return i < BatchSize
- })
- if err != nil {
- return nil, 0, 0, nil, err
- }
- if len(batch) > 0 {
- break
+ batchEndID = d.BinID
+ if batchSize >= BatchSize {
+ iterate = false
+ metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1)
+ log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
+ }
+ if timer == nil {
+ timer = time.NewTimer(batchTimeout)
+ } else {
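+ // The idle timer is already armed: stop it and drain any pending tick
+ // before resetting, as required by the time.Timer documentation.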
+ log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId)
+ if !timer.Stop() {
+ <-timer.C
+ }
+ log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId)
+ timer.Reset(batchTimeout)
+ }
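+ // timerC stays nil until the first chunk arrives, so the timeout case
+ // below cannot fire on an empty batch.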
+ timerC = timer.C
+ case <-timerC:
+ // return batch if new chunks are not
+ // received after some time
+ iterate = false
+ metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1)
+ log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
+ case <-s.quit:
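+ // the server is closing: stop iterating, but still return the partial batch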
+ iterate = false
+ log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
}
- wait = true
}
-
- log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
- return batch, from, to, nil, nil
+ if batchStartID == nil {
+ // if batch start id is not set, return 0
+ batchStartID = new(uint64)
+ }
+ return batch, *batchStartID, batchEndID, nil, nil
}
// SwarmSyncerClient
type SwarmSyncerClient struct {
- store storage.SyncChunkStore
- peer *Peer
- stream Stream
+ netStore *storage.NetStore
+ peer *Peer
+ stream Stream
}
// NewSwarmSyncerClient is a constructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
- store: store,
- peer: p,
- stream: stream,
+ netStore: netStore,
+ peer: p,
+ stream: stream,
}, nil
}
-// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
-// retrieveC := make(storage.Chunk, chunksCap)
-// RunChunkRequestor(p, retrieveC)
-// storeC := make(storage.Chunk, chunksCap)
-// RunChunkStorer(store, storeC)
-// s := &SwarmSyncerClient{
-// po: po,
-// priority: priority,
-// sessionAt: sessionAt,
-// start: index,
-// end: index,
-// nextC: make(chan struct{}, 1),
-// intervals: intervals,
-// sessionRoot: sessionRoot,
-// sessionReader: chunker.Join(sessionRoot, retrieveC),
-// retrieveC: retrieveC,
-// storeC: storeC,
-// }
-// return s
-// }
-
-// // StartSyncing is called on the Peer to start the syncing process
-// // the idea is that it is called only after kademlia is close to healthy
-// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
-// lastPO := po
-// if nn {
-// lastPO = maxPO
-// }
-//
-// for i := po; i <= lastPO; i++ {
-// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
-// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
-// }
-// }
-
// RegisterSwarmSyncerClient registers the client constructor function
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
+func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
+ return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
})
}
// NeedData
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
- return s.store.FetchFunc(ctx, key)
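+ // FetchFunc returns nil if the chunk is already stored locally; otherwise
+ // it returns a wait function that blocks until the chunk is delivered.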
+ return s.netStore.FetchFunc(ctx, key)
}
// BatchDone