diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/netstore.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r-- | swarm/storage/netstore.go | 229 |
1 files changed, 137 insertions, 92 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 0552b84ef..6a205cfa4 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -17,120 +17,165 @@ package storage import ( - "fmt" - "path/filepath" "time" - "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/log" ) -/* -NetStore is a cloud storage access abstaction layer for swarm -it contains the shared logic of network served chunk store/retrieval requests -both local (coming from DPA api) and remote (coming from peers via bzz protocol) -it implements the ChunkStore interface and embeds LocalStore - -It is called by the bzz protocol instances via Depo (the store/retrieve request handler) -a protocol instance is running on each peer, so this is heavily parallelised. -NetStore falls back to a backend (CloudStorage interface) -implemented by bzz/network/forwarder. forwarder or IPFS or IPΞS -*/ +var ( + // NetStore.Get timeout for get and get retries + // This is the maximum period that the Get will block. + // If it is reached, Get will return ErrChunkNotFound. + netStoreRetryTimeout = 30 * time.Second + // Minimal period between calling get method on NetStore + // on retry. It protects calling get very frequently if + // it returns ErrChunkNotFound very fast. + netStoreMinRetryDelay = 3 * time.Second + // Timeout interval before retrieval is timed out. + // It is used in NetStore.get on waiting for ReqC to be + // closed on a single retrieve request. + searchTimeout = 10 * time.Second +) + +// NetStore implements the ChunkStore interface, +// this chunk access layer assumed 2 chunk stores +// local storage eg. LocalStore and network storage eg., NetStore +// access by calling network is blocking with a timeout type NetStore struct { - hashfunc SwarmHasher localStore *LocalStore - cloud CloudStore + retrieve func(chunk *Chunk) error } -// backend engine for cloud store -// It can be aggregate dispatching to several parallel implementations: -// bzz/network/forwarder. forwarder or IPFS or IPΞS -type CloudStore interface { - Store(*Chunk) - Deliver(*Chunk) - Retrieve(*Chunk) +func NewNetStore(localStore *LocalStore, retrieve func(chunk *Chunk) error) *NetStore { + return &NetStore{localStore, retrieve} } -type StoreParams struct { - ChunkDbPath string - DbCapacity uint64 - CacheCapacity uint - Radius int -} +// Get is the entrypoint for local retrieve requests +// waits for response or times out +// +// Get uses get method to retrieve request, but retries if the +// ErrChunkNotFound is returned by get, until the netStoreRetryTimeout +// is reached. +func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { + timer := time.NewTimer(netStoreRetryTimeout) + defer timer.Stop() -//create params with default values -func NewDefaultStoreParams() (self *StoreParams) { - return &StoreParams{ - DbCapacity: defaultDbCapacity, - CacheCapacity: defaultCacheCapacity, - Radius: defaultRadius, + // result and resultC provide results from the goroutine + // where NetStore.get is called. + type result struct { + chunk *Chunk + err error } -} + resultC := make(chan result) -//this can only finally be set after all config options (file, cmd line, env vars) -//have been evaluated -func (self *StoreParams) Init(path string) { - self.ChunkDbPath = filepath.Join(path, "chunks") -} + // quitC ensures that retring goroutine is terminated + // when this function returns. + quitC := make(chan struct{}) + defer close(quitC) -// netstore contructor, takes path argument that is used to initialise dbStore, -// the persistent (disk) storage component of LocalStore -// the second argument is the hive, the connection/logistics manager for the node -func NewNetStore(hash SwarmHasher, lstore *LocalStore, cloud CloudStore, params *StoreParams) *NetStore { - return &NetStore{ - hashfunc: hash, - localStore: lstore, - cloud: cloud, + // do retries in a goroutine so that the timer can + // force this method to return after the netStoreRetryTimeout. + go func() { + // limiter ensures that NetStore.get is not called more frequently + // then netStoreMinRetryDelay. If NetStore.get takes longer + // then netStoreMinRetryDelay, the next retry call will be + // without a delay. + limiter := time.NewTimer(netStoreMinRetryDelay) + defer limiter.Stop() + + for { + chunk, err := ns.get(addr, 0) + if err != ErrChunkNotFound { + // break retry only if the error is nil + // or other error then ErrChunkNotFound + select { + case <-quitC: + // Maybe NetStore.Get function has returned + // by the timer.C while we were waiting for the + // results. Terminate this goroutine. + case resultC <- result{chunk: chunk, err: err}: + // Send the result to the parrent goroutine. + } + return + + } + select { + case <-quitC: + // NetStore.Get function has returned, possibly + // by the timer.C, which makes this goroutine + // not needed. + return + case <-limiter.C: + } + // Reset the limiter for the next iteration. + limiter.Reset(netStoreMinRetryDelay) + log.Debug("NetStore.Get retry chunk", "key", addr) + } + }() + + select { + case r := <-resultC: + return r.chunk, r.err + case <-timer.C: + return nil, ErrChunkNotFound } } -var ( - // timeout interval before retrieval is timed out - searchTimeout = 3 * time.Second -) +// GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter +func (ns *NetStore) GetWithTimeout(addr Address, timeout time.Duration) (chunk *Chunk, err error) { + return ns.get(addr, timeout) +} -// store logic common to local and network chunk store requests -// ~ unsafe put in localdb no check if exists no extra copy no hash validation -// the chunk is forced to propagate (Cloud.Store) even if locally found! -// caller needs to make sure if that is wanted -func (self *NetStore) Put(entry *Chunk) { - self.localStore.Put(entry) - - // handle deliveries - if entry.Req != nil { - log.Trace(fmt.Sprintf("NetStore.Put: localStore.Put %v hit existing request...delivering", entry.Key.Log())) - // closing C signals to other routines (local requests) - // that the chunk is has been retrieved - close(entry.Req.C) - // deliver the chunk to requesters upstream - go self.cloud.Deliver(entry) - } else { - log.Trace(fmt.Sprintf("NetStore.Put: localStore.Put %v stored locally", entry.Key.Log())) - // handle propagating store requests - // go self.cloud.Store(entry) - go self.cloud.Store(entry) +func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err error) { + if timeout == 0 { + timeout = searchTimeout } -} + if ns.retrieve == nil { + chunk, err = ns.localStore.Get(addr) + if err == nil { + return chunk, nil + } + if err != ErrFetching { + return nil, err + } + } else { + var created bool + chunk, created = ns.localStore.GetOrCreateRequest(addr) + + if chunk.ReqC == nil { + return chunk, nil + } -// retrieve logic common for local and network chunk retrieval requests -func (self *NetStore) Get(key Key) (*Chunk, error) { - var err error - chunk, err := self.localStore.Get(key) - if err == nil { - if chunk.Req == nil { - log.Trace(fmt.Sprintf("NetStore.Get: %v found locally", key)) - } else { - log.Trace(fmt.Sprintf("NetStore.Get: %v hit on an existing request", key)) - // no need to launch again + if created { + err := ns.retrieve(chunk) + if err != nil { + // mark chunk request as failed so that we can retry it later + chunk.SetErrored(ErrChunkUnavailable) + return nil, err + } } - return chunk, err } - // no data and no request status - log.Trace(fmt.Sprintf("NetStore.Get: %v not found locally. open new request", key)) - chunk = NewChunk(key, newRequestStatus(key)) - self.localStore.memStore.Put(chunk) - go self.cloud.Retrieve(chunk) + + t := time.NewTicker(timeout) + defer t.Stop() + + select { + case <-t.C: + // mark chunk request as failed so that we can retry + chunk.SetErrored(ErrChunkNotFound) + return nil, ErrChunkNotFound + case <-chunk.ReqC: + } + chunk.SetErrored(nil) return chunk, nil } -// Close netstore -func (self *NetStore) Close() {} +// Put is the entrypoint for local store requests coming from storeLoop +func (ns *NetStore) Put(chunk *Chunk) { + ns.localStore.Put(chunk) +} + +// Close chunk store +func (ns *NetStore) Close() { + ns.localStore.Close() +} |