aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/netstore.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/netstore.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloaddexon-e187711c6545487d4cac3701f0f506bb536234e2.tar
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r--swarm/storage/netstore.go229
1 files changed, 137 insertions, 92 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index 0552b84ef..6a205cfa4 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -17,120 +17,165 @@
package storage
import (
- "fmt"
- "path/filepath"
"time"
- "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/log"
)
-/*
-NetStore is a cloud storage access abstaction layer for swarm
-it contains the shared logic of network served chunk store/retrieval requests
-both local (coming from DPA api) and remote (coming from peers via bzz protocol)
-it implements the ChunkStore interface and embeds LocalStore
-
-It is called by the bzz protocol instances via Depo (the store/retrieve request handler)
-a protocol instance is running on each peer, so this is heavily parallelised.
-NetStore falls back to a backend (CloudStorage interface)
-implemented by bzz/network/forwarder. forwarder or IPFS or IPΞS
-*/
+var (
+ // NetStore.Get timeout for get and get retries
+ // This is the maximum period that the Get will block.
+ // If it is reached, Get will return ErrChunkNotFound.
+ netStoreRetryTimeout = 30 * time.Second
+ // Minimal period between calling get method on NetStore
+ // on retry. It protects calling get very frequently if
+ // it returns ErrChunkNotFound very fast.
+ netStoreMinRetryDelay = 3 * time.Second
+ // Timeout interval before retrieval is timed out.
+ // It is used in NetStore.get on waiting for ReqC to be
+ // closed on a single retrieve request.
+ searchTimeout = 10 * time.Second
+)
+
+// NetStore implements the ChunkStore interface,
+// this chunk access layer assumed 2 chunk stores
+// local storage eg. LocalStore and network storage eg., NetStore
+// access by calling network is blocking with a timeout
type NetStore struct {
- hashfunc SwarmHasher
localStore *LocalStore
- cloud CloudStore
+ retrieve func(chunk *Chunk) error
}
-// backend engine for cloud store
-// It can be aggregate dispatching to several parallel implementations:
-// bzz/network/forwarder. forwarder or IPFS or IPΞS
-type CloudStore interface {
- Store(*Chunk)
- Deliver(*Chunk)
- Retrieve(*Chunk)
+func NewNetStore(localStore *LocalStore, retrieve func(chunk *Chunk) error) *NetStore {
+ return &NetStore{localStore, retrieve}
}
-type StoreParams struct {
- ChunkDbPath string
- DbCapacity uint64
- CacheCapacity uint
- Radius int
-}
+// Get is the entrypoint for local retrieve requests
+// waits for response or times out
+//
+// Get uses get method to retrieve request, but retries if the
+// ErrChunkNotFound is returned by get, until the netStoreRetryTimeout
+// is reached.
+func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) {
+ timer := time.NewTimer(netStoreRetryTimeout)
+ defer timer.Stop()
-//create params with default values
-func NewDefaultStoreParams() (self *StoreParams) {
- return &StoreParams{
- DbCapacity: defaultDbCapacity,
- CacheCapacity: defaultCacheCapacity,
- Radius: defaultRadius,
+ // result and resultC provide results from the goroutine
+ // where NetStore.get is called.
+ type result struct {
+ chunk *Chunk
+ err error
}
-}
+ resultC := make(chan result)
-//this can only finally be set after all config options (file, cmd line, env vars)
-//have been evaluated
-func (self *StoreParams) Init(path string) {
- self.ChunkDbPath = filepath.Join(path, "chunks")
-}
+ // quitC ensures that retring goroutine is terminated
+ // when this function returns.
+ quitC := make(chan struct{})
+ defer close(quitC)
-// netstore contructor, takes path argument that is used to initialise dbStore,
-// the persistent (disk) storage component of LocalStore
-// the second argument is the hive, the connection/logistics manager for the node
-func NewNetStore(hash SwarmHasher, lstore *LocalStore, cloud CloudStore, params *StoreParams) *NetStore {
- return &NetStore{
- hashfunc: hash,
- localStore: lstore,
- cloud: cloud,
+ // do retries in a goroutine so that the timer can
+ // force this method to return after the netStoreRetryTimeout.
+ go func() {
+ // limiter ensures that NetStore.get is not called more frequently
+ // then netStoreMinRetryDelay. If NetStore.get takes longer
+ // then netStoreMinRetryDelay, the next retry call will be
+ // without a delay.
+ limiter := time.NewTimer(netStoreMinRetryDelay)
+ defer limiter.Stop()
+
+ for {
+ chunk, err := ns.get(addr, 0)
+ if err != ErrChunkNotFound {
+ // break retry only if the error is nil
+ // or other error then ErrChunkNotFound
+ select {
+ case <-quitC:
+ // Maybe NetStore.Get function has returned
+ // by the timer.C while we were waiting for the
+ // results. Terminate this goroutine.
+ case resultC <- result{chunk: chunk, err: err}:
+ // Send the result to the parrent goroutine.
+ }
+ return
+
+ }
+ select {
+ case <-quitC:
+ // NetStore.Get function has returned, possibly
+ // by the timer.C, which makes this goroutine
+ // not needed.
+ return
+ case <-limiter.C:
+ }
+ // Reset the limiter for the next iteration.
+ limiter.Reset(netStoreMinRetryDelay)
+ log.Debug("NetStore.Get retry chunk", "key", addr)
+ }
+ }()
+
+ select {
+ case r := <-resultC:
+ return r.chunk, r.err
+ case <-timer.C:
+ return nil, ErrChunkNotFound
}
}
-var (
- // timeout interval before retrieval is timed out
- searchTimeout = 3 * time.Second
-)
+// GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter
+func (ns *NetStore) GetWithTimeout(addr Address, timeout time.Duration) (chunk *Chunk, err error) {
+ return ns.get(addr, timeout)
+}
-// store logic common to local and network chunk store requests
-// ~ unsafe put in localdb no check if exists no extra copy no hash validation
-// the chunk is forced to propagate (Cloud.Store) even if locally found!
-// caller needs to make sure if that is wanted
-func (self *NetStore) Put(entry *Chunk) {
- self.localStore.Put(entry)
-
- // handle deliveries
- if entry.Req != nil {
- log.Trace(fmt.Sprintf("NetStore.Put: localStore.Put %v hit existing request...delivering", entry.Key.Log()))
- // closing C signals to other routines (local requests)
- // that the chunk is has been retrieved
- close(entry.Req.C)
- // deliver the chunk to requesters upstream
- go self.cloud.Deliver(entry)
- } else {
- log.Trace(fmt.Sprintf("NetStore.Put: localStore.Put %v stored locally", entry.Key.Log()))
- // handle propagating store requests
- // go self.cloud.Store(entry)
- go self.cloud.Store(entry)
+func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err error) {
+ if timeout == 0 {
+ timeout = searchTimeout
}
-}
+ if ns.retrieve == nil {
+ chunk, err = ns.localStore.Get(addr)
+ if err == nil {
+ return chunk, nil
+ }
+ if err != ErrFetching {
+ return nil, err
+ }
+ } else {
+ var created bool
+ chunk, created = ns.localStore.GetOrCreateRequest(addr)
+
+ if chunk.ReqC == nil {
+ return chunk, nil
+ }
-// retrieve logic common for local and network chunk retrieval requests
-func (self *NetStore) Get(key Key) (*Chunk, error) {
- var err error
- chunk, err := self.localStore.Get(key)
- if err == nil {
- if chunk.Req == nil {
- log.Trace(fmt.Sprintf("NetStore.Get: %v found locally", key))
- } else {
- log.Trace(fmt.Sprintf("NetStore.Get: %v hit on an existing request", key))
- // no need to launch again
+ if created {
+ err := ns.retrieve(chunk)
+ if err != nil {
+ // mark chunk request as failed so that we can retry it later
+ chunk.SetErrored(ErrChunkUnavailable)
+ return nil, err
+ }
}
- return chunk, err
}
- // no data and no request status
- log.Trace(fmt.Sprintf("NetStore.Get: %v not found locally. open new request", key))
- chunk = NewChunk(key, newRequestStatus(key))
- self.localStore.memStore.Put(chunk)
- go self.cloud.Retrieve(chunk)
+
+ t := time.NewTicker(timeout)
+ defer t.Stop()
+
+ select {
+ case <-t.C:
+ // mark chunk request as failed so that we can retry
+ chunk.SetErrored(ErrChunkNotFound)
+ return nil, ErrChunkNotFound
+ case <-chunk.ReqC:
+ }
+ chunk.SetErrored(nil)
return chunk, nil
}
-// Close netstore
-func (self *NetStore) Close() {}
+// Put is the entrypoint for local store requests coming from storeLoop
+func (ns *NetStore) Put(chunk *Chunk) {
+ ns.localStore.Put(chunk)
+}
+
+// Close chunk store
+func (ns *NetStore) Close() {
+ ns.localStore.Close()
+}