aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/api/filesystem.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/api/filesystem.go')
-rw-r--r--swarm/api/filesystem.go283
1 files changed, 283 insertions, 0 deletions
diff --git a/swarm/api/filesystem.go b/swarm/api/filesystem.go
new file mode 100644
index 000000000..428f3e3ac
--- /dev/null
+++ b/swarm/api/filesystem.go
@@ -0,0 +1,283 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package api
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "path/filepath"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const maxParallelFiles = 5
+
+type FileSystem struct {
+ api *Api
+}
+
+func NewFileSystem(api *Api) *FileSystem {
+ return &FileSystem{api}
+}
+
+// Upload replicates a local directory as a manifest file and uploads it
+// using dpa store
+// TODO: localpath should point to a manifest
+func (self *FileSystem) Upload(lpath, index string) (string, error) {
+ var list []*manifestTrieEntry
+ localpath, err := filepath.Abs(filepath.Clean(lpath))
+ if err != nil {
+ return "", err
+ }
+
+ f, err := os.Open(localpath)
+ if err != nil {
+ return "", err
+ }
+ stat, err := f.Stat()
+ if err != nil {
+ return "", err
+ }
+
+ var start int
+ if stat.IsDir() {
+ start = len(localpath)
+ glog.V(logger.Debug).Infof("uploading '%s'", localpath)
+ err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
+ if (err == nil) && !info.IsDir() {
+ //fmt.Printf("lp %s path %s\n", localpath, path)
+ if len(path) <= start {
+ return fmt.Errorf("Path is too short")
+ }
+ if path[:start] != localpath {
+ return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
+ }
+ entry := &manifestTrieEntry{
+ Path: filepath.ToSlash(path),
+ }
+ list = append(list, entry)
+ }
+ return err
+ })
+ if err != nil {
+ return "", err
+ }
+ } else {
+ dir := filepath.Dir(localpath)
+ start = len(dir)
+ if len(localpath) <= start {
+ return "", fmt.Errorf("Path is too short")
+ }
+ if localpath[:start] != dir {
+ return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
+ }
+ entry := &manifestTrieEntry{
+ Path: filepath.ToSlash(localpath),
+ }
+ list = append(list, entry)
+ }
+
+ cnt := len(list)
+ errors := make([]error, cnt)
+ done := make(chan bool, maxParallelFiles)
+ dcnt := 0
+ awg := &sync.WaitGroup{}
+
+ for i, entry := range list {
+ if i >= dcnt+maxParallelFiles {
+ <-done
+ dcnt++
+ }
+ awg.Add(1)
+ go func(i int, entry *manifestTrieEntry, done chan bool) {
+ f, err := os.Open(entry.Path)
+ if err == nil {
+ stat, _ := f.Stat()
+ var hash storage.Key
+ wg := &sync.WaitGroup{}
+ hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil)
+ if hash != nil {
+ list[i].Hash = hash.String()
+ }
+ wg.Wait()
+ awg.Done()
+ if err == nil {
+ first512 := make([]byte, 512)
+ fread, _ := f.ReadAt(first512, 0)
+ if fread > 0 {
+ mimeType := http.DetectContentType(first512[:fread])
+ if filepath.Ext(entry.Path) == ".css" {
+ mimeType = "text/css"
+ }
+ list[i].ContentType = mimeType
+ }
+ }
+ f.Close()
+ }
+ errors[i] = err
+ done <- true
+ }(i, entry, done)
+ }
+ for dcnt < cnt {
+ <-done
+ dcnt++
+ }
+
+ trie := &manifestTrie{
+ dpa: self.api.dpa,
+ }
+ quitC := make(chan bool)
+ for i, entry := range list {
+ if errors[i] != nil {
+ return "", errors[i]
+ }
+ entry.Path = RegularSlashes(entry.Path[start:])
+ if entry.Path == index {
+ ientry := &manifestTrieEntry{
+ Path: "",
+ Hash: entry.Hash,
+ ContentType: entry.ContentType,
+ }
+ trie.addEntry(ientry, quitC)
+ }
+ trie.addEntry(entry, quitC)
+ }
+
+ err2 := trie.recalcAndStore()
+ var hs string
+ if err2 == nil {
+ hs = trie.hash.String()
+ }
+ awg.Wait()
+ return hs, err2
+}
+
+// Download replicates the manifest path structure on the local filesystem
+// under localpath
+func (self *FileSystem) Download(bzzpath, localpath string) error {
+ lpath, err := filepath.Abs(filepath.Clean(localpath))
+ if err != nil {
+ return err
+ }
+ err = os.MkdirAll(lpath, os.ModePerm)
+ if err != nil {
+ return err
+ }
+
+ //resolving host and port
+ key, _, path, err := self.api.parseAndResolve(bzzpath, true)
+ if err != nil {
+ return err
+ }
+
+ if len(path) > 0 {
+ path += "/"
+ }
+
+ quitC := make(chan bool)
+ trie, err := loadManifest(self.api.dpa, key, quitC)
+ if err != nil {
+ glog.V(logger.Warn).Infof("fs.Download: loadManifestTrie error: %v", err)
+ return err
+ }
+
+ type downloadListEntry struct {
+ key storage.Key
+ path string
+ }
+
+ var list []*downloadListEntry
+ var mde error
+
+ prevPath := lpath
+ err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
+ glog.V(logger.Detail).Infof("fs.Download: %#v", entry)
+
+ key = common.Hex2Bytes(entry.Hash)
+ path := lpath + "/" + suffix
+ dir := filepath.Dir(path)
+ if dir != prevPath {
+ mde = os.MkdirAll(dir, os.ModePerm)
+ prevPath = dir
+ }
+ if (mde == nil) && (path != dir+"/") {
+ list = append(list, &downloadListEntry{key: key, path: path})
+ }
+ })
+ if err != nil {
+ return err
+ }
+
+ wg := sync.WaitGroup{}
+ errC := make(chan error)
+ done := make(chan bool, maxParallelFiles)
+ for i, entry := range list {
+ select {
+ case done <- true:
+ wg.Add(1)
+ case <-quitC:
+ return fmt.Errorf("aborted")
+ }
+ go func(i int, entry *downloadListEntry) {
+ defer wg.Done()
+ f, err := os.Create(entry.path) // TODO: path separators
+ if err == nil {
+
+ reader := self.api.dpa.Retrieve(entry.key)
+ writer := bufio.NewWriter(f)
+ size, err := reader.Size(quitC)
+ if err == nil {
+ _, err = io.CopyN(writer, reader, size) // TODO: handle errors
+ err2 := writer.Flush()
+ if err == nil {
+ err = err2
+ }
+ err2 = f.Close()
+ if err == nil {
+ err = err2
+ }
+ }
+ }
+ if err != nil {
+ select {
+ case errC <- err:
+ case <-quitC:
+ }
+ return
+ }
+ <-done
+ }(i, entry)
+ }
+ go func() {
+ wg.Wait()
+ close(errC)
+ }()
+ select {
+ case err = <-errC:
+ return err
+ case <-quitC:
+ return fmt.Errorf("aborted")
+ }
+
+}