Diffstat (limited to 'swarm/storage/dpa.go')
-rw-r--r--  swarm/storage/dpa.go  239
1 file changed, 239 insertions, 0 deletions
diff --git a/swarm/storage/dpa.go b/swarm/storage/dpa.go
new file mode 100644
index 000000000..31b6c54ac
--- /dev/null
+++ b/swarm/storage/dpa.go
@@ -0,0 +1,239 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "errors"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+/*
+DPA provides the client API entrypoints Store and Retrieve to store and retrieve data.
+It can store anything that has a byte slice representation, such as files or serialised objects.
+
+Storage: DPA calls the Chunker to segment the input datastream of any size into a merkle hashed tree of chunks. The key of the root block is returned to the client.
+
+Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data, passing it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing: the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read.
+
+As the chunker produces chunks, DPA dispatches them to its own chunk store
+implementation for storage or retrieval.
+*/
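+
+// A minimal usage sketch (illustrative only: the datadir path, the payload
+// and the nil wait groups are assumptions, and the lazy reader is assumed to
+// expose io.ReaderAt as elsewhere in this package):
+//
+//    dpa, err := NewLocalDPA("/tmp/bzz")
+//    if err != nil {
+//        // handle error
+//    }
+//    dpa.Start()
+//    defer dpa.Stop()
+//
+//    content := []byte("hello swarm")
+//    key, err := dpa.Store(bytes.NewReader(content), int64(len(content)), nil, nil)
+//    if err != nil {
+//        // handle error
+//    }
+//
+//    reader := dpa.Retrieve(key)
+//    buf := make([]byte, len(content))
+//    _, err = reader.ReadAt(buf, 0) // fetches only the chunks covering this range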
+
+const (
+ storeChanCapacity = 100 // buffer size of the store channel
+ retrieveChanCapacity = 100 // buffer size of the retrieve channel
+ singletonSwarmDbCapacity = 50000 // chunk capacity of the local db store used by NewLocalDPA
+ singletonSwarmCacheCapacity = 500 // chunk capacity of the in-memory cache used by NewLocalDPA
+ maxStoreProcesses = 8 // number of concurrent store workers
+ maxRetrieveProcesses = 8 // number of concurrent retrieve workers
+)
+
+var (
+ notFound = errors.New("not found")
+)
+
+type DPA struct {
+ ChunkStore // underlying chunk store (NetStore or LocalStore)
+ storeC chan *Chunk // chunks produced by the Chunker's Split
+ retrieveC chan *Chunk // chunk requests issued by the Chunker's Join
+ Chunker Chunker
+
+ lock sync.Mutex
+ running bool
+ wg *sync.WaitGroup
+ quitC chan bool
+}
+
+// NewLocalDPA creates a DPA backed by a local chunk store; used for testing locally
+func NewLocalDPA(datadir string) (*DPA, error) {
+
+ hash := MakeHashFunc("SHA256")
+
+ dbStore, err := NewDbStore(datadir, hash, singletonSwarmDbCapacity, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewDPA(&LocalStore{
+ NewMemStore(dbStore, singletonSwarmCacheCapacity),
+ dbStore,
+ }, NewChunkerParams()), nil
+}
+
+// NewDPA creates a DPA from a chunk store and chunker parameters
+func NewDPA(store ChunkStore, params *ChunkerParams) *DPA {
+ chunker := NewTreeChunker(params)
+ return &DPA{
+ Chunker: chunker,
+ ChunkStore: store,
+ }
+}
+
+// Public API. Main entry point for direct document retrieval. Used by the
+// FS-aware API and httpaccess.
+// Chunk retrieval blocks on netStore requests with a timeout, so the reader
+// reports an error if retrieval of chunks within the requested range times out.
+func (self *DPA) Retrieve(key Key) LazySectionReader {
+ return self.Chunker.Join(key, self.retrieveC)
+}
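+
+// For example (a sketch; the offset and length are arbitrary, and the reader
+// is assumed to expose io.ReaderAt as elsewhere in this package):
+//
+//    reader := dpa.Retrieve(key)
+//    buf := make([]byte, 4096)
+//    // only the chunks covering bytes [32768, 32768+4096) are fetched;
+//    // the read reports an error if retrieving those chunks times out
+//    _, err := reader.ReadAt(buf, 32768)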
+
+// Public API. Main entry point for direct document storage. Used by the
+// FS-aware API and httpaccess.
+func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) {
+ return self.Chunker.Split(data, size, self.storeC, swg, wwg)
+}
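+
+// A sketch of blocking until all chunks are stored: pass a storage wait group
+// (swg); chunk.wg.Done() in storeWorker below suggests the chunker registers
+// each chunk on it before dispatch. The reader and size are assumptions:
+//
+//    swg := &sync.WaitGroup{}
+//    key, err := dpa.Store(reader, size, swg, nil)
+//    if err == nil {
+//        swg.Wait() // returns once every chunk has been stored
+//    }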
+
+// Start creates the store and retrieve channels and spawns the worker pools
+func (self *DPA) Start() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.running {
+ return
+ }
+ self.running = true
+ self.retrieveC = make(chan *Chunk, retrieveChanCapacity)
+ self.storeC = make(chan *Chunk, storeChanCapacity)
+ self.quitC = make(chan bool)
+ self.storeLoop()
+ self.retrieveLoop()
+}
+
+// Stop signals the workers to quit
+func (self *DPA) Stop() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if !self.running {
+ return
+ }
+ self.running = false
+ close(self.quitC)
+}
+
+// retrieveLoop spawns workers that dispatch the parallel chunk retrieval
+// requests received on the retrieve channel to the DPA's ChunkStore
+// (NetStore or LocalStore)
+func (self *DPA) retrieveLoop() {
+ for i := 0; i < maxRetrieveProcesses; i++ {
+ go self.retrieveWorker()
+ }
+ glog.V(logger.Detail).Infof("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses)
+}
+
+func (self *DPA) retrieveWorker() {
+ for chunk := range self.retrieveC {
+ glog.V(logger.Detail).Infof("dpa: retrieve loop : chunk %v", chunk.Key.Log())
+ storedChunk, err := self.Get(chunk.Key)
+ if err == notFound {
+ glog.V(logger.Detail).Infof("chunk %v not found", chunk.Key.Log())
+ } else if err != nil {
+ glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err)
+ } else {
+ chunk.SData = storedChunk.SData
+ chunk.Size = storedChunk.Size
+ }
+ close(chunk.C) // signal the requester that the chunk has been processed
+
+ select {
+ case <-self.quitC:
+ return
+ default:
+ }
+ }
+}
+
+// storeLoop spawns workers that dispatch the parallel chunk store requests
+// received on the store channel to the DPA's ChunkStore (NetStore or LocalStore)
+func (self *DPA) storeLoop() {
+ for i := 0; i < maxStoreProcesses; i++ {
+ go self.storeWorker()
+ }
+ glog.V(logger.Detail).Infof("dpa: store spawning %v workers", maxStoreProcesses)
+}
+
+func (self *DPA) storeWorker() {
+ for chunk := range self.storeC {
+ self.Put(chunk)
+ if chunk.wg != nil {
+ glog.V(logger.Detail).Infof("dpa: store worker: stored chunk %v", chunk.Key.Log())
+ chunk.wg.Done()
+ }
+ select {
+ case <-self.quitC:
+ return
+ default:
+ }
+ }
+}
+
+// dpaChunkStore implements the ChunkStore interface. This chunk-access layer
+// assumes two chunk stores: local storage (e.g. LocalStore) and network
+// storage (e.g. NetStore); access via the network blocks with a timeout.
+type dpaChunkStore struct {
+ n int // counts chunks forwarded to netStore.Put
+ localStore ChunkStore
+ netStore ChunkStore
+}
+
+// NewDpaChunkStore creates a two-tier chunk store over a local and a network store
+func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore {
+ return &dpaChunkStore{0, localStore, netStore}
+}
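+
+// A sketch of composing the two tiers into a DPA; the construction of the
+// local store mirrors NewLocalDPA above, and netStore stands in for a
+// network-backed ChunkStore (an assumption here):
+//
+//    localStore := &LocalStore{memStore, dbStore}
+//    store := NewDpaChunkStore(localStore, netStore)
+//    dpa := NewDPA(store, NewChunkerParams())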
+
+// Get is the entrypoint for local retrieve requests; it waits for a response
+// or times out
+func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
+ chunk, err = self.netStore.Get(key)
+ if err != nil {
+ return nil, err
+ }
+ if chunk.SData != nil {
+ glog.V(logger.Detail).Infof("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData))
+ return
+ }
+ // TODO: use self.timer time.Timer and reset with defer disableTimer
+ timer := time.After(searchTimeout)
+ select {
+ case <-timer:
+ glog.V(logger.Detail).Infof("DPA.Get: %v request timed out", key.Log())
+ err = notFound
+ case <-chunk.Req.C:
+ glog.V(logger.Detail).Infof("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk)
+ }
+ return
+}
+
+// Put is the entrypoint for local store requests coming from storeLoop
+func (self *dpaChunkStore) Put(entry *Chunk) {
+ chunk, err := self.localStore.Get(entry.Key)
+ if err != nil {
+ glog.V(logger.Detail).Infof("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log())
+ chunk = entry
+ } else if chunk.SData == nil {
+ glog.V(logger.Detail).Infof("DPA.Put: %v request entry found", entry.Key.Log())
+ chunk.SData = entry.SData
+ chunk.Size = entry.Size
+ } else {
+ glog.V(logger.Detail).Infof("DPA.Put: %v chunk already known", entry.Key.Log())
+ return
+ }
+ // from this point on the storage logic is the same as for network storage requests
+ glog.V(logger.Detail).Infof("DPA.Put %v: %v", self.n, chunk.Key.Log())
+ self.n++
+ self.netStore.Put(chunk)
+}