aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/depo.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/depo.go')
-rw-r--r--swarm/network/depo.go232
1 files changed, 0 insertions, 232 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go
deleted file mode 100644
index 5ffbf8be1..000000000
--- a/swarm/network/depo.go
+++ /dev/null
@@ -1,232 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package network
-
-import (
- "bytes"
- "encoding/binary"
- "fmt"
- "time"
-
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/swarm/storage"
-)
-
-//metrics variables
-var (
- syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil)
- syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil)
- syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil)
- syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil)
- syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil)
-)
-
-// Handler for storage/retrieval related protocol requests
-// implements the StorageHandler interface used by the bzz protocol
-type Depo struct {
- hashfunc storage.SwarmHasher
- localStore storage.ChunkStore
- netStore storage.ChunkStore
-}
-
-func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
- return &Depo{
- hashfunc: hash,
- localStore: localStore,
- netStore: remoteStore, // entrypoint internal
- }
-}
-
-// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes upto sync state
-// * the remote sync state is just stored and handled in protocol
-// * filters through the new syncRequests and send the ones missing
-// * back immediately as a deliveryRequest message
-// * empty message just pings back for more (is this needed?)
-// * strict signed sync states may be needed.
-func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error {
- unsynced := req.Unsynced
- var missing []*syncRequest
- var chunk *storage.Chunk
- var err error
- for _, req := range unsynced {
- // skip keys that are found,
- chunk, err = self.localStore.Get(req.Key[:])
- if err != nil || chunk.SData == nil {
- missing = append(missing, req)
- }
- }
- log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State))
- log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced))
- // send delivery request with missing keys
- err = p.deliveryRequest(missing)
- if err != nil {
- return err
- }
- // set peers state to persist
- p.syncState = req.State
- return nil
-}
-
-// Handles deliveryRequestMsg
-// * serves actual chunks asked by the remote peer
-// by pushing to the delivery queue (sync db) of the correct priority
-// (remote peer is free to reprioritize)
-// * the message implies remote peer wants more, so trigger for
-// * new outgoing unsynced keys message is fired
-func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
- deliver := req.Deliver
- // queue the actual delivery of a chunk ()
- log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver))
- for _, sreq := range deliver {
- // TODO: look up in cache here or in deliveries
- // priorities are taken from the message so the remote party can
- // reprioritise to at their leisure
- // r = self.pullCached(sreq.Key) // pulls and deletes from cache
- Push(p, sreq.Key, sreq.Priority)
- }
-
- // sends it out as unsyncedKeysMsg
- p.syncer.sendUnsyncedKeys()
- return nil
-}
-
-// the entrypoint for store requests coming from the bzz wire protocol
-// if key found locally, return. otherwise
-// remote is untrusted, so hash is verified and chunk passed on to NetStore
-func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
- var islocal bool
- req.from = p
- chunk, err := self.localStore.Get(req.Key)
- switch {
- case err != nil:
- log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
- // not found in memory cache, ie., a genuine store request
- // create chunk
- syncReceiveCount.Inc(1)
- chunk = storage.NewChunk(req.Key, nil)
-
- case chunk.SData == nil:
- // found chunk in memory store, needs the data, validate now
- log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req))
-
- default:
- // data is found, store request ignored
- // this should update access count?
- syncReceiveIgnore.Inc(1)
- log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
- islocal = true
- //return
- }
-
- hasher := self.hashfunc()
- hasher.Write(req.SData)
- if !bytes.Equal(hasher.Sum(nil), req.Key) {
- // data does not validate, ignore
- // TODO: peer should be penalised/dropped?
- log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req))
- return
- }
-
- if islocal {
- return
- }
- // update chunk with size and data
- chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
- chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
- log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p))
- chunk.Source = p
- self.netStore.Put(chunk)
-}
-
-// entrypoint for retrieve requests coming from the bzz wire protocol
-// checks swap balance - return if peer has no credit
-func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) {
- req.from = p
- // swap - record credit for 1 request
- // note that only charge actual reqsearches
- var err error
- if p.swap != nil {
- err = p.swap.Add(1)
- }
- if err != nil {
- log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err))
- return
- }
-
- // call storage.NetStore#Get which
- // blocks until local retrieval finished
- // launches cloud retrieval
- chunk, _ := self.netStore.Get(req.Key)
- req = self.strategyUpdateRequest(chunk.Req, req)
- // check if we can immediately deliver
- if chunk.SData != nil {
- log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()))
-
- if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
- sreq := &storeRequestMsgData{
- Id: req.Id,
- Key: chunk.Key,
- SData: chunk.SData,
- requestTimeout: req.timeout, //
- }
- syncSendCount.Inc(1)
- p.syncer.addRequest(sreq, DeliverReq)
- } else {
- syncSendRefused.Inc(1)
- log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
- }
- } else {
- syncSendNotFound.Inc(1)
- log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
- }
-}
-
-// add peer request the chunk and decides the timeout for the response if still searching
-func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
- log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()))
- // we do not create an alternative one
- req = origReq
- if rs != nil {
- self.addRequester(rs, req)
- req.setTimeout(self.searchTimeout(rs, req))
- }
- return
-}
-
-// decides the timeout promise sent with the immediate peers response to a retrieve request
-// if timeout is explicitly set and expired
-func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) {
- reqt := req.getTimeout()
- t := time.Now().Add(searchTimeout)
- if reqt != nil && reqt.Before(t) {
- return reqt
- } else {
- return &t
- }
-}
-
-/*
-adds a new peer to an existing open request
-only add if less than requesterCount peers forwarded the same request id so far
-note this is done irrespective of status (searching or found)
-*/
-func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
- log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.Id))
- list := rs.Requesters[req.Id]
- rs.Requesters[req.Id] = append(list, req)
-}