From 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e Mon Sep 17 00:00:00 2001 From: Balint Gabor Date: Thu, 13 Sep 2018 11:42:19 +0200 Subject: swarm: Chunk refactor (#17659) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Janos Guljas Co-authored-by: Balint Gabor Co-authored-by: Anton Evangelatov Co-authored-by: Viktor TrĂ³n --- swarm/network/stream/syncer.go | 94 ++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 49 deletions(-) (limited to 'swarm/network/stream/syncer.go') diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index d7febe4a3..e9811a678 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -28,7 +28,6 @@ import ( ) const ( - // BatchSize = 2 BatchSize = 128 ) @@ -38,35 +37,37 @@ const ( // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { po uint8 - db *storage.DBAPI + store storage.SyncChunkStore sessionAt uint64 start uint64 + live bool quit chan struct{} } // NewSwarmSyncerServer is contructor for SwarmSyncerServer -func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) { - sessionAt := db.CurrentBucketStorageIndex(po) +func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { + sessionAt := syncChunkStore.BinIndex(po) var start uint64 if live { start = sessionAt } return &SwarmSyncerServer{ po: po, - db: db, + store: syncChunkStore, sessionAt: sessionAt, start: start, + live: live, quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(live, po, db) + return NewSwarmSyncerServer(live, po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() { close(s.quit) } -// GetSection retrieves the actual chunk from localstore +// GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.store.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // GetBatch retrieves the next batch of hashes from the dbstore func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 - if from == 0 { - from = s.start - } - if to <= from || from >= s.sessionAt { - to = math.MaxUint64 + if s.live { + if from == 0 { + from = s.start + } + if to <= from || from >= s.sessionAt { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionAt { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionAt { + to = s.sessionAt + } } + var ticker *time.Ticker defer func() { if ticker != nil { @@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 } metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) - err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool { - batch = append(batch, addr[:]...) + err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { + batch = append(batch, key[:]...) i++ to = idx return i < BatchSize @@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 wait = true } - log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po)) + log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) return batch, from, to, nil, nil } @@ -146,28 +155,26 @@ type SwarmSyncerClient struct { sessionReader storage.LazySectionReader retrieveC chan *storage.Chunk storeC chan *storage.Chunk - db *storage.DBAPI + store storage.SyncChunkStore // chunker storage.Chunker - currentRoot storage.Address - requestFunc func(chunk *storage.Chunk) - end, start uint64 - peer *Peer - ignoreExistingRequest bool - stream Stream + currentRoot storage.Address + requestFunc func(chunk *storage.Chunk) + end, start uint64 + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - db: db, - peer: p, - ignoreExistingRequest: ignoreExistingRequest, - stream: stream, + store: store, + peer: p, + stream: stream, }, nil } // // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { +// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { // retrieveC := make(storage.Chunk, chunksCap) // RunChunkRequestor(p, retrieveC) // storeC := make(storage.Chunk, chunksCap) @@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) }) } // NeedData -func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) { - chunk, _ := s.db.GetOrCreateRequest(ctx, key) - // TODO: we may want to request from this peer anyway even if the request exists - - // ignoreExistingRequest is temporary commented out until its functionality is verified. - // For now, this optimization can be disabled. - if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) { - return nil - } - // create request and wait until the chunk data arrives and is stored - return func() { - chunk.WaitToStore() - } +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { + return s.store.FetchFunc(ctx, key) } // BatchDone -- cgit v1.2.3