// Copyright 2016 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package api import ( "bufio" "fmt" "io" "net/http" "os" "path/filepath" "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/swarm/storage" ) const maxParallelFiles = 5 type FileSystem struct { api *Api } func NewFileSystem(api *Api) *FileSystem { return &FileSystem{api} } // Upload replicates a local directory as a manifest file and uploads it // using dpa store // TODO: localpath should point to a manifest func (self *FileSystem) Upload(lpath, index string) (string, error) { var list []*manifestTrieEntry localpath, err := filepath.Abs(filepath.Clean(lpath)) if err != nil { return "", err } f, err := os.Open(localpath) if err != nil { return "", err } stat, err := f.Stat() if err != nil { return "", err } var start int if stat.IsDir() { start = len(localpath) glog.V(logger.Debug).Infof("uploading '%s'", localpath) err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error { if (err == nil) && !info.IsDir() { //fmt.Printf("lp %s path %s\n", localpath, path) if len(path) <= start { return fmt.Errorf("Path is too short") } if path[:start] != localpath { return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath) } entry := &manifestTrieEntry{ Path: filepath.ToSlash(path), } list = append(list, entry) } return err }) if err != nil { return "", err } } else { dir := filepath.Dir(localpath) start = len(dir) if len(localpath) <= start { return "", fmt.Errorf("Path is too short") } if localpath[:start] != dir { return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir) } entry := &manifestTrieEntry{ Path: filepath.ToSlash(localpath), } list = append(list, entry) } cnt := len(list) errors := make([]error, cnt) done := make(chan bool, maxParallelFiles) dcnt := 0 awg := &sync.WaitGroup{} for i, entry := range list { if i >= dcnt+maxParallelFiles { <-done dcnt++ } awg.Add(1) go func(i int, entry *manifestTrieEntry, done chan bool) { f, err := os.Open(entry.Path) if err == nil { stat, _ := f.Stat() var hash storage.Key wg := &sync.WaitGroup{} hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil) if hash != nil { list[i].Hash = hash.String() } wg.Wait() awg.Done() if err == nil { first512 := make([]byte, 512) fread, _ := f.ReadAt(first512, 0) if fread > 0 { mimeType := http.DetectContentType(first512[:fread]) if filepath.Ext(entry.Path) == ".css" { mimeType = "text/css" } list[i].ContentType = mimeType } } f.Close() } errors[i] = err done <- true }(i, entry, done) } for dcnt < cnt { <-done dcnt++ } trie := &manifestTrie{ dpa: self.api.dpa, } quitC := make(chan bool) for i, entry := range list { if errors[i] != nil { return "", errors[i] } entry.Path = RegularSlashes(entry.Path[start:]) if entry.Path == index { ientry := &manifestTrieEntry{ Path: "", Hash: entry.Hash, ContentType: entry.ContentType, } trie.addEntry(ientry, quitC) } trie.addEntry(entry, quitC) } err2 := trie.recalcAndStore() var hs string if err2 == nil { hs = trie.hash.String() } awg.Wait() return hs, err2 } // Download replicates the manifest path structure on the local filesystem // under localpath func (self *FileSystem) Download(bzzpath, localpath string) error { lpath, err := filepath.Abs(filepath.Clean(localpath)) if err != nil { return err } err = os.MkdirAll(lpath, os.ModePerm) if err != nil { return err } //resolving host and port key, _, path, err := self.api.parseAndResolve(bzzpath, true) if err != nil { return err } if len(path) > 0 { path += "/" } quitC := make(chan bool) trie, err := loadManifest(self.api.dpa, key, quitC) if err != nil { glog.V(logger.Warn).Infof("fs.Download: loadManifestTrie error: %v", err) return err } type downloadListEntry struct { key storage.Key path string } var list []*downloadListEntry var mde error prevPath := lpath err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) { glog.V(logger.Detail).Infof("fs.Download: %#v", entry) key = common.Hex2Bytes(entry.Hash) path := lpath + "/" + suffix dir := filepath.Dir(path) if dir != prevPath { mde = os.MkdirAll(dir, os.ModePerm) prevPath = dir } if (mde == nil) && (path != dir+"/") { list = append(list, &downloadListEntry{key: key, path: path}) } }) if err != nil { return err } wg := sync.WaitGroup{} errC := make(chan error) done := make(chan bool, maxParallelFiles) for i, entry := range list { select { case done <- true: wg.Add(1) case <-quitC: return fmt.Errorf("aborted") } go func(i int, entry *downloadListEntry) { defer wg.Done() f, err := os.Create(entry.path) // TODO: path separators if err == nil { reader := self.api.dpa.Retrieve(entry.key) writer := bufio.NewWriter(f) size, err := reader.Size(quitC) if err == nil { _, err = io.CopyN(writer, reader, size) // TODO: handle errors err2 := writer.Flush() if err == nil { err = err2 } err2 = f.Close() if err == nil { err = err2 } } } if err != nil { select { case errC <- err: case <-quitC: } return } <-done }(i, entry) } go func() { wg.Wait() close(errC) }() select { case err = <-errC: return err case <-quitC: return fmt.Errorf("aborted") } }