diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/dpa.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/dpa.go')
-rw-r--r-- | swarm/storage/dpa.go | 241 |
1 files changed, 0 insertions, 241 deletions
diff --git a/swarm/storage/dpa.go b/swarm/storage/dpa.go deleted file mode 100644 index 44a2669f1..000000000 --- a/swarm/storage/dpa.go +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package storage - -import ( - "errors" - "fmt" - "io" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" -) - -/* -DPA provides the client API entrypoints Store and Retrieve to store and retrieve -It can store anything that has a byte slice representation, so files or serialised objects etc. - -Storage: DPA calls the Chunker to segment the input datastream of any size to a merkle hashed tree of chunks. The key of the root block is returned to the client. - -Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read. - -As the chunker produces chunks, DPA dispatches them to its own chunk store -implementation for storage or retrieval. -*/ - -const ( - storeChanCapacity = 100 - retrieveChanCapacity = 100 - singletonSwarmDbCapacity = 50000 - singletonSwarmCacheCapacity = 500 - maxStoreProcesses = 8 - maxRetrieveProcesses = 8 -) - -var ( - notFound = errors.New("not found") -) - -type DPA struct { - ChunkStore - storeC chan *Chunk - retrieveC chan *Chunk - Chunker Chunker - - lock sync.Mutex - running bool - quitC chan bool -} - -// for testing locally -func NewLocalDPA(datadir string) (*DPA, error) { - - hash := MakeHashFunc("SHA256") - - dbStore, err := NewDbStore(datadir, hash, singletonSwarmDbCapacity, 0) - if err != nil { - return nil, err - } - - return NewDPA(&LocalStore{ - NewMemStore(dbStore, singletonSwarmCacheCapacity), - dbStore, - }, NewChunkerParams()), nil -} - -func NewDPA(store ChunkStore, params *ChunkerParams) *DPA { - chunker := NewTreeChunker(params) - return &DPA{ - Chunker: chunker, - ChunkStore: store, - } -} - -// Public API. Main entry point for document retrieval directly. Used by the -// FS-aware API and httpaccess -// Chunk retrieval blocks on netStore requests with a timeout so reader will -// report error if retrieval of chunks within requested range time out. -func (self *DPA) Retrieve(key Key) LazySectionReader { - return self.Chunker.Join(key, self.retrieveC) -} - -// Public API. Main entry point for document storage directly. Used by the -// FS-aware API and httpaccess -func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) { - return self.Chunker.Split(data, size, self.storeC, swg, wwg) -} - -func (self *DPA) Start() { - self.lock.Lock() - defer self.lock.Unlock() - if self.running { - return - } - self.running = true - self.retrieveC = make(chan *Chunk, retrieveChanCapacity) - self.storeC = make(chan *Chunk, storeChanCapacity) - self.quitC = make(chan bool) - self.storeLoop() - self.retrieveLoop() -} - -func (self *DPA) Stop() { - self.lock.Lock() - defer self.lock.Unlock() - if !self.running { - return - } - self.running = false - close(self.quitC) -} - -// retrieveLoop dispatches the parallel chunk retrieval requests received on the -// retrieve channel to its ChunkStore (NetStore or LocalStore) -func (self *DPA) retrieveLoop() { - for i := 0; i < maxRetrieveProcesses; i++ { - go self.retrieveWorker() - } - log.Trace(fmt.Sprintf("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses)) -} - -func (self *DPA) retrieveWorker() { - for chunk := range self.retrieveC { - log.Trace(fmt.Sprintf("dpa: retrieve loop : chunk %v", chunk.Key.Log())) - storedChunk, err := self.Get(chunk.Key) - if err == notFound { - log.Trace(fmt.Sprintf("chunk %v not found", chunk.Key.Log())) - } else if err != nil { - log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err)) - } else { - chunk.SData = storedChunk.SData - chunk.Size = storedChunk.Size - } - close(chunk.C) - - select { - case <-self.quitC: - return - default: - } - } -} - -// storeLoop dispatches the parallel chunk store request processors -// received on the store channel to its ChunkStore (NetStore or LocalStore) -func (self *DPA) storeLoop() { - for i := 0; i < maxStoreProcesses; i++ { - go self.storeWorker() - } - log.Trace(fmt.Sprintf("dpa: store spawning %v workers", maxStoreProcesses)) -} - -func (self *DPA) storeWorker() { - - for chunk := range self.storeC { - self.Put(chunk) - if chunk.wg != nil { - log.Trace(fmt.Sprintf("dpa: store processor %v", chunk.Key.Log())) - chunk.wg.Done() - - } - select { - case <-self.quitC: - return - default: - } - } -} - -// DpaChunkStore implements the ChunkStore interface, -// this chunk access layer assumed 2 chunk stores -// local storage eg. LocalStore and network storage eg., NetStore -// access by calling network is blocking with a timeout - -type dpaChunkStore struct { - n int - localStore ChunkStore - netStore ChunkStore -} - -func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore { - return &dpaChunkStore{0, localStore, netStore} -} - -// Get is the entrypoint for local retrieve requests -// waits for response or times out -func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) { - chunk, err = self.netStore.Get(key) - // timeout := time.Now().Add(searchTimeout) - if chunk.SData != nil { - log.Trace(fmt.Sprintf("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData))) - return - } - // TODO: use self.timer time.Timer and reset with defer disableTimer - timer := time.After(searchTimeout) - select { - case <-timer: - log.Trace(fmt.Sprintf("DPA.Get: %v request time out ", key.Log())) - err = notFound - case <-chunk.Req.C: - log.Trace(fmt.Sprintf("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk)) - } - return -} - -// Put is the entrypoint for local store requests coming from storeLoop -func (self *dpaChunkStore) Put(entry *Chunk) { - chunk, err := self.localStore.Get(entry.Key) - if err != nil { - log.Trace(fmt.Sprintf("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log())) - chunk = entry - } else if chunk.SData == nil { - log.Trace(fmt.Sprintf("DPA.Put: %v request entry found", entry.Key.Log())) - chunk.SData = entry.SData - chunk.Size = entry.Size - } else { - log.Trace(fmt.Sprintf("DPA.Put: %v chunk already known", entry.Key.Log())) - return - } - // from this point on the storage logic is the same with network storage requests - log.Trace(fmt.Sprintf("DPA.Put %v: %v", self.n, chunk.Key.Log())) - self.n++ - self.netStore.Put(chunk) -} - -// Close chunk store -func (self *dpaChunkStore) Close() {} |