diff options
Diffstat (limited to 'swarm/network/depo.go')
-rw-r--r-- | swarm/network/depo.go | 232 |
1 files changed, 0 insertions, 232 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go deleted file mode 100644 index 5ffbf8be1..000000000 --- a/swarm/network/depo.go +++ /dev/null @@ -1,232 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package network - -import ( - "bytes" - "encoding/binary" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -//metrics variables -var ( - syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil) - syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil) - syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil) - syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil) - syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil) -) - -// Handler for storage/retrieval related protocol requests -// implements the StorageHandler interface used by the bzz protocol -type Depo struct { - hashfunc storage.SwarmHasher - localStore storage.ChunkStore - netStore storage.ChunkStore -} - -func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo { - return &Depo{ - hashfunc: hash, - localStore: localStore, - netStore: remoteStore, // entrypoint internal - } -} - -// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes upto sync state -// * the remote sync state is just stored and handled in protocol -// * filters through the new syncRequests and send the ones missing -// * back immediately as a deliveryRequest message -// * empty message just pings back for more (is this needed?) -// * strict signed sync states may be needed. -func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error { - unsynced := req.Unsynced - var missing []*syncRequest - var chunk *storage.Chunk - var err error - for _, req := range unsynced { - // skip keys that are found, - chunk, err = self.localStore.Get(req.Key[:]) - if err != nil || chunk.SData == nil { - missing = append(missing, req) - } - } - log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State)) - log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced)) - // send delivery request with missing keys - err = p.deliveryRequest(missing) - if err != nil { - return err - } - // set peers state to persist - p.syncState = req.State - return nil -} - -// Handles deliveryRequestMsg -// * serves actual chunks asked by the remote peer -// by pushing to the delivery queue (sync db) of the correct priority -// (remote peer is free to reprioritize) -// * the message implies remote peer wants more, so trigger for -// * new outgoing unsynced keys message is fired -func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error { - deliver := req.Deliver - // queue the actual delivery of a chunk () - log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver)) - for _, sreq := range deliver { - // TODO: look up in cache here or in deliveries - // priorities are taken from the message so the remote party can - // reprioritise to at their leisure - // r = self.pullCached(sreq.Key) // pulls and deletes from cache - Push(p, sreq.Key, sreq.Priority) - } - - // sends it out as unsyncedKeysMsg - p.syncer.sendUnsyncedKeys() - return nil -} - -// the entrypoint for store requests coming from the bzz wire protocol -// if key found locally, return. otherwise -// remote is untrusted, so hash is verified and chunk passed on to NetStore -func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { - var islocal bool - req.from = p - chunk, err := self.localStore.Get(req.Key) - switch { - case err != nil: - log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key)) - // not found in memory cache, ie., a genuine store request - // create chunk - syncReceiveCount.Inc(1) - chunk = storage.NewChunk(req.Key, nil) - - case chunk.SData == nil: - // found chunk in memory store, needs the data, validate now - log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req)) - - default: - // data is found, store request ignored - // this should update access count? - syncReceiveIgnore.Inc(1) - log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req)) - islocal = true - //return - } - - hasher := self.hashfunc() - hasher.Write(req.SData) - if !bytes.Equal(hasher.Sum(nil), req.Key) { - // data does not validate, ignore - // TODO: peer should be penalised/dropped? - log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)) - return - } - - if islocal { - return - } - // update chunk with size and data - chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data) - chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8])) - log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p)) - chunk.Source = p - self.netStore.Put(chunk) -} - -// entrypoint for retrieve requests coming from the bzz wire protocol -// checks swap balance - return if peer has no credit -func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) { - req.from = p - // swap - record credit for 1 request - // note that only charge actual reqsearches - var err error - if p.swap != nil { - err = p.swap.Add(1) - } - if err != nil { - log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err)) - return - } - - // call storage.NetStore#Get which - // blocks until local retrieval finished - // launches cloud retrieval - chunk, _ := self.netStore.Get(req.Key) - req = self.strategyUpdateRequest(chunk.Req, req) - // check if we can immediately deliver - if chunk.SData != nil { - log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log())) - - if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size { - sreq := &storeRequestMsgData{ - Id: req.Id, - Key: chunk.Key, - SData: chunk.SData, - requestTimeout: req.timeout, // - } - syncSendCount.Inc(1) - p.syncer.addRequest(sreq, DeliverReq) - } else { - syncSendRefused.Inc(1) - log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log())) - } - } else { - syncSendNotFound.Inc(1) - log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log())) - } -} - -// add peer request the chunk and decides the timeout for the response if still searching -func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) { - log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log())) - // we do not create an alternative one - req = origReq - if rs != nil { - self.addRequester(rs, req) - req.setTimeout(self.searchTimeout(rs, req)) - } - return -} - -// decides the timeout promise sent with the immediate peers response to a retrieve request -// if timeout is explicitly set and expired -func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) { - reqt := req.getTimeout() - t := time.Now().Add(searchTimeout) - if reqt != nil && reqt.Before(t) { - return reqt - } else { - return &t - } -} - -/* -adds a new peer to an existing open request -only add if less than requesterCount peers forwarded the same request id so far -note this is done irrespective of status (searching or found) -*/ -func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) { - log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.Id)) - list := rs.Requesters[req.Id] - rs.Requesters[req.Id] = append(list, req) -} |