From e187711c6545487d4cac3701f0f506bb536234e2 Mon Sep 17 00:00:00 2001 From: ethersphere Date: Wed, 20 Jun 2018 14:06:27 +0200 Subject: swarm: network rewrite merge --- swarm/api/manifest.go | 185 +++++++++++++++++++++++++++++--------------------- 1 file changed, 107 insertions(+), 78 deletions(-) (limited to 'swarm/api/manifest.go') diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go index 685a300fc..28597636e 100644 --- a/swarm/api/manifest.go +++ b/swarm/api/manifest.go @@ -24,16 +24,18 @@ import ( "io" "net/http" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" ) const ( - ManifestType = "application/bzz-manifest+json" + ManifestType = "application/bzz-manifest+json" + ResourceContentType = "application/bzz-resource" + + manifestSizeLimit = 5 * 1024 * 1024 ) // Manifest represents a swarm manifest @@ -59,38 +61,58 @@ type ManifestList struct { } // NewManifest creates and stores a new, empty manifest -func (a *Api) NewManifest() (storage.Key, error) { +func (a *API) NewManifest(toEncrypt bool) (storage.Address, error) { var manifest Manifest data, err := json.Marshal(&manifest) if err != nil { return nil, err } - return a.Store(bytes.NewReader(data), int64(len(data)), &sync.WaitGroup{}) + key, wait, err := a.Store(bytes.NewReader(data), int64(len(data)), toEncrypt) + wait() + return key, err +} + +// Manifest hack for supporting Mutable Resource Updates from the bzz: scheme +// see swarm/api/api.go:API.Get() for more information +func (a *API) NewResourceManifest(resourceAddr string) (storage.Address, error) { + var manifest Manifest + entry := ManifestEntry{ + Hash: resourceAddr, + ContentType: ResourceContentType, + } + manifest.Entries = append(manifest.Entries, entry) + data, err := json.Marshal(&manifest) + if err != nil { + return nil, err + } + key, _, err := a.Store(bytes.NewReader(data), int64(len(data)), false) + return key, err } // ManifestWriter is used to add and remove entries from an underlying manifest type ManifestWriter struct { - api *Api + api *API trie *manifestTrie quitC chan bool } -func (a *Api) NewManifestWriter(key storage.Key, quitC chan bool) (*ManifestWriter, error) { - trie, err := loadManifest(a.dpa, key, quitC) +func (a *API) NewManifestWriter(addr storage.Address, quitC chan bool) (*ManifestWriter, error) { + trie, err := loadManifest(a.fileStore, addr, quitC) if err != nil { - return nil, fmt.Errorf("error loading manifest %s: %s", key, err) + return nil, fmt.Errorf("error loading manifest %s: %s", addr, err) } return &ManifestWriter{a, trie, quitC}, nil } // AddEntry stores the given data and adds the resulting key to the manifest -func (m *ManifestWriter) AddEntry(data io.Reader, e *ManifestEntry) (storage.Key, error) { - key, err := m.api.Store(data, e.Size, nil) +func (m *ManifestWriter) AddEntry(data io.Reader, e *ManifestEntry) (storage.Address, error) { + + key, _, err := m.api.Store(data, e.Size, m.trie.encrypted) if err != nil { return nil, err } entry := newManifestTrieEntry(e, nil) - entry.Hash = key.String() + entry.Hash = key.Hex() m.trie.addEntry(entry, m.quitC) return key, nil } @@ -102,29 +124,29 @@ func (m *ManifestWriter) RemoveEntry(path string) error { } // Store stores the manifest, returning the resulting storage key -func (m *ManifestWriter) Store() (storage.Key, error) { - return m.trie.hash, m.trie.recalcAndStore() +func (m *ManifestWriter) Store() (storage.Address, error) { + return m.trie.ref, m.trie.recalcAndStore() } // ManifestWalker is used to recursively walk the entries in the manifest and // all of its submanifests type ManifestWalker struct { - api *Api + api *API trie *manifestTrie quitC chan bool } -func (a *Api) NewManifestWalker(key storage.Key, quitC chan bool) (*ManifestWalker, error) { - trie, err := loadManifest(a.dpa, key, quitC) +func (a *API) NewManifestWalker(addr storage.Address, quitC chan bool) (*ManifestWalker, error) { + trie, err := loadManifest(a.fileStore, addr, quitC) if err != nil { - return nil, fmt.Errorf("error loading manifest %s: %s", key, err) + return nil, fmt.Errorf("error loading manifest %s: %s", addr, err) } return &ManifestWalker{a, trie, quitC}, nil } -// SkipManifest is used as a return value from WalkFn to indicate that the +// ErrSkipManifest is used as a return value from WalkFn to indicate that the // manifest should be skipped -var SkipManifest = errors.New("skip this manifest") +var ErrSkipManifest = errors.New("skip this manifest") // WalkFn is the type of function called for each entry visited by a recursive // manifest walk @@ -144,7 +166,7 @@ func (m *ManifestWalker) walk(trie *manifestTrie, prefix string, walkFn WalkFn) entry.Path = prefix + entry.Path err := walkFn(&entry.ManifestEntry) if err != nil { - if entry.ContentType == ManifestType && err == SkipManifest { + if entry.ContentType == ManifestType && err == ErrSkipManifest { continue } return err @@ -163,9 +185,10 @@ func (m *ManifestWalker) walk(trie *manifestTrie, prefix string, walkFn WalkFn) } type manifestTrie struct { - dpa *storage.DPA - entries [257]*manifestTrieEntry // indexed by first character of basePath, entries[256] is the empty basePath entry - hash storage.Key // if hash != nil, it is stored + fileStore *storage.FileStore + entries [257]*manifestTrieEntry // indexed by first character of basePath, entries[256] is the empty basePath entry + ref storage.Address // if ref != nil, it is stored + encrypted bool } func newManifestTrieEntry(entry *ManifestEntry, subtrie *manifestTrie) *manifestTrieEntry { @@ -181,48 +204,55 @@ type manifestTrieEntry struct { subtrie *manifestTrie } -func loadManifest(dpa *storage.DPA, hash storage.Key, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand - - log.Trace(fmt.Sprintf("manifest lookup key: '%v'.", hash.Log())) - // retrieve manifest via DPA - manifestReader := dpa.Retrieve(hash) - return readManifest(manifestReader, hash, dpa, quitC) +func loadManifest(fileStore *storage.FileStore, hash storage.Address, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand + log.Trace("manifest lookup", "key", hash) + // retrieve manifest via FileStore + manifestReader, isEncrypted := fileStore.Retrieve(hash) + log.Trace("reader retrieved", "key", hash) + return readManifest(manifestReader, hash, fileStore, isEncrypted, quitC) } -func readManifest(manifestReader storage.LazySectionReader, hash storage.Key, dpa *storage.DPA, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand +func readManifest(manifestReader storage.LazySectionReader, hash storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand // TODO check size for oversized manifests size, err := manifestReader.Size(quitC) if err != nil { // size == 0 // can't determine size means we don't have the root chunk + log.Trace("manifest not found", "key", hash) err = fmt.Errorf("Manifest not Found") return } + if size > manifestSizeLimit { + log.Warn("manifest exceeds size limit", "key", hash, "size", size, "limit", manifestSizeLimit) + err = fmt.Errorf("Manifest size of %v bytes exceeds the %v byte limit", size, manifestSizeLimit) + return + } manifestData := make([]byte, size) read, err := manifestReader.Read(manifestData) if int64(read) < size { - log.Trace(fmt.Sprintf("Manifest %v not found.", hash.Log())) + log.Trace("manifest not found", "key", hash) if err == nil { err = fmt.Errorf("Manifest retrieval cut short: read %v, expect %v", read, size) } return } - log.Trace(fmt.Sprintf("Manifest %v retrieved", hash.Log())) + log.Debug("manifest retrieved", "key", hash) var man struct { Entries []*manifestTrieEntry `json:"entries"` } err = json.Unmarshal(manifestData, &man) if err != nil { err = fmt.Errorf("Manifest %v is malformed: %v", hash.Log(), err) - log.Trace(fmt.Sprintf("%v", err)) + log.Trace("malformed manifest", "key", hash) return } - log.Trace(fmt.Sprintf("Manifest %v has %d entries.", hash.Log(), len(man.Entries))) + log.Trace("manifest entries", "key", hash, "len", len(man.Entries)) trie = &manifestTrie{ - dpa: dpa, + fileStore: fileStore, + encrypted: isEncrypted, } for _, entry := range man.Entries { trie.addEntry(entry, quitC) @@ -230,18 +260,18 @@ func readManifest(manifestReader storage.LazySectionReader, hash storage.Key, dp return } -func (self *manifestTrie) addEntry(entry *manifestTrieEntry, quitC chan bool) { - self.hash = nil // trie modified, hash needs to be re-calculated on demand +func (mt *manifestTrie) addEntry(entry *manifestTrieEntry, quitC chan bool) { + mt.ref = nil // trie modified, hash needs to be re-calculated on demand if len(entry.Path) == 0 { - self.entries[256] = entry + mt.entries[256] = entry return } b := entry.Path[0] - oldentry := self.entries[b] + oldentry := mt.entries[b] if (oldentry == nil) || (oldentry.Path == entry.Path && oldentry.ContentType != ManifestType) { - self.entries[b] = entry + mt.entries[b] = entry return } @@ -251,7 +281,7 @@ func (self *manifestTrie) addEntry(entry *manifestTrieEntry, quitC chan bool) { } if (oldentry.ContentType == ManifestType) && (cpl == len(oldentry.Path)) { - if self.loadSubTrie(oldentry, quitC) != nil { + if mt.loadSubTrie(oldentry, quitC) != nil { return } entry.Path = entry.Path[cpl:] @@ -263,21 +293,22 @@ func (self *manifestTrie) addEntry(entry *manifestTrieEntry, quitC chan bool) { commonPrefix := entry.Path[:cpl] subtrie := &manifestTrie{ - dpa: self.dpa, + fileStore: mt.fileStore, + encrypted: mt.encrypted, } entry.Path = entry.Path[cpl:] oldentry.Path = oldentry.Path[cpl:] subtrie.addEntry(entry, quitC) subtrie.addEntry(oldentry, quitC) - self.entries[b] = newManifestTrieEntry(&ManifestEntry{ + mt.entries[b] = newManifestTrieEntry(&ManifestEntry{ Path: commonPrefix, ContentType: ManifestType, }, subtrie) } -func (self *manifestTrie) getCountLast() (cnt int, entry *manifestTrieEntry) { - for _, e := range self.entries { +func (mt *manifestTrie) getCountLast() (cnt int, entry *manifestTrieEntry) { + for _, e := range mt.entries { if e != nil { cnt++ entry = e @@ -286,27 +317,27 @@ func (self *manifestTrie) getCountLast() (cnt int, entry *manifestTrieEntry) { return } -func (self *manifestTrie) deleteEntry(path string, quitC chan bool) { - self.hash = nil // trie modified, hash needs to be re-calculated on demand +func (mt *manifestTrie) deleteEntry(path string, quitC chan bool) { + mt.ref = nil // trie modified, hash needs to be re-calculated on demand if len(path) == 0 { - self.entries[256] = nil + mt.entries[256] = nil return } b := path[0] - entry := self.entries[b] + entry := mt.entries[b] if entry == nil { return } if entry.Path == path { - self.entries[b] = nil + mt.entries[b] = nil return } epl := len(entry.Path) if (entry.ContentType == ManifestType) && (len(path) >= epl) && (path[:epl] == entry.Path) { - if self.loadSubTrie(entry, quitC) != nil { + if mt.loadSubTrie(entry, quitC) != nil { return } entry.subtrie.deleteEntry(path[epl:], quitC) @@ -317,13 +348,13 @@ func (self *manifestTrie) deleteEntry(path string, quitC chan bool) { if lastentry != nil { lastentry.Path = entry.Path + lastentry.Path } - self.entries[b] = lastentry + mt.entries[b] = lastentry } } } -func (self *manifestTrie) recalcAndStore() error { - if self.hash != nil { +func (mt *manifestTrie) recalcAndStore() error { + if mt.ref != nil { return nil } @@ -331,14 +362,14 @@ func (self *manifestTrie) recalcAndStore() error { buffer.WriteString(`{"entries":[`) list := &Manifest{} - for _, entry := range self.entries { + for _, entry := range mt.entries { if entry != nil { if entry.Hash == "" { // TODO: paralellize err := entry.subtrie.recalcAndStore() if err != nil { return err } - entry.Hash = entry.subtrie.hash.String() + entry.Hash = entry.subtrie.ref.Hex() } list.Entries = append(list.Entries, entry.ManifestEntry) } @@ -351,23 +382,22 @@ func (self *manifestTrie) recalcAndStore() error { } sr := bytes.NewReader(manifest) - wg := &sync.WaitGroup{} - key, err2 := self.dpa.Store(sr, int64(len(manifest)), wg, nil) - wg.Wait() - self.hash = key + key, wait, err2 := mt.fileStore.Store(sr, int64(len(manifest)), mt.encrypted) + wait() + mt.ref = key return err2 } -func (self *manifestTrie) loadSubTrie(entry *manifestTrieEntry, quitC chan bool) (err error) { +func (mt *manifestTrie) loadSubTrie(entry *manifestTrieEntry, quitC chan bool) (err error) { if entry.subtrie == nil { hash := common.Hex2Bytes(entry.Hash) - entry.subtrie, err = loadManifest(self.dpa, hash, quitC) + entry.subtrie, err = loadManifest(mt.fileStore, hash, quitC) entry.Hash = "" // might not match, should be recalculated } return } -func (self *manifestTrie) listWithPrefixInt(prefix, rp string, quitC chan bool, cb func(entry *manifestTrieEntry, suffix string)) error { +func (mt *manifestTrie) listWithPrefixInt(prefix, rp string, quitC chan bool, cb func(entry *manifestTrieEntry, suffix string)) error { plen := len(prefix) var start, stop int if plen == 0 { @@ -384,7 +414,7 @@ func (self *manifestTrie) listWithPrefixInt(prefix, rp string, quitC chan bool, return fmt.Errorf("aborted") default: } - entry := self.entries[i] + entry := mt.entries[i] if entry != nil { epl := len(entry.Path) if entry.ContentType == ManifestType { @@ -393,7 +423,7 @@ func (self *manifestTrie) listWithPrefixInt(prefix, rp string, quitC chan bool, l = epl } if prefix[:l] == entry.Path[:l] { - err := self.loadSubTrie(entry, quitC) + err := mt.loadSubTrie(entry, quitC) if err != nil { return err } @@ -412,23 +442,22 @@ func (self *manifestTrie) listWithPrefixInt(prefix, rp string, quitC chan bool, return nil } -func (self *manifestTrie) listWithPrefix(prefix string, quitC chan bool, cb func(entry *manifestTrieEntry, suffix string)) (err error) { - return self.listWithPrefixInt(prefix, "", quitC, cb) +func (mt *manifestTrie) listWithPrefix(prefix string, quitC chan bool, cb func(entry *manifestTrieEntry, suffix string)) (err error) { + return mt.listWithPrefixInt(prefix, "", quitC, cb) } -func (self *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *manifestTrieEntry, pos int) { - +func (mt *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *manifestTrieEntry, pos int) { log.Trace(fmt.Sprintf("findPrefixOf(%s)", path)) if len(path) == 0 { - return self.entries[256], 0 + return mt.entries[256], 0 } //see if first char is in manifest entries b := path[0] - entry = self.entries[b] + entry = mt.entries[b] if entry == nil { - return self.entries[256], 0 + return mt.entries[256], 0 } epl := len(entry.Path) @@ -436,7 +465,7 @@ func (self *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *man if len(path) <= epl { if entry.Path[:len(path)] == path { if entry.ContentType == ManifestType { - err := self.loadSubTrie(entry, quitC) + err := mt.loadSubTrie(entry, quitC) if err == nil && entry.subtrie != nil { subentries := entry.subtrie.entries for i := 0; i < len(subentries); i++ { @@ -457,7 +486,7 @@ func (self *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *man log.Trace(fmt.Sprintf("entry.ContentType = %v", entry.ContentType)) //the subentry is a manifest, load subtrie if entry.ContentType == ManifestType && (strings.Contains(entry.Path, path) || strings.Contains(path, entry.Path)) { - err := self.loadSubTrie(entry, quitC) + err := mt.loadSubTrie(entry, quitC) if err != nil { return nil, 0 } @@ -478,7 +507,7 @@ func (self *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *man pos = epl } } - return + return nil, 0 } // file system manifest always contains regularized paths @@ -495,10 +524,10 @@ func RegularSlashes(path string) (res string) { return } -func (self *manifestTrie) getEntry(spath string) (entry *manifestTrieEntry, fullpath string) { +func (mt *manifestTrie) getEntry(spath string) (entry *manifestTrieEntry, fullpath string) { path := RegularSlashes(spath) var pos int quitC := make(chan bool) - entry, pos = self.findPrefixOf(path, quitC) + entry, pos = mt.findPrefixOf(path, quitC) return entry, path[:pos] } -- cgit v1.2.3