Diffstat (limited to 'swarm/network/stream/syncer.go')
-rw-r--r-- | swarm/network/stream/syncer.go | 297
1 files changed, 297 insertions, 0 deletions
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
new file mode 100644
index 000000000..5510b2409
--- /dev/null
+++ b/swarm/network/stream/syncer.go
@@ -0,0 +1,297 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package stream
+
+import (
+	"math"
+	"strconv"
+	"time"
+
+	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/swarm/log"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const (
+	// BatchSize = 2
+	BatchSize = 128
+)
+
+// SwarmSyncerServer implements a Server for history syncing on bins.
+// Offered streams:
+// * live request delivery with or without checkback
+// * (live/non-live historical) chunk syncing per proximity bin
+type SwarmSyncerServer struct {
+	po        uint8
+	db        *storage.DBAPI
+	sessionAt uint64
+	start     uint64
+	quit      chan struct{}
+}
+
+// NewSwarmSyncerServer is a constructor for SwarmSyncerServer
+func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
+	sessionAt := db.CurrentBucketStorageIndex(po)
+	var start uint64
+	if live {
+		start = sessionAt
+	}
+	return &SwarmSyncerServer{
+		po:        po,
+		db:        db,
+		sessionAt: sessionAt,
+		start:     start,
+		quit:      make(chan struct{}),
+	}, nil
+}
+
+func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
+	streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
+		po, err := ParseSyncBinKey(t)
+		if err != nil {
+			return nil, err
+		}
+		return NewSwarmSyncerServer(live, po, db)
+	})
+	// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
+	// 	return NewOutgoingProvableSwarmSyncer(po, db)
+	// })
+}
+
+// Close needs to be called on a stream server
+func (s *SwarmSyncerServer) Close() {
+	close(s.quit)
+}
+
+// GetData retrieves the actual chunk from localstore
+func (s *SwarmSyncerServer) GetData(key []byte) ([]byte, error) {
+	chunk, err := s.db.Get(storage.Address(key))
+	if err == storage.ErrFetching {
+		<-chunk.ReqC
+	} else if err != nil {
+		return nil, err
+	}
+	return chunk.SData, nil
+}
+
+// SetNextBatch retrieves the next batch of hashes from the dbstore
+func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+	var batch []byte
+	i := 0
+	if from == 0 {
+		from = s.start
+	}
+	if to <= from || from >= s.sessionAt {
+		to = math.MaxUint64
+	}
+	var ticker *time.Ticker
+	defer func() {
+		if ticker != nil {
+			ticker.Stop()
+		}
+	}()
+	var wait bool
+	for {
+		if wait {
+			if ticker == nil {
+				ticker = time.NewTicker(1000 * time.Millisecond)
+			}
+			select {
+			case <-ticker.C:
+			case <-s.quit:
+				return nil, 0, 0, nil, nil
+			}
+		}
+
+		metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
+		err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
+			batch = append(batch, addr[:]...)
+			i++
+			to = idx
+			return i < BatchSize
+		})
+		if err != nil {
+			return nil, 0, 0, nil, err
+		}
+		if len(batch) > 0 {
+			break
+		}
+		wait = true
+	}
+
+	log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
+	return batch, from, to, nil, nil
+}
+
+// SwarmSyncerClient implements the Client for chunk syncing streams
+type SwarmSyncerClient struct {
+	sessionAt     uint64
+	nextC         chan struct{}
+	sessionRoot   storage.Address
+	sessionReader storage.LazySectionReader
+	retrieveC     chan *storage.Chunk
+	storeC        chan *storage.Chunk
+	db            *storage.DBAPI
+	// chunker               storage.Chunker
+	currentRoot           storage.Address
+	requestFunc           func(chunk *storage.Chunk)
+	end, start            uint64
+	peer                  *Peer
+	ignoreExistingRequest bool
+	stream                Stream
+}
+
+// NewSwarmSyncerClient is a constructor for provable data exchange syncer
+func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
+	return &SwarmSyncerClient{
+		db:                    db,
+		peer:                  p,
+		ignoreExistingRequest: ignoreExistingRequest,
+		stream:                stream,
+	}, nil
+}
+
+// // NewIncomingProvableSwarmSyncer is a constructor for provable data exchange syncer
+// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
+// 	retrieveC := make(storage.Chunk, chunksCap)
+// 	RunChunkRequestor(p, retrieveC)
+// 	storeC := make(storage.Chunk, chunksCap)
+// 	RunChunkStorer(store, storeC)
+// 	s := &SwarmSyncerClient{
+// 		po:            po,
+// 		priority:      priority,
+// 		sessionAt:     sessionAt,
+// 		start:         index,
+// 		end:           index,
+// 		nextC:         make(chan struct{}, 1),
+// 		intervals:     intervals,
+// 		sessionRoot:   sessionRoot,
+// 		sessionReader: chunker.Join(sessionRoot, retrieveC),
+// 		retrieveC:     retrieveC,
+// 		storeC:        storeC,
+// 	}
+// 	return s
+// }
+
+// // StartSyncing is called on the Peer to start the syncing process
+// // the idea is that it is called only after kademlia is close to healthy
+// func StartSyncing(s *Streamer, peerId discover.NodeID, po uint8, nn bool) {
+// 	lastPO := po
+// 	if nn {
+// 		lastPO = maxPO
+// 	}
+//
+// 	for i := po; i <= lastPO; i++ {
+// 		s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
+// 		s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
+// 	}
+// }
+
+// RegisterSwarmSyncerClient registers the client constructor function
+// used to handle incoming sync streams
+func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
+	streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
+		return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
+	})
+}
+
+// NeedData returns nil if the chunk is already stored locally, otherwise a wait function that blocks until the chunk data arrives and is stored
+func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) {
+	chunk, _ := s.db.GetOrCreateRequest(key)
+	// TODO: we may want to request from this peer anyway even if the request exists
+
+	// ignoreExistingRequest is temporarily commented out until its functionality is verified.
+	// For now, this optimization can be disabled.
+	if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
+		return nil
+	}
+	// create request and wait until the chunk data arrives and is stored
+	return func() {
+		chunk.WaitToStore()
+	}
+}
+
+// BatchDone is called when a batch of chunks is delivered; takeover proof generation is disabled until the putter/getter refactor (see TODO)
+func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
+	// TODO: reenable this with putter/getter refactored code
+	// if s.chunker != nil {
+	// 	return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
+	// }
+	return nil
+}
+
+func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error) {
+	// for provable syncer currentRoot is non-zero length
+	// TODO: reenable this with putter/getter
+	// if s.chunker != nil {
+	// 	if from > s.sessionAt { // for live syncing currentRoot is always updated
+	// 		//expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC)
+	// 		expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC)
+	// 		if err != nil {
+	// 			return nil, err
+	// 		}
+	// 		if !bytes.Equal(root, expRoot) {
+	// 			return nil, fmt.Errorf("HandoverProof mismatch")
+	// 		}
+	// 		s.currentRoot = root
+	// 	} else {
+	// 		expHashes := make([]byte, len(hashes))
+	// 		_, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize))
+	// 		if err != nil && err != io.EOF {
+	// 			return nil, err
+	// 		}
+	// 		if !bytes.Equal(expHashes, hashes) {
+	// 			return nil, errors.New("invalid proof")
+	// 		}
+	// 	}
+	// 	return nil, nil
+	// }
+	s.end += uint64(len(hashes)) / HashSize
+	takeover := &Takeover{
+		Stream: stream,
+		Start:  s.start,
+		End:    s.end,
+		Root:   root,
+	}
+	// serialise and sign
+	return &TakeoverProof{
+		Takeover: takeover,
+		Sig:      nil,
+	}, nil
+}
+
+func (s *SwarmSyncerClient) Close() {}
+
+// base for parsing and formatting the sync bin key
+// it must satisfy 2 <= base <= 36
+const syncBinKeyBase = 36
+
+// FormatSyncBinKey returns a string representation of
+// Kademlia bin number to be used as key for SYNC stream.
+func FormatSyncBinKey(bin uint8) string {
+	return strconv.FormatUint(uint64(bin), syncBinKeyBase)
+}
+
+// ParseSyncBinKey parses the string representation
+// and returns the Kademlia bin number.
+func ParseSyncBinKey(s string) (uint8, error) {
+	bin, err := strconv.ParseUint(s, syncBinKeyBase, 8)
+	if err != nil {
+		return 0, err
+	}
+	return uint8(bin), nil
+}
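The batch accumulation inside SetNextBatch is easy to lose in the retry loop, so here is a minimal, self-contained sketch of just that step. The in-memory index and the iterate helper are hypothetical stand-ins for storage.DBAPI.Iterator and are not part of the diff; only the callback mirrors the code above.

// Simplified sketch of the batch accumulation in SetNextBatch (illustration only).
// "iterate" is a hypothetical stand-in for s.db.Iterator: it walks (address, index)
// pairs of one proximity bin within [from, to].
package main

import "fmt"

const BatchSize = 128 // same constant as in syncer.go

type entry struct {
	addr []byte
	idx  uint64
}

func iterate(index []entry, from, to uint64, f func(addr []byte, idx uint64) bool) {
	for _, e := range index {
		if e.idx < from || e.idx > to {
			continue
		}
		if !f(e.addr, e.idx) {
			return
		}
	}
}

// nextBatch collects at most BatchSize addresses into one byte slice and records
// the index of the last collected entry, so a later call can resume from there.
func nextBatch(index []entry, from, to uint64) (batch []byte, last uint64, n int) {
	iterate(index, from, to, func(addr []byte, idx uint64) bool {
		batch = append(batch, addr...) // hashes are concatenated into a single byte slice
		n++
		last = idx
		return n < BatchSize // returning false stops the iteration once the batch is full
	})
	return batch, last, n
}

func main() {
	var index []entry
	for i := uint64(1); i <= 200; i++ {
		index = append(index, entry{addr: []byte{byte(i)}, idx: i})
	}
	batch, last, n := nextBatch(index, 0, 1<<62)
	fmt.Println(n, last, len(batch)) // 128 128 128: a full batch ending at index 128
}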
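The SYNC stream key encoding at the bottom of the file is a plain base-36 round trip. The standalone example below (the main function is only an illustration, not part of the diff) exercises the same strconv calls used by FormatSyncBinKey and ParseSyncBinKey.

// Standalone illustration of the base-36 sync bin key round trip (illustration only).
package main

import (
	"fmt"
	"strconv"
)

const syncBinKeyBase = 36 // same base as in syncer.go; must satisfy 2 <= base <= 36

func main() {
	bin := uint8(15)
	// FormatSyncBinKey: bin 15 encodes as "f" in base 36.
	key := strconv.FormatUint(uint64(bin), syncBinKeyBase)
	// ParseSyncBinKey: the bitSize of 8 rejects values that do not fit into a uint8.
	parsed, err := strconv.ParseUint(key, syncBinKeyBase, 8)
	if err != nil {
		panic(err)
	}
	fmt.Printf("bin %d -> key %q -> bin %d\n", bin, key, parsed) // bin 15 -> key "f" -> bin 15
}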