// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package stream import ( "context" "strconv" "time" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" ) const ( BatchSize = 128 ) // SwarmSyncerServer implements an Server for history syncing on bins // offered streams: // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { po uint8 store storage.SyncChunkStore quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ po: po, store: syncChunkStore, quit: make(chan struct{}), }, nil } func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } return NewSwarmSyncerServer(po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) // }) } // Close needs to be called on a stream server func (s *SwarmSyncerServer) Close() { close(s.quit) } // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { chunk, err := s.store.Get(ctx, storage.Address(key)) if err != nil { return nil, err } return chunk.Data(), nil } // SessionIndex returns current storage bin (po) index. func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { return s.store.BinIndex(s.po), nil } // GetBatch retrieves the next batch of hashes from the dbstore func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 var ticker *time.Ticker defer func() { if ticker != nil { ticker.Stop() } }() var wait bool for { if wait { if ticker == nil { ticker = time.NewTicker(1000 * time.Millisecond) } select { case <-ticker.C: case <-s.quit: return nil, 0, 0, nil, nil } } metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { batch = append(batch, key[:]...) i++ to = idx return i < BatchSize }) if err != nil { return nil, 0, 0, nil, err } if len(batch) > 0 { break } wait = true } log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) return batch, from, to, nil, nil } // SwarmSyncerClient type SwarmSyncerClient struct { sessionAt uint64 nextC chan struct{} sessionRoot storage.Address sessionReader storage.LazySectionReader retrieveC chan *storage.Chunk storeC chan *storage.Chunk store storage.SyncChunkStore // chunker storage.Chunker currentRoot storage.Address requestFunc func(chunk *storage.Chunk) end, start uint64 peer *Peer stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ store: store, peer: p, stream: stream, }, nil } // // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer // func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { // retrieveC := make(storage.Chunk, chunksCap) // RunChunkRequestor(p, retrieveC) // storeC := make(storage.Chunk, chunksCap) // RunChunkStorer(store, storeC) // s := &SwarmSyncerClient{ // po: po, // priority: priority, // sessionAt: sessionAt, // start: index, // end: index, // nextC: make(chan struct{}, 1), // intervals: intervals, // sessionRoot: sessionRoot, // sessionReader: chunker.Join(sessionRoot, retrieveC), // retrieveC: retrieveC, // storeC: storeC, // } // return s // } // // StartSyncing is called on the Peer to start the syncing process // // the idea is that it is called only after kademlia is close to healthy // func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) { // lastPO := po // if nn { // lastPO = maxPO // } // // for i := po; i <= lastPO; i++ { // s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true) // s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false) // } // } // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) }) } // NeedData func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { return s.store.FetchFunc(ctx, key) } // BatchDone func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) { // TODO: reenable this with putter/getter refactored code // if s.chunker != nil { // return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) } // } return nil } func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error) { // for provable syncer currentRoot is non-zero length // TODO: reenable this with putter/getter // if s.chunker != nil { // if from > s.sessionAt { // for live syncing currentRoot is always updated // //expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC) // expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC) // if err != nil { // return nil, err // } // if !bytes.Equal(root, expRoot) { // return nil, fmt.Errorf("HandoverProof mismatch") // } // s.currentRoot = root // } else { // expHashes := make([]byte, len(hashes)) // _, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize)) // if err != nil && err != io.EOF { // return nil, err // } // if !bytes.Equal(expHashes, hashes) { // return nil, errors.New("invalid proof") // } // } // return nil, nil // } s.end += uint64(len(hashes)) / HashSize takeover := &Takeover{ Stream: stream, Start: s.start, End: s.end, Root: root, } // serialise and sign return &TakeoverProof{ Takeover: takeover, Sig: nil, }, nil } func (s *SwarmSyncerClient) Close() {} // base for parsing and formating sync bin key // it must be 2 <= base <= 36 const syncBinKeyBase = 36 // FormatSyncBinKey returns a string representation of // Kademlia bin number to be used as key for SYNC stream. func FormatSyncBinKey(bin uint8) string { return strconv.FormatUint(uint64(bin), syncBinKeyBase) } // ParseSyncBinKey parses the string representation // and returns the Kademlia bin number. func ParseSyncBinKey(s string) (uint8, error) { bin, err := strconv.ParseUint(s, syncBinKeyBase, 8) if err != nil { return 0, err } return uint8(bin), nil }