From 11e7a712f469fb24ddb88ecebcefab6ed8880eb8 Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Thu, 23 Mar 2017 19:26:06 +0530 Subject: swarm/api: support mounting manifests via FUSE (#3690) --- swarm/api/fuse.go | 139 +++++++++++++++++++++ swarm/api/swarmfs.go | 48 ++++++++ swarm/api/swarmfs_unix.go | 266 +++++++++++++++++++++++++++++++++++++++++ swarm/api/swarmfs_unix_test.go | 122 +++++++++++++++++++ swarm/api/swarmfs_windows.go | 48 ++++++++ swarm/swarm.go | 15 ++- 6 files changed, 636 insertions(+), 2 deletions(-) create mode 100644 swarm/api/fuse.go create mode 100644 swarm/api/swarmfs.go create mode 100644 swarm/api/swarmfs_unix.go create mode 100644 swarm/api/swarmfs_unix_test.go create mode 100644 swarm/api/swarmfs_windows.go (limited to 'swarm') diff --git a/swarm/api/fuse.go b/swarm/api/fuse.go new file mode 100644 index 000000000..4b1f817f8 --- /dev/null +++ b/swarm/api/fuse.go @@ -0,0 +1,139 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build !windows + +package api + +import ( + "io" + "os" + + "bazil.org/fuse" + "bazil.org/fuse/fs" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/storage" + "golang.org/x/net/context" +) + + + + +// Data structures used for Fuse filesystem, serving directories and serving files to Fuse driver +type FS struct { + root *Dir +} + +type Dir struct { + inode uint64 + name string + path string + directories []*Dir + files []*File +} + +type File struct { + inode uint64 + name string + path string + key storage.Key + swarmApi *Api + fileSize uint64 + reader storage.LazySectionReader +} + + +// Functions which satisfy the Fuse File System requests +func (filesystem *FS) Root() (fs.Node, error) { + return filesystem.root, nil +} + +func (directory *Dir) Attr(ctx context.Context, a *fuse.Attr) error { + a.Inode = directory.inode + //TODO: need to get permission as argument + a.Mode = os.ModeDir | 0500 + a.Uid = uint32(os.Getuid()) + a.Gid = uint32(os.Getegid()) + return nil +} + +func (directory *Dir) Lookup(ctx context.Context, name string) (fs.Node, error) { + if directory.files != nil { + for _, n := range directory.files { + if n.name == name { + return n, nil + } + } + } + if directory.directories != nil { + for _, n := range directory.directories { + if n.name == name { + return n, nil + } + } + } + return nil, fuse.ENOENT +} + +func (d *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { + var children []fuse.Dirent + if d.files != nil { + for _, file := range d.files { + children = append(children, fuse.Dirent{Inode: file.inode, Type: fuse.DT_File, Name: file.name}) + } + } + if d.directories != nil { + for _, dir := range d.directories { + children = append(children, fuse.Dirent{Inode: dir.inode, Type: fuse.DT_Dir, Name: dir.name}) + } + } + return children, nil +} + +func (file *File) Attr(ctx context.Context, a *fuse.Attr) error { + + a.Inode = file.inode + //TODO: need to get permission as argument + a.Mode = 0500 + a.Uid = uint32(os.Getuid()) + a.Gid = uint32(os.Getegid()) + + + reader := file.swarmApi.Retrieve(file.key) + quitC := make(chan bool) + size, err := reader.Size(quitC) + if err != nil { + log.Warn("Couldnt file size of file %s : %v", file.path, err) + a.Size = uint64(0) + } + a.Size = uint64(size) + file.fileSize = a.Size + return nil +} + +var _ = fs.HandleReader(&File{}) + +func (file *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { + buf := make([]byte, req.Size) + reader := file.swarmApi.Retrieve(file.key) + n, err := reader.ReadAt(buf, req.Offset) + if err == io.ErrUnexpectedEOF || err == io.EOF { + err = nil + } + resp.Data = buf[:n] + return err + +} diff --git a/swarm/api/swarmfs.go b/swarm/api/swarmfs.go new file mode 100644 index 000000000..8427d3c5b --- /dev/null +++ b/swarm/api/swarmfs.go @@ -0,0 +1,48 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package api + +import ( + "time" + "sync" +) + +const ( + Swarmfs_Version = "0.1" + mountTimeout = time.Second * 5 + maxFuseMounts = 5 +) + + +type SwarmFS struct { + swarmApi *Api + activeMounts map[string]*MountInfo + activeLock *sync.RWMutex +} + + + +func NewSwarmFS(api *Api) *SwarmFS { + swarmfs := &SwarmFS{ + swarmApi: api, + activeLock: &sync.RWMutex{}, + activeMounts: map[string]*MountInfo{}, + } + return swarmfs +} + + diff --git a/swarm/api/swarmfs_unix.go b/swarm/api/swarmfs_unix.go new file mode 100644 index 000000000..ae0216e1d --- /dev/null +++ b/swarm/api/swarmfs_unix.go @@ -0,0 +1,266 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build linux darwin + +package api + +import ( + "path/filepath" + "fmt" + "strings" + "time" + "github.com/ethereum/go-ethereum/swarm/storage" + "bazil.org/fuse" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/common" + "bazil.org/fuse/fs" + "sync" +) + + +var ( + inode uint64 = 1 + inodeLock sync.RWMutex +) + +// information about every active mount +type MountInfo struct { + mountPoint string + manifestHash string + resolvedKey storage.Key + rootDir *Dir + fuseConnection *fuse.Conn +} + +// Inode numbers need to be unique, they are used for caching inside fuse +func NewInode() uint64 { + inodeLock.Lock() + defer inodeLock.Unlock() + inode += 1 + return inode +} + + + +func (self *SwarmFS) Mount(mhash, mountpoint string) (string, error) { + + self.activeLock.Lock() + defer self.activeLock.Unlock() + + noOfActiveMounts := len(self.activeMounts) + if noOfActiveMounts >= maxFuseMounts { + err := fmt.Errorf("Max mount count reached. Cannot mount %s ", mountpoint) + log.Warn(err.Error()) + return err.Error(), err + } + + cleanedMountPoint, err := filepath.Abs(filepath.Clean(mountpoint)) + if err != nil { + return err.Error(), err + } + + if _, ok := self.activeMounts[cleanedMountPoint]; ok { + err := fmt.Errorf("Mountpoint %s already mounted.", cleanedMountPoint) + log.Warn(err.Error()) + return err.Error(), err + } + + log.Info(fmt.Sprintf("Attempting to mount %s ", cleanedMountPoint)) + key, _, path, err := self.swarmApi.parseAndResolve(mhash, true) + if err != nil { + errStr := fmt.Sprintf("Could not resolve %s : %v", mhash, err) + log.Warn(errStr) + return errStr, err + } + + if len(path) > 0 { + path += "/" + } + + quitC := make(chan bool) + trie, err := loadManifest(self.swarmApi.dpa, key, quitC) + if err != nil { + errStr := fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err) + log.Warn(errStr) + return errStr, err + } + + dirTree := map[string]*Dir{} + + rootDir := &Dir{ + inode: NewInode(), + name: "root", + directories: nil, + files: nil, + } + dirTree["root"] = rootDir + + err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) { + + key = common.Hex2Bytes(entry.Hash) + fullpath := "/" + suffix + basepath := filepath.Dir(fullpath) + filename := filepath.Base(fullpath) + + parentDir := rootDir + dirUntilNow := "" + paths := strings.Split(basepath, "/") + for i := range paths { + if paths[i] != "" { + thisDir := paths[i] + dirUntilNow = dirUntilNow + "/" + thisDir + + if _, ok := dirTree[dirUntilNow]; !ok { + dirTree[dirUntilNow] = &Dir{ + inode: NewInode(), + name: thisDir, + path: dirUntilNow, + directories: nil, + files: nil, + } + parentDir.directories = append(parentDir.directories, dirTree[dirUntilNow]) + parentDir = dirTree[dirUntilNow] + + } else { + parentDir = dirTree[dirUntilNow] + } + + } + } + thisFile := &File{ + inode: NewInode(), + name: filename, + path: fullpath, + key: key, + swarmApi: self.swarmApi, + } + parentDir.files = append(parentDir.files, thisFile) + }) + + fconn, err := fuse.Mount(cleanedMountPoint, fuse.FSName("swarmfs"), fuse.VolumeName(mhash)) + if err != nil { + fuse.Unmount(cleanedMountPoint) + errStr := fmt.Sprintf("Mounting %s encountered error: %v", cleanedMountPoint, err) + log.Warn(errStr) + return errStr, err + } + + mounterr := make(chan error, 1) + go func() { + log.Info(fmt.Sprintf("Serving %s at %s", mhash, cleanedMountPoint)) + filesys := &FS{root: rootDir} + if err := fs.Serve(fconn, filesys); err != nil { + log.Warn(fmt.Sprintf("Could not Serve FS error: %v", err)) + } + }() + + // Check if the mount process has an error to report. + select { + + case <-time.After(mountTimeout): + err := fmt.Errorf("Mounting %s timed out.", cleanedMountPoint) + log.Warn(err.Error()) + return err.Error(), err + + case err := <-mounterr: + errStr := fmt.Sprintf("Mounting %s encountered error: %v", cleanedMountPoint, err) + log.Warn(errStr) + return errStr, err + + case <-fconn.Ready: + log.Debug(fmt.Sprintf("Mounting connection succeeded for : %v", cleanedMountPoint)) + } + + + + //Assemble and Store the mount information for future use + mountInformation := &MountInfo{ + mountPoint: cleanedMountPoint, + manifestHash: mhash, + resolvedKey: key, + rootDir: rootDir, + fuseConnection: fconn, + } + self.activeMounts[cleanedMountPoint] = mountInformation + + succString := fmt.Sprintf("Mounting successful for %s", cleanedMountPoint) + log.Info(succString) + + return succString, nil +} + +func (self *SwarmFS) Unmount(mountpoint string) (string, error) { + + self.activeLock.Lock() + defer self.activeLock.Unlock() + + cleanedMountPoint, err := filepath.Abs(filepath.Clean(mountpoint)) + if err != nil { + return err.Error(), err + } + + // Get the mount information based on the mountpoint argument + mountInfo := self.activeMounts[cleanedMountPoint] + + + if mountInfo == nil || mountInfo.mountPoint != cleanedMountPoint { + err := fmt.Errorf("Could not find mount information for %s ", cleanedMountPoint) + log.Warn(err.Error()) + return err.Error(), err + } + + err = fuse.Unmount(cleanedMountPoint) + if err != nil { + //TODO: try forceful unmount if normal unmount fails + errStr := fmt.Sprintf("UnMount error: %v", err) + log.Warn(errStr) + return errStr, err + } + + mountInfo.fuseConnection.Close() + + //remove the mount information from the active map + delete(self.activeMounts, cleanedMountPoint) + + succString := fmt.Sprintf("UnMounting %v succeeded", cleanedMountPoint) + log.Info(succString) + return succString, nil +} + +func (self *SwarmFS) Listmounts() (string, error) { + + self.activeLock.RLock() + defer self.activeLock.RUnlock() + + var rows []string + for mp := range self.activeMounts { + mountInfo := self.activeMounts[mp] + rows = append(rows, fmt.Sprintf("Swarm Root: %s, Mount Point: %s ", mountInfo.manifestHash, mountInfo.mountPoint)) + } + + return strings.Join(rows, "\n"), nil +} + +func (self *SwarmFS) Stop() bool { + + for mp := range self.activeMounts { + mountInfo := self.activeMounts[mp] + self.Unmount(mountInfo.mountPoint) + } + + return true +} diff --git a/swarm/api/swarmfs_unix_test.go b/swarm/api/swarmfs_unix_test.go new file mode 100644 index 000000000..4f59dba5b --- /dev/null +++ b/swarm/api/swarmfs_unix_test.go @@ -0,0 +1,122 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build linux darwin + +package api + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +var testUploadDir, _ = ioutil.TempDir(os.TempDir(), "fuse-source") +var testMountDir, _ = ioutil.TempDir(os.TempDir(), "fuse-dest") + +func testFuseFileSystem(t *testing.T, f func(*FileSystem)) { + testApi(t, func(api *Api) { + f(NewFileSystem(api)) + }) +} + +func createTestFiles(t *testing.T, files []string) { + + os.RemoveAll(testUploadDir) + os.RemoveAll(testMountDir) + defer os.MkdirAll(testMountDir, 0777) + + for f := range files { + actualPath := filepath.Join(testUploadDir, files[f]) + filePath := filepath.Dir(actualPath) + + err := os.MkdirAll(filePath, 0777) + if err != nil { + t.Fatalf("Error creating directory '%v' : %v", filePath, err) + } + + _, err1 := os.OpenFile(actualPath, os.O_RDONLY|os.O_CREATE, 0666) + if err1 != nil { + t.Fatalf("Error creating file %v: %v", actualPath, err1) + } + } + +} + +func compareFiles(t *testing.T, files []string) { + + for f := range files { + + sourceFile := filepath.Join(testUploadDir, files[f]) + destinationFile := filepath.Join(testMountDir, files[f]) + + sfinfo, err := os.Stat(sourceFile) + if err != nil { + t.Fatalf("Source file %v missing in mount: %v", files[f], err) + } + + dfinfo, err := os.Stat(destinationFile) + if err != nil { + t.Fatalf("Destination file %v missing in mount: %v", files[f], err) + } + + if sfinfo.Size() != dfinfo.Size() { + t.Fatalf("Size mismatch source (%v) vs destination(%v)", sfinfo.Size(), dfinfo.Size()) + } + + if dfinfo.Mode().Perm().String() != "-r-x------" { + t.Fatalf("Permission is not 0500for file: %v", err) + } + + } +} + +func doHashTest(fs *FileSystem, t *testing.T, ensName string, files ...string) { + + createTestFiles(t, files) + bzzhash, err := fs.Upload(testUploadDir, "") + if err != nil { + t.Fatalf("Error uploading directory %v: %v", testUploadDir, err) + } + + swarmfs := NewSwarmFS(fs.api) + _ ,err1 := swarmfs.Mount(bzzhash, testMountDir) + if err1 != nil { + + t.Fatalf("Error mounting hash %v: %v", bzzhash, err) + } + compareFiles(t, files) + _, err2 := swarmfs.Unmount(testMountDir) + if err2 != nil { + t.Fatalf("Error unmounting path %v: %v", testMountDir, err) + } + swarmfs.Stop() + +} + +// mounting with manifest Hash +func TestFuseMountingScenarios(t *testing.T) { + testFuseFileSystem(t, func(fs *FileSystem) { + + //doHashTest(fs,t, "test","1.txt") + doHashTest(fs, t, "", "1.txt") + doHashTest(fs, t, "", "1.txt", "11.txt", "111.txt", "two/2.txt", "two/two/2.txt", "three/3.txt") + doHashTest(fs, t, "", "1/2/3/4/5/6/7/8/9/10/11/12/1.txt") + doHashTest(fs, t, "", "one/one.txt", "one.txt", "once/one.txt", "one/one/one.txt") + + }) +} diff --git a/swarm/api/swarmfs_windows.go b/swarm/api/swarmfs_windows.go new file mode 100644 index 000000000..525a25399 --- /dev/null +++ b/swarm/api/swarmfs_windows.go @@ -0,0 +1,48 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build windows + +package api + +import ( + "github.com/ethereum/go-ethereum/log" +) + +// Dummy struct and functions to satsfy windows build +type MountInfo struct { +} + + +func (self *SwarmFS) Mount(mhash, mountpoint string) error { + log.Info("Platform not supported") + return nil +} + +func (self *SwarmFS) Unmount(mountpoint string) error { + log.Info("Platform not supported") + return nil +} + +func (self *SwarmFS) Listmounts() (string, error) { + log.Info("Platform not supported") + return "",nil +} + +func (self *SwarmFS) Stop() error { + log.Info("Platform not supported") + return nil +} \ No newline at end of file diff --git a/swarm/swarm.go b/swarm/swarm.go index bd256edaa..0ce31bcad 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -53,7 +53,8 @@ type Swarm struct { privateKey *ecdsa.PrivateKey corsString string swapEnabled bool - lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped + lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped + sfs *api.SwarmFS // need this to cleanup all the active mounts on node exit } type SwarmAPI struct { @@ -142,6 +143,9 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. // Manifests for Smart Hosting log.Debug(fmt.Sprintf("-> Web3 virtual server API")) + self.sfs = api.NewSwarmFS(self.api) + log.Debug("-> Initializing Fuse file system") + return self, nil } @@ -216,7 +220,7 @@ func (self *Swarm) Stop() error { if self.lstore != nil { self.lstore.DbStore.Close() } - + self.sfs.Stop() return self.config.Save() } @@ -240,6 +244,7 @@ func (self *Swarm) APIs() []rpc.API { Service: api.NewStorage(self.api), Public: true, }, + { Namespace: "bzz", Version: "0.1", @@ -264,6 +269,12 @@ func (self *Swarm) APIs() []rpc.API { Service: chequebook.NewApi(self.config.Swap.Chequebook), Public: false, }, + { + Namespace: "swarmfs", + Version: api.Swarmfs_Version, + Service: self.sfs, + Public: false, + }, // {Namespace, Version, api.NewAdmin(self), false}, } } -- cgit v1.2.3