| field | value | date |
|---|---|---|
| author | Péter Szilágyi <peterke@gmail.com> | 2019-05-10 19:09:01 +0800 |
| committer | GitHub <noreply@github.com> | 2019-05-10 19:09:01 +0800 |
| commit | 494f5d448a1685d5de4cb1524b863cd1fc9a13b0 (patch) | |
| tree | 4db9d1afe4910c888f3488cd93e8537501d88314 /swarm/network/stream/syncer.go | |
| parent | c94d582aa781b26412ba7d570f6707d193303a02 (diff) | |
| parent | 9b1543c282f39d452f611eeee0307bdf828e8bc2 (diff) | |
Merge pull request #19550 from ethersphere/swarm-rather-stable
swarm v0.4-rc1
Diffstat (limited to 'swarm/network/stream/syncer.go')
-rw-r--r-- | swarm/network/stream/syncer.go | 209 |
1 file changed, 110 insertions(+), 99 deletions(-)
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index 5f03dcff7..9bde39550 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -22,6 +22,7 @@ import (
     "time"
 
     "github.com/ethereum/go-ethereum/metrics"
+    "github.com/ethereum/go-ethereum/swarm/chunk"
     "github.com/ethereum/go-ethereum/swarm/log"
     "github.com/ethereum/go-ethereum/swarm/storage"
 )
@@ -35,27 +36,29 @@ const (
 // * live request delivery with or without checkback
 // * (live/non-live historical) chunk syncing per proximity bin
 type SwarmSyncerServer struct {
-    po    uint8
-    store storage.SyncChunkStore
-    quit  chan struct{}
+    correlateId string //used for logging
+    po          uint8
+    netStore    *storage.NetStore
+    quit        chan struct{}
 }
 
 // NewSwarmSyncerServer is constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error) {
     return &SwarmSyncerServer{
-        po:    po,
-        store: syncChunkStore,
-        quit:  make(chan struct{}),
+        correlateId: correlateId,
+        po:          po,
+        netStore:    netStore,
+        quit:        make(chan struct{}),
     }, nil
 }
 
-func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
-    streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
+func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
+    streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, _ bool) (Server, error) {
         po, err := ParseSyncBinKey(t)
         if err != nil {
             return nil, err
         }
-        return NewSwarmSyncerServer(po, syncChunkStore)
+        return NewSwarmSyncerServer(po, netStore, p.ID().String()+"|"+string(po))
     })
     // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
     //     return NewOutgoingProvableSwarmSyncer(po, db)
@@ -69,130 +72,138 @@ func (s *SwarmSyncerServer) Close() {
 
 // GetData retrieves the actual chunk from netstore
 func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-    chunk, err := s.store.Get(ctx, storage.Address(key))
+    ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
     if err != nil {
         return nil, err
     }
-    return chunk.Data(), nil
+    return ch.Data(), nil
 }
 
 // SessionIndex returns current storage bin (po) index.
 func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
-    return s.store.BinIndex(s.po), nil
+    return s.netStore.LastPullSubscriptionBinID(s.po)
 }
 
-// GetBatch retrieves the next batch of hashes from the dbstore
+// SetNextBatch retrieves the next batch of hashes from the localstore.
+// It expects a range of bin IDs, both ends inclusive in syncing, and returns
+// concatenated byte slice of chunk addresses and bin IDs of the first and
+// the last one in that slice. The batch may have up to BatchSize number of
+// chunk addresses. If at least one chunk is added to the batch and no new chunks
+// are added in batchTimeout period, the batch will be returned. This function
+// will block until new chunks are received from localstore pull subscription.
 func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
-    var batch []byte
-    i := 0
-
-    var ticker *time.Ticker
-    defer func() {
-        if ticker != nil {
-            ticker.Stop()
+    //TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer?
+    if from > 0 {
+        from--
+    }
+    batchStart := time.Now()
+    descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
+    defer stop()
+
+    const batchTimeout = 2 * time.Second
+
+    var (
+        batch        []byte
+        batchSize    int
+        batchStartID *uint64
+        batchEndID   uint64
+        timer        *time.Timer
+        timerC       <-chan time.Time
+    )
+
+    defer func(start time.Time) {
+        metrics.GetOrRegisterResettingTimer("syncer.set-next-batch.total-time", nil).UpdateSince(start)
+        metrics.GetOrRegisterCounter("syncer.set-next-batch.batch-size", nil).Inc(int64(batchSize))
+        if timer != nil {
+            timer.Stop()
         }
-    }()
-    var wait bool
-    for {
-        if wait {
-            if ticker == nil {
-                ticker = time.NewTicker(1000 * time.Millisecond)
+    }(batchStart)
+
+    for iterate := true; iterate; {
+        select {
+        case d, ok := <-descriptors:
+            if !ok {
+                iterate = false
+                break
             }
-            select {
-            case <-ticker.C:
-            case <-s.quit:
-                return nil, 0, 0, nil, nil
+            batch = append(batch, d.Address[:]...)
+            // This is the most naive approach to label the chunk as synced
+            // allowing it to be garbage collected. A proper way requires
+            // validating that the chunk is successfully stored by the peer.
+            err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
+            if err != nil {
+                metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
+                log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
+                return nil, 0, 0, nil, err
             }
-        }
-
-        metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
-        err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
-            select {
-            case <-s.quit:
-                return false
-            default:
+            batchSize++
+            if batchStartID == nil {
+                // set batch start id only if
+                // this is the first iteration
+                batchStartID = &d.BinID
             }
-            batch = append(batch, key[:]...)
-            i++
-            to = idx
-            return i < BatchSize
-        })
-        if err != nil {
-            return nil, 0, 0, nil, err
-        }
-        if len(batch) > 0 {
-            break
+            batchEndID = d.BinID
+            if batchSize >= BatchSize {
+                iterate = false
+                metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1)
+                log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
+            }
+            if timer == nil {
+                timer = time.NewTimer(batchTimeout)
+            } else {
+                log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId)
+                if !timer.Stop() {
+                    <-timer.C
+                }
+                log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId)
+                timer.Reset(batchTimeout)
+            }
+            timerC = timer.C
+        case <-timerC:
+            // return batch if new chunks are not
+            // received after some time
+            iterate = false
+            metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1)
+            log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
+        case <-s.quit:
+            iterate = false
+            log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
         }
-        wait = true
     }
-
-    log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
-    return batch, from, to, nil, nil
+    if batchStartID == nil {
+        // if batch start id is not set, return 0
+        batchStartID = new(uint64)
+    }
+    return batch, *batchStartID, batchEndID, nil, nil
 }
 
 // SwarmSyncerClient
 type SwarmSyncerClient struct {
-    store  storage.SyncChunkStore
-    peer   *Peer
-    stream Stream
+    netStore *storage.NetStore
+    peer     *Peer
+    stream   Stream
 }
 
 // NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
     return &SwarmSyncerClient{
-        store:  store,
-        peer:   p,
-        stream: stream,
+        netStore: netStore,
+        peer:     p,
+        stream:   stream,
     }, nil
 }
 
-// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
-//     retrieveC := make(storage.Chunk, chunksCap)
-//     RunChunkRequestor(p, retrieveC)
-//     storeC := make(storage.Chunk, chunksCap)
-//     RunChunkStorer(store, storeC)
-//     s := &SwarmSyncerClient{
-//         po:            po,
-//         priority:      priority,
-//         sessionAt:     sessionAt,
-//         start:         index,
-//         end:           index,
-//         nextC:         make(chan struct{}, 1),
-//         intervals:     intervals,
-//         sessionRoot:   sessionRoot,
-//         sessionReader: chunker.Join(sessionRoot, retrieveC),
-//         retrieveC:     retrieveC,
-//         storeC:        storeC,
-//     }
-//     return s
-// }
-
-// // StartSyncing is called on the Peer to start the syncing process
-// // the idea is that it is called only after kademlia is close to healthy
-// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
-//     lastPO := po
-//     if nn {
-//         lastPO = maxPO
-//     }
-//
-//     for i := po; i <= lastPO; i++ {
-//         s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
-//         s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
-//     }
-// }
-
 // RegisterSwarmSyncerClient registers the client constructor function for
 // to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
+func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
     streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
-        return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
+        return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
     })
 }
 
 // NeedData
 func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
-    return s.store.FetchFunc(ctx, key)
+    return s.netStore.FetchFunc(ctx, key)
 }
 
 // BatchDone