diff options
Diffstat (limited to 'swarm/storage/dpa.go')
-rw-r--r-- | swarm/storage/dpa.go | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/swarm/storage/dpa.go b/swarm/storage/dpa.go new file mode 100644 index 000000000..31b6c54ac --- /dev/null +++ b/swarm/storage/dpa.go @@ -0,0 +1,239 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package storage + +import ( + "errors" + "io" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +/* +DPA provides the client API entrypoints Store and Retrieve to store and retrieve +It can store anything that has a byte slice representation, so files or serialised objects etc. + +Storage: DPA calls the Chunker to segment the input datastream of any size to a merkle hashed tree of chunks. The key of the root block is returned to the client. + +Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read. + +As the chunker produces chunks, DPA dispatches them to its own chunk store +implementation for storage or retrieval. +*/ + +const ( + storeChanCapacity = 100 + retrieveChanCapacity = 100 + singletonSwarmDbCapacity = 50000 + singletonSwarmCacheCapacity = 500 + maxStoreProcesses = 8 + maxRetrieveProcesses = 8 +) + +var ( + notFound = errors.New("not found") +) + +type DPA struct { + ChunkStore + storeC chan *Chunk + retrieveC chan *Chunk + Chunker Chunker + + lock sync.Mutex + running bool + wg *sync.WaitGroup + quitC chan bool +} + +// for testing locally +func NewLocalDPA(datadir string) (*DPA, error) { + + hash := MakeHashFunc("SHA256") + + dbStore, err := NewDbStore(datadir, hash, singletonSwarmDbCapacity, 0) + if err != nil { + return nil, err + } + + return NewDPA(&LocalStore{ + NewMemStore(dbStore, singletonSwarmCacheCapacity), + dbStore, + }, NewChunkerParams()), nil +} + +func NewDPA(store ChunkStore, params *ChunkerParams) *DPA { + chunker := NewTreeChunker(params) + return &DPA{ + Chunker: chunker, + ChunkStore: store, + } +} + +// Public API. Main entry point for document retrieval directly. Used by the +// FS-aware API and httpaccess +// Chunk retrieval blocks on netStore requests with a timeout so reader will +// report error if retrieval of chunks within requested range time out. +func (self *DPA) Retrieve(key Key) LazySectionReader { + return self.Chunker.Join(key, self.retrieveC) +} + +// Public API. Main entry point for document storage directly. Used by the +// FS-aware API and httpaccess +func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) { + return self.Chunker.Split(data, size, self.storeC, swg, wwg) +} + +func (self *DPA) Start() { + self.lock.Lock() + defer self.lock.Unlock() + if self.running { + return + } + self.running = true + self.retrieveC = make(chan *Chunk, retrieveChanCapacity) + self.storeC = make(chan *Chunk, storeChanCapacity) + self.quitC = make(chan bool) + self.storeLoop() + self.retrieveLoop() +} + +func (self *DPA) Stop() { + self.lock.Lock() + defer self.lock.Unlock() + if !self.running { + return + } + self.running = false + close(self.quitC) +} + +// retrieveLoop dispatches the parallel chunk retrieval requests received on the +// retrieve channel to its ChunkStore (NetStore or LocalStore) +func (self *DPA) retrieveLoop() { + for i := 0; i < maxRetrieveProcesses; i++ { + go self.retrieveWorker() + } + glog.V(logger.Detail).Infof("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses) +} + +func (self *DPA) retrieveWorker() { + for chunk := range self.retrieveC { + glog.V(logger.Detail).Infof("dpa: retrieve loop : chunk %v", chunk.Key.Log()) + storedChunk, err := self.Get(chunk.Key) + if err == notFound { + glog.V(logger.Detail).Infof("chunk %v not found", chunk.Key.Log()) + } else if err != nil { + glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err) + } else { + chunk.SData = storedChunk.SData + chunk.Size = storedChunk.Size + } + close(chunk.C) + + select { + case <-self.quitC: + return + default: + } + } +} + +// storeLoop dispatches the parallel chunk store request processors +// received on the store channel to its ChunkStore (NetStore or LocalStore) +func (self *DPA) storeLoop() { + for i := 0; i < maxStoreProcesses; i++ { + go self.storeWorker() + } + glog.V(logger.Detail).Infof("dpa: store spawning %v workers", maxStoreProcesses) +} + +func (self *DPA) storeWorker() { + + for chunk := range self.storeC { + self.Put(chunk) + if chunk.wg != nil { + glog.V(logger.Detail).Infof("dpa: store processor %v", chunk.Key.Log()) + chunk.wg.Done() + + } + select { + case <-self.quitC: + return + default: + } + } +} + +// DpaChunkStore implements the ChunkStore interface, +// this chunk access layer assumed 2 chunk stores +// local storage eg. LocalStore and network storage eg., NetStore +// access by calling network is blocking with a timeout + +type dpaChunkStore struct { + n int + localStore ChunkStore + netStore ChunkStore +} + +func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore { + return &dpaChunkStore{0, localStore, netStore} +} + +// Get is the entrypoint for local retrieve requests +// waits for response or times out +func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) { + chunk, err = self.netStore.Get(key) + // timeout := time.Now().Add(searchTimeout) + if chunk.SData != nil { + glog.V(logger.Detail).Infof("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData)) + return + } + // TODO: use self.timer time.Timer and reset with defer disableTimer + timer := time.After(searchTimeout) + select { + case <-timer: + glog.V(logger.Detail).Infof("DPA.Get: %v request time out ", key.Log()) + err = notFound + case <-chunk.Req.C: + glog.V(logger.Detail).Infof("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk) + } + return +} + +// Put is the entrypoint for local store requests coming from storeLoop +func (self *dpaChunkStore) Put(entry *Chunk) { + chunk, err := self.localStore.Get(entry.Key) + if err != nil { + glog.V(logger.Detail).Infof("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log()) + chunk = entry + } else if chunk.SData == nil { + glog.V(logger.Detail).Infof("DPA.Put: %v request entry found", entry.Key.Log()) + chunk.SData = entry.SData + chunk.Size = entry.Size + } else { + glog.V(logger.Detail).Infof("DPA.Put: %v chunk already known", entry.Key.Log()) + return + } + // from this point on the storage logic is the same with network storage requests + glog.V(logger.Detail).Infof("DPA.Put %v: %v", self.n, chunk.Key.Log()) + self.n++ + self.netStore.Put(chunk) +} |