// Source path: root/swarm/network/stream/syncer.go
// Blob: 4bfbac8b0dbab621510f098373c643ec409b727f


















                                                                                  
                 








                                                       







                                                                     


                                    

 

                                                                                                        
                                  


                                           


              
                                                                                           
                                                                                             



                                             
                                                               










                                                                              
                                                   
                                                                                      

                                                            

                               
                                

 




                                                            



                                                                                                           
 



















                                                                                        

                                                                                                    












                                                  
                                                                                                                                          










                                               
                                            
                                                




                                              


                                                                         
                                                                                                             
                                  


                               



                                                                                      
                                                                                                                                                                                                                                     





















                                                                           
                                                                       












                                                                                          
                                                                                  
                                                                                                
                                                                                 



           

                                                                                                          







































































                                                                                                                                    
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package stream

import (
    "context"
    "strconv"
    "time"

    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/swarm/log"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

const (
    // BatchSize is the maximum number of chunk keys that SetNextBatch
    // gathers into a single batch before it stops iterating the store.
    BatchSize = 128
)

// SwarmSyncerServer implements a Server for history syncing on bins.
// Offered streams:
// * live request delivery with or without checkback
// * (live/non-live historical) chunk syncing per proximity bin
//
// po is the bin this server serves, store is the chunk store that is queried
// for keys, indexes and chunk data, and quit is the channel closed by Close
// to abort a pending wait in SetNextBatch.
type SwarmSyncerServer struct {
    po    uint8
    store storage.SyncChunkStore
    quit  chan struct{}
}

// NewSwarmSyncerServer creates a SwarmSyncerServer that serves bin po from
// syncChunkStore. A fresh quit channel is allocated for every server.
// The returned error is always nil.
func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
    server := &SwarmSyncerServer{
        quit:  make(chan struct{}),
        store: syncChunkStore,
        po:    po,
    }
    return server, nil
}

// RegisterSwarmSyncerServer registers, under the "SYNC" stream name, the
// function that builds a SwarmSyncerServer backed by syncChunkStore.
// The stream key passed to that function is parsed with ParseSyncBinKey to
// obtain the bin; a key that fails to parse yields the parse error and no
// server.
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
    newServer := func(_ *Peer, t string, _ bool) (Server, error) {
        bin, parseErr := ParseSyncBinKey(t)
        if parseErr != nil {
            return nil, parseErr
        }
        return NewSwarmSyncerServer(bin, syncChunkStore)
    }
    streamer.RegisterServerFunc("SYNC", newServer)
    // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
    //  return NewOutgoingProvableSwarmSyncer(po, db)
    // })
}

// Close needs to be called on a stream server. It closes the quit channel,
// which makes a SetNextBatch call that is waiting for new chunks return.
// NOTE(review): a second call would close an already closed channel and
// panic — presumably callers invoke Close exactly once; confirm.
func (s *SwarmSyncerServer) Close() {
    close(s.quit)
}

// GetData looks up the chunk stored under key in the server's store and
// returns its data. An error from the store lookup is returned unchanged,
// together with a nil slice.
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
    addr := storage.Address(key)
    found, lookupErr := s.store.Get(ctx, addr)
    if lookupErr != nil {
        return nil, lookupErr
    }
    data := found.Data()
    return data, nil
}

// SessionIndex reports the store's current index for the bin (po) served by
// this server. The returned error is always nil.
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
    current := s.store.BinIndex(s.po)
    return current, nil
}

// SetNextBatch collects the next batch of chunk keys for this server's bin
// by iterating the store over the range from..to.
//
// At most BatchSize keys are concatenated into the returned byte slice. The
// returned upper bound is the index of the last key that was collected, the
// returned lower bound is from unchanged, and the HandoverProof is always nil.
//
// If an iteration yields no keys, the call waits one second and iterates
// again, repeating until at least one key is found. When the server is
// closed during such a wait, it returns all zero values and a nil error.
// An error reported by the store iterator is returned as is.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
    var (
        hashes []byte
        count  int
        retry  *time.Ticker
    )
    // The ticker is only created once a retry turns out to be necessary,
    // so it may still be nil when the function returns.
    defer func() {
        if retry != nil {
            retry.Stop()
        }
    }()

    // collect appends every visited key to the batch, moves the upper bound
    // to the index of that key, and asks the iterator to stop as soon as
    // BatchSize keys have been gathered.
    collect := func(key storage.Address, idx uint64) bool {
        hashes = append(hashes, key[:]...)
        count++
        to = idx
        return count < BatchSize
    }

    for {
        metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
        if err := s.store.Iterator(from, to, s.po, collect); err != nil {
            return nil, 0, 0, nil, err
        }
        if len(hashes) > 0 {
            break
        }

        // Nothing was found in the range: wait for the next tick before
        // iterating again, or give up if the server has been closed.
        if retry == nil {
            retry = time.NewTicker(1000 * time.Millisecond)
        }
        select {
        case <-retry.C:
        case <-s.quit:
            return nil, 0, 0, nil, nil
        }
    }

    log.Trace("Swarm syncer offer batch", "po", s.po, "len", count, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
    return hashes, from, to, nil, nil
}

// SwarmSyncerClient is the client side of the "SYNC" stream; it is the type
// created by the constructor function that RegisterSwarmSyncerClient
// registers.
//
// Of its fields, the live code in this file only uses store (consulted by
// NeedData), start and end (reported and advanced by TakeoverProof), and
// peer and stream (set by NewSwarmSyncerClient). The remaining fields are
// not used by any live code in this file.
// NOTE(review): they look like leftovers of the disabled provable syncer —
// confirm against the rest of the package before removing them.
type SwarmSyncerClient struct {
    sessionAt     uint64
    nextC         chan struct{}
    sessionRoot   storage.Address
    sessionReader storage.LazySectionReader
    retrieveC     chan *storage.Chunk
    storeC        chan *storage.Chunk
    store         storage.SyncChunkStore
    // chunker               storage.Chunker
    currentRoot storage.Address
    requestFunc func(chunk *storage.Chunk)
    end, start  uint64
    peer        *Peer
    stream      Stream
}

// NewSwarmSyncerClient creates a SwarmSyncerClient for peer p that works on
// the given store and stream. All other fields are left at their zero
// values. The returned error is always nil.
func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
    client := &SwarmSyncerClient{
        peer:   p,
        stream: stream,
        store:  store,
    }
    return client, nil
}

// // NewIncomingProvableSwarmSyncer is a constructor for provable data exchange syncer
// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
//  retrieveC := make(storage.Chunk, chunksCap)
//  RunChunkRequestor(p, retrieveC)
//  storeC := make(storage.Chunk, chunksCap)
//  RunChunkStorer(store, storeC)
//  s := &SwarmSyncerClient{
//      po:            po,
//      priority:      priority,
//      sessionAt:     sessionAt,
//      start:         index,
//      end:           index,
//      nextC:         make(chan struct{}, 1),
//      intervals:     intervals,
//      sessionRoot:   sessionRoot,
//      sessionReader: chunker.Join(sessionRoot, retrieveC),
//      retrieveC:     retrieveC,
//      storeC:        storeC,
//  }
//  return s
// }

// // StartSyncing is called on the Peer to start the syncing process
// // the idea is that it is called only after kademlia is close to healthy
// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
//  lastPO := po
//  if nn {
//      lastPO = maxPO
//  }
//
//  for i := po; i <= lastPO; i++ {
//      s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
//      s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
//  }
// }

// RegisterSwarmSyncerClient registers, under the "SYNC" stream name, the
// constructor function that creates a SwarmSyncerClient to handle incoming
// sync streams. Every client it creates shares the given store and gets a
// "SYNC" stream built from the key and live flag it is called with.
func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
    newClient := func(p *Peer, t string, live bool) (Client, error) {
        syncStream := NewStream("SYNC", t, live)
        return NewSwarmSyncerClient(p, store, syncStream)
    }
    streamer.RegisterClientFunc("SYNC", newClient)
}

// NeedData hands key to the store's FetchFunc and returns the wait function
// that call produces, without inspecting it.
// NOTE(review): presumably the result is nil when the chunk is already
// available locally — confirm against storage.SyncChunkStore.FetchFunc.
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
    wait = s.store.FetchFunc(ctx, key)
    return wait
}

// BatchDone currently always returns nil, i.e. no takeover proof function is
// produced for a finished batch. The stream, from, hashes and root arguments
// are ignored while the provable-syncing code below stays disabled.
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
    // TODO: reenable this with putter/getter refactored code
    // if s.chunker != nil {
    //  return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
    // }
    return nil
}

// TakeoverProof advances the client's end index by the number of hashes
// contained in hashes (its length divided by HashSize) and returns a
// TakeoverProof for stream whose Start and End are the client's start and
// updated end indexes and whose Root is root.
//
// The proof is not signed (Sig is nil), the from argument is unused while
// the verification code below stays disabled, and the returned error is
// always nil.
func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error) {
    // for provable syncer currentRoot is non-zero length
    // TODO: reenable this with putter/getter
    // if s.chunker != nil {
    //  if from > s.sessionAt { // for live syncing currentRoot is always updated
    //      //expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC)
    //      expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC)
    //      if err != nil {
    //          return nil, err
    //      }
    //      if !bytes.Equal(root, expRoot) {
    //          return nil, fmt.Errorf("HandoverProof mismatch")
    //      }
    //      s.currentRoot = root
    //  } else {
    //      expHashes := make([]byte, len(hashes))
    //      _, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize))
    //      if err != nil && err != io.EOF {
    //          return nil, err
    //      }
    //      if !bytes.Equal(expHashes, hashes) {
    //          return nil, errors.New("invalid proof")
    //      }
    //  }
    //  return nil, nil
    // }

    // Account for the hashes of this batch before reporting the range.
    received := uint64(len(hashes)) / HashSize
    s.end += received

    // serialise and sign
    proof := &TakeoverProof{
        Takeover: &Takeover{
            Stream: stream,
            Start:  s.start,
            End:    s.end,
            Root:   root,
        },
        Sig: nil,
    }
    return proof, nil
}

// Close is a no-op: its body is empty, so the client releases nothing here.
func (s *SwarmSyncerClient) Close() {}

// syncBinKeyBase is the numeric base used by FormatSyncBinKey and
// ParseSyncBinKey when formatting and parsing sync bin keys.
// It must satisfy 2 <= base <= 36.
const syncBinKeyBase = 36

// FormatSyncBinKey encodes a Kademlia bin number as the string that is used
// as the key of a SYNC stream. The number is written in base syncBinKeyBase.
func FormatSyncBinKey(bin uint8) string {
    widened := uint64(bin)
    return strconv.FormatUint(widened, syncBinKeyBase)
}

// ParseSyncBinKey decodes a SYNC stream key back into the Kademlia bin
// number it represents. The key is read in base syncBinKeyBase and has to
// fit into 8 bits; otherwise the error from strconv.ParseUint is returned
// together with a zero bin.
func ParseSyncBinKey(s string) (uint8, error) {
    // Limiting the parse to 8 bits makes the conversion to uint8 lossless.
    const bitSize = 8
    value, parseErr := strconv.ParseUint(s, syncBinKeyBase, bitSize)
    if parseErr != nil {
        return 0, parseErr
    }
    return uint8(value), nil
}