diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/fuse/swarmfs_unix.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/fuse/swarmfs_unix.go')
-rw-r--r-- | swarm/fuse/swarmfs_unix.go | 156 |
1 files changed, 102 insertions, 54 deletions
diff --git a/swarm/fuse/swarmfs_unix.go b/swarm/fuse/swarmfs_unix.go index 75742845a..74dd84a90 100644 --- a/swarm/fuse/swarmfs_unix.go +++ b/swarm/fuse/swarmfs_unix.go @@ -30,15 +30,16 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/log" ) var ( - errEmptyMountPoint = errors.New("need non-empty mount point") - errMaxMountCount = errors.New("max FUSE mount count reached") - errMountTimeout = errors.New("mount timeout") - errAlreadyMounted = errors.New("mount point is already serving") + errEmptyMountPoint = errors.New("need non-empty mount point") + errNoRelativeMountPoint = errors.New("invalid path for mount point (need absolute path)") + errMaxMountCount = errors.New("max FUSE mount count reached") + errMountTimeout = errors.New("mount timeout") + errAlreadyMounted = errors.New("mount point is already serving") ) func isFUSEUnsupportedError(err error) bool { @@ -48,18 +49,20 @@ func isFUSEUnsupportedError(err error) bool { return err == fuse.ErrOSXFUSENotFound } -// information about every active mount +// MountInfo contains information about every active mount type MountInfo struct { MountPoint string StartManifest string LatestManifest string rootDir *SwarmDir fuseConnection *fuse.Conn - swarmApi *api.Api + swarmApi *api.API lock *sync.RWMutex + serveClose chan struct{} } -func NewMountInfo(mhash, mpoint string, sapi *api.Api) *MountInfo { +func NewMountInfo(mhash, mpoint string, sapi *api.API) *MountInfo { + log.Debug("swarmfs NewMountInfo", "hash", mhash, "mount point", mpoint) newMountInfo := &MountInfo{ MountPoint: mpoint, StartManifest: mhash, @@ -68,50 +71,57 @@ func NewMountInfo(mhash, mpoint string, sapi *api.Api) *MountInfo { fuseConnection: nil, swarmApi: sapi, lock: &sync.RWMutex{}, + serveClose: make(chan struct{}), } return newMountInfo } -func (self *SwarmFS) Mount(mhash, mountpoint string) (*MountInfo, error) { - +func (swarmfs *SwarmFS) Mount(mhash, mountpoint string) (*MountInfo, error) { + log.Info("swarmfs", "mounting hash", mhash, "mount point", mountpoint) if mountpoint == "" { return nil, errEmptyMountPoint } + if !strings.HasPrefix(mountpoint, "/") { + return nil, errNoRelativeMountPoint + } cleanedMountPoint, err := filepath.Abs(filepath.Clean(mountpoint)) if err != nil { return nil, err } + log.Trace("swarmfs mount", "cleanedMountPoint", cleanedMountPoint) - self.swarmFsLock.Lock() - defer self.swarmFsLock.Unlock() + swarmfs.swarmFsLock.Lock() + defer swarmfs.swarmFsLock.Unlock() - noOfActiveMounts := len(self.activeMounts) + noOfActiveMounts := len(swarmfs.activeMounts) + log.Debug("swarmfs mount", "# active mounts", noOfActiveMounts) if noOfActiveMounts >= maxFuseMounts { return nil, errMaxMountCount } - if _, ok := self.activeMounts[cleanedMountPoint]; ok { + if _, ok := swarmfs.activeMounts[cleanedMountPoint]; ok { return nil, errAlreadyMounted } - log.Info(fmt.Sprintf("Attempting to mount %s ", cleanedMountPoint)) - _, manifestEntryMap, err := self.swarmApi.BuildDirectoryTree(mhash, true) + log.Trace("swarmfs mount: getting manifest tree") + _, manifestEntryMap, err := swarmfs.swarmApi.BuildDirectoryTree(mhash, true) if err != nil { return nil, err } - mi := NewMountInfo(mhash, cleanedMountPoint, self.swarmApi) + log.Trace("swarmfs mount: building mount info") + mi := NewMountInfo(mhash, cleanedMountPoint, swarmfs.swarmApi) dirTree := map[string]*SwarmDir{} rootDir := NewSwarmDir("/", mi) - dirTree["/"] = rootDir + log.Trace("swarmfs mount", "rootDir", rootDir) mi.rootDir = rootDir + log.Trace("swarmfs mount: traversing manifest map") for suffix, entry := range manifestEntryMap { - key := common.Hex2Bytes(entry.Hash) + addr := common.Hex2Bytes(entry.Hash) fullpath := "/" + suffix basepath := filepath.Dir(fullpath) - parentDir := rootDir dirUntilNow := "" paths := strings.Split(basepath, "/") @@ -128,105 +138,143 @@ func (self *SwarmFS) Mount(mhash, mountpoint string) (*MountInfo, error) { } else { parentDir = dirTree[dirUntilNow] } - } } thisFile := NewSwarmFile(basepath, filepath.Base(fullpath), mi) - thisFile.key = key + thisFile.addr = addr parentDir.files = append(parentDir.files, thisFile) } fconn, err := fuse.Mount(cleanedMountPoint, fuse.FSName("swarmfs"), fuse.VolumeName(mhash)) if isFUSEUnsupportedError(err) { - log.Warn("Fuse not installed", "mountpoint", cleanedMountPoint, "err", err) + log.Error("swarmfs error - FUSE not installed", "mountpoint", cleanedMountPoint, "err", err) return nil, err } else if err != nil { fuse.Unmount(cleanedMountPoint) - log.Warn("Error mounting swarm manifest", "mountpoint", cleanedMountPoint, "err", err) + log.Error("swarmfs error mounting swarm manifest", "mountpoint", cleanedMountPoint, "err", err) return nil, err } mi.fuseConnection = fconn serverr := make(chan error, 1) go func() { - log.Info(fmt.Sprintf("Serving %s at %s", mhash, cleanedMountPoint)) + log.Info("swarmfs", "serving hash", mhash, "at", cleanedMountPoint) filesys := &SwarmRoot{root: rootDir} + //start serving the actual file system; see note below if err := fs.Serve(fconn, filesys); err != nil { - log.Warn(fmt.Sprintf("Could not Serve SwarmFileSystem error: %v", err)) + log.Warn("swarmfs could not serve the requested hash", "error", err) serverr <- err } - + mi.serveClose <- struct{}{} }() + /* + IMPORTANT NOTE: the fs.Serve function is blocking; + Serve builds up the actual fuse file system by calling the + Attr functions on each SwarmFile, creating the file inodes; + specifically calling the swarm's LazySectionReader.Size() to set the file size. + + This can take some time, and it appears that if we access the fuse file system + too early, we can bring the tests to deadlock. The assumption so far is that + at this point, the fuse driver didn't finish to initialize the file system. + + Accessing files too early not only deadlocks the tests, but locks the access + of the fuse file completely, resulting in blocked resources at OS system level. + Even a simple `ls /tmp/testDir/testMountDir` could deadlock in a shell. + + Workaround so far is to wait some time to give the OS enough time to initialize + the fuse file system. During tests, this seemed to address the issue. + + HOWEVER IT SHOULD BE NOTED THAT THIS MAY ONLY BE AN EFFECT, + AND THE DEADLOCK CAUSED BY SOMETHING ELSE BLOCKING ACCESS DUE TO SOME RACE CONDITION + (caused in the bazil.org library and/or the SwarmRoot, SwarmDir and SwarmFile implementations) + */ + time.Sleep(2 * time.Second) + + timer := time.NewTimer(mountTimeout) + defer timer.Stop() // Check if the mount process has an error to report. select { - case <-time.After(mountTimeout): - fuse.Unmount(cleanedMountPoint) + case <-timer.C: + log.Warn("swarmfs timed out mounting over FUSE", "mountpoint", cleanedMountPoint, "err", err) + err := fuse.Unmount(cleanedMountPoint) + if err != nil { + return nil, err + } return nil, errMountTimeout - case err := <-serverr: - fuse.Unmount(cleanedMountPoint) - log.Warn("Error serving swarm FUSE FS", "mountpoint", cleanedMountPoint, "err", err) + log.Warn("swarmfs error serving over FUSE", "mountpoint", cleanedMountPoint, "err", err) + err = fuse.Unmount(cleanedMountPoint) return nil, err case <-fconn.Ready: - log.Info("Now serving swarm FUSE FS", "manifest", mhash, "mountpoint", cleanedMountPoint) + //this signals that the actual mount point from the fuse.Mount call is ready; + //it does not signal though that the file system from fs.Serve is actually fully built up + if err := fconn.MountError; err != nil { + log.Error("Mounting error from fuse driver: ", "err", err) + return nil, err + } + log.Info("swarmfs now served over FUSE", "manifest", mhash, "mountpoint", cleanedMountPoint) } - self.activeMounts[cleanedMountPoint] = mi + timer.Stop() + swarmfs.activeMounts[cleanedMountPoint] = mi return mi, nil } -func (self *SwarmFS) Unmount(mountpoint string) (*MountInfo, error) { - - self.swarmFsLock.Lock() - defer self.swarmFsLock.Unlock() +func (swarmfs *SwarmFS) Unmount(mountpoint string) (*MountInfo, error) { + swarmfs.swarmFsLock.Lock() + defer swarmfs.swarmFsLock.Unlock() cleanedMountPoint, err := filepath.Abs(filepath.Clean(mountpoint)) if err != nil { return nil, err } - mountInfo := self.activeMounts[cleanedMountPoint] + mountInfo := swarmfs.activeMounts[cleanedMountPoint] if mountInfo == nil || mountInfo.MountPoint != cleanedMountPoint { - return nil, fmt.Errorf("%s is not mounted", cleanedMountPoint) + return nil, fmt.Errorf("swarmfs %s is not mounted", cleanedMountPoint) } err = fuse.Unmount(cleanedMountPoint) if err != nil { err1 := externalUnmount(cleanedMountPoint) if err1 != nil { - errStr := fmt.Sprintf("UnMount error: %v", err) + errStr := fmt.Sprintf("swarmfs unmount error: %v", err) log.Warn(errStr) return nil, err1 } } - mountInfo.fuseConnection.Close() - delete(self.activeMounts, cleanedMountPoint) + err = mountInfo.fuseConnection.Close() + if err != nil { + return nil, err + } + delete(swarmfs.activeMounts, cleanedMountPoint) - succString := fmt.Sprintf("UnMounting %v succeeded", cleanedMountPoint) + <-mountInfo.serveClose + + succString := fmt.Sprintf("swarmfs unmounting %v succeeded", cleanedMountPoint) log.Info(succString) return mountInfo, nil } -func (self *SwarmFS) Listmounts() []*MountInfo { - self.swarmFsLock.RLock() - defer self.swarmFsLock.RUnlock() - - rows := make([]*MountInfo, 0, len(self.activeMounts)) - for _, mi := range self.activeMounts { +func (swarmfs *SwarmFS) Listmounts() []*MountInfo { + swarmfs.swarmFsLock.RLock() + defer swarmfs.swarmFsLock.RUnlock() + rows := make([]*MountInfo, 0, len(swarmfs.activeMounts)) + for _, mi := range swarmfs.activeMounts { rows = append(rows, mi) } return rows } -func (self *SwarmFS) Stop() bool { - for mp := range self.activeMounts { - mountInfo := self.activeMounts[mp] - self.Unmount(mountInfo.MountPoint) +func (swarmfs *SwarmFS) Stop() bool { + for mp := range swarmfs.activeMounts { + mountInfo := swarmfs.activeMounts[mp] + swarmfs.Unmount(mountInfo.MountPoint) } return true } |