diff options
author | Janoš Guljaš <janos@users.noreply.github.com> | 2019-02-23 17:47:33 +0800 |
---|---|---|
committer | Viktor Trón <viktor.tron@gmail.com> | 2019-02-23 17:47:33 +0800 |
commit | 64d10c08726af33048e8eeb8df257628a3944870 (patch) | |
tree | 00002e147d650e1d6ec571731bbf900e77d9e307 /swarm/storage/mock | |
parent | 02c28046a04ebf649af5d1b2a702d0da1c8a2a39 (diff) | |
download | go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.tar go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.tar.gz go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.tar.bz2 go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.tar.lz go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.tar.xz go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.tar.zst go-tangerine-64d10c08726af33048e8eeb8df257628a3944870.zip |
swarm: mock store listings (#19157)
* swarm/storage/mock: implement listings methods for mem and rpc stores
* swarm/storage/mock/rpc: add comments and newTestStore helper function
* swarm/storage/mock/mem: add missing comments
* swarm/storage/mock: add comments to new types and constants
* swarm/storage/mock/db: implement listings for mock/db global store
* swarm/storage/mock/test: add comments for MockStoreListings
* swarm/storage/mock/explorer: initial implementation
* cmd/swarm/global-store: add chunk explorer
* cmd/swarm/global-store: add chunk explorer tests
* swarm/storage/mock/explorer: add tests
* swarm/storage/mock/explorer: add swagger api definition
* swarm/storage/mock/explorer: not-zero test values for invalid addr and key
* swarm/storage/mock/explorer: test wildcard cors origin
* swarm/storage/mock/db: renames based on Fabio's suggestions
* swarm/storage/mock/explorer: add more comments to testHandler function
* cmd/swarm/global-store: terminate subprocess with Kill in tests
Diffstat (limited to 'swarm/storage/mock')
-rw-r--r-- | swarm/storage/mock/db/db.go | 295 | ||||
-rw-r--r-- | swarm/storage/mock/db/db_test.go | 58 | ||||
-rw-r--r-- | swarm/storage/mock/explorer/explorer.go | 257 | ||||
-rw-r--r-- | swarm/storage/mock/explorer/explorer_test.go | 471 | ||||
-rw-r--r-- | swarm/storage/mock/explorer/headers_test.go | 163 | ||||
-rw-r--r-- | swarm/storage/mock/explorer/swagger.yaml | 176 | ||||
-rw-r--r-- | swarm/storage/mock/mem/mem.go | 270 | ||||
-rw-r--r-- | swarm/storage/mock/mem/mem_test.go | 6 | ||||
-rw-r--r-- | swarm/storage/mock/mock.go | 31 | ||||
-rw-r--r-- | swarm/storage/mock/rpc/rpc.go | 24 | ||||
-rw-r--r-- | swarm/storage/mock/rpc/rpc_test.go | 29 | ||||
-rw-r--r-- | swarm/storage/mock/test/test.go | 118 |
12 files changed, 1797 insertions, 101 deletions
diff --git a/swarm/storage/mock/db/db.go b/swarm/storage/mock/db/db.go index 73ae199e8..313a61b43 100644 --- a/swarm/storage/mock/db/db.go +++ b/swarm/storage/mock/db/db.go @@ -21,8 +21,12 @@ import ( "archive/tar" "bytes" "encoding/json" + "errors" + "fmt" "io" "io/ioutil" + "sync" + "time" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -37,6 +41,10 @@ import ( // release resources used by the database. type GlobalStore struct { db *leveldb.DB + // protects nodes and keys indexes + // in Put and Delete methods + nodesLocks sync.Map + keysLocks sync.Map } // NewGlobalStore creates a new instance of GlobalStore. @@ -64,14 +72,14 @@ func (s *GlobalStore) NewNodeStore(addr common.Address) *mock.NodeStore { // Get returns chunk data if the chunk with key exists for node // on address addr. func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err error) { - has, err := s.db.Has(nodeDBKey(addr, key), nil) + has, err := s.db.Has(indexForHashesPerNode(addr, key), nil) if err != nil { return nil, mock.ErrNotFound } if !has { return nil, mock.ErrNotFound } - data, err = s.db.Get(dataDBKey(key), nil) + data, err = s.db.Get(indexDataKey(key), nil) if err == leveldb.ErrNotFound { err = mock.ErrNotFound } @@ -80,28 +88,165 @@ func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err err // Put saves the chunk data for node with address addr. func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { + unlock, err := s.lock(addr, key) + if err != nil { + return err + } + defer unlock() + batch := new(leveldb.Batch) - batch.Put(nodeDBKey(addr, key), nil) - batch.Put(dataDBKey(key), data) + batch.Put(indexForHashesPerNode(addr, key), nil) + batch.Put(indexForNodesWithHash(key, addr), nil) + batch.Put(indexForNodes(addr), nil) + batch.Put(indexForHashes(key), nil) + batch.Put(indexDataKey(key), data) return s.db.Write(batch, nil) } // Delete removes the chunk reference to node with address addr. func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + unlock, err := s.lock(addr, key) + if err != nil { + return err + } + defer unlock() + batch := new(leveldb.Batch) - batch.Delete(nodeDBKey(addr, key)) + batch.Delete(indexForHashesPerNode(addr, key)) + batch.Delete(indexForNodesWithHash(key, addr)) + + // check if this node contains any keys, and if not + // remove it from the + x := indexForHashesPerNodePrefix(addr) + if k, _ := s.db.Get(x, nil); !bytes.HasPrefix(k, x) { + batch.Delete(indexForNodes(addr)) + } + + x = indexForNodesWithHashPrefix(key) + if k, _ := s.db.Get(x, nil); !bytes.HasPrefix(k, x) { + batch.Delete(indexForHashes(key)) + } return s.db.Write(batch, nil) } // HasKey returns whether a node with addr contains the key. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { - has, err := s.db.Has(nodeDBKey(addr, key), nil) + has, err := s.db.Has(indexForHashesPerNode(addr, key), nil) if err != nil { has = false } return has } +// Keys returns a paginated list of keys on all nodes. +func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) { + return s.keys(nil, startKey, limit) +} + +// Nodes returns a paginated list of all known nodes. +func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + return s.nodes(nil, startAddr, limit) +} + +// NodeKeys returns a paginated list of keys on a node with provided address. +func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + return s.keys(&addr, startKey, limit) +} + +// KeyNodes returns a paginated list of nodes that contain a particular key. +func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + return s.nodes(key, startAddr, limit) +} + +// keys returns a paginated list of keys. If addr is not nil, only keys on that +// node will be returned. +func (s *GlobalStore) keys(addr *common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + iter := s.db.NewIterator(nil, nil) + defer iter.Release() + + if limit <= 0 { + limit = mock.DefaultLimit + } + + prefix := []byte{indexForHashesPrefix} + if addr != nil { + prefix = indexForHashesPerNodePrefix(*addr) + } + if startKey != nil { + if addr != nil { + startKey = indexForHashesPerNode(*addr, startKey) + } else { + startKey = indexForHashes(startKey) + } + } else { + startKey = prefix + } + + ok := iter.Seek(startKey) + if !ok { + return keys, iter.Error() + } + for ; ok; ok = iter.Next() { + k := iter.Key() + if !bytes.HasPrefix(k, prefix) { + break + } + key := append([]byte(nil), bytes.TrimPrefix(k, prefix)...) + + if len(keys.Keys) >= limit { + keys.Next = key + break + } + + keys.Keys = append(keys.Keys, key) + } + return keys, iter.Error() +} + +// nodes returns a paginated list of node addresses. If key is not nil, +// only nodes that contain that key will be returned. +func (s *GlobalStore) nodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + iter := s.db.NewIterator(nil, nil) + defer iter.Release() + + if limit <= 0 { + limit = mock.DefaultLimit + } + + prefix := []byte{indexForNodesPrefix} + if key != nil { + prefix = indexForNodesWithHashPrefix(key) + } + startKey := prefix + if startAddr != nil { + if key != nil { + startKey = indexForNodesWithHash(key, *startAddr) + } else { + startKey = indexForNodes(*startAddr) + } + } + + ok := iter.Seek(startKey) + if !ok { + return nodes, iter.Error() + } + for ; ok; ok = iter.Next() { + k := iter.Key() + if !bytes.HasPrefix(k, prefix) { + break + } + addr := common.BytesToAddress(append([]byte(nil), bytes.TrimPrefix(k, prefix)...)) + + if len(nodes.Addrs) >= limit { + nodes.Next = &addr + break + } + + nodes.Addrs = append(nodes.Addrs, addr) + } + return nodes, iter.Error() +} + // Import reads tar archive from a reader that contains exported chunk data. // It returns the number of chunks imported and an error. func (s *GlobalStore) Import(r io.Reader) (n int, err error) { @@ -126,12 +271,18 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) { return n, err } + key := common.Hex2Bytes(hdr.Name) + batch := new(leveldb.Batch) for _, addr := range c.Addrs { - batch.Put(nodeDBKeyHex(addr, hdr.Name), nil) + batch.Put(indexForHashesPerNode(addr, key), nil) + batch.Put(indexForNodesWithHash(key, addr), nil) + batch.Put(indexForNodes(addr), nil) } - batch.Put(dataDBKey(common.Hex2Bytes(hdr.Name)), c.Data) + batch.Put(indexForHashes(key), nil) + batch.Put(indexDataKey(key), c.Data) + if err = s.db.Write(batch, nil); err != nil { return n, err } @@ -150,18 +301,23 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { buf := bytes.NewBuffer(make([]byte, 0, 1024)) encoder := json.NewEncoder(buf) - iter := s.db.NewIterator(util.BytesPrefix(nodeKeyPrefix), nil) + snap, err := s.db.GetSnapshot() + if err != nil { + return 0, err + } + + iter := snap.NewIterator(util.BytesPrefix([]byte{indexForHashesByNodePrefix}), nil) defer iter.Release() var currentKey string var addrs []common.Address - saveChunk := func(hexKey string) error { - key := common.Hex2Bytes(hexKey) + saveChunk := func() error { + hexKey := currentKey - data, err := s.db.Get(dataDBKey(key), nil) + data, err := snap.Get(indexDataKey(common.Hex2Bytes(hexKey)), nil) if err != nil { - return err + return fmt.Errorf("get data %s: %v", hexKey, err) } buf.Reset() @@ -189,8 +345,8 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { } for iter.Next() { - k := bytes.TrimPrefix(iter.Key(), nodeKeyPrefix) - i := bytes.Index(k, []byte("-")) + k := bytes.TrimPrefix(iter.Key(), []byte{indexForHashesByNodePrefix}) + i := bytes.Index(k, []byte{keyTermByte}) if i < 0 { continue } @@ -201,7 +357,7 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { } if hexKey != currentKey { - if err = saveChunk(currentKey); err != nil { + if err = saveChunk(); err != nil { return n, err } @@ -209,35 +365,112 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) { } currentKey = hexKey - addrs = append(addrs, common.BytesToAddress(k[i:])) + addrs = append(addrs, common.BytesToAddress(k[i+1:])) } if len(addrs) > 0 { - if err = saveChunk(currentKey); err != nil { + if err = saveChunk(); err != nil { return n, err } } - return n, err + return n, iter.Error() } var ( - nodeKeyPrefix = []byte("node-") - dataKeyPrefix = []byte("data-") + // maximal time for lock to wait until it returns error + lockTimeout = 3 * time.Second + // duration between two lock checks. + lockCheckDelay = 30 * time.Microsecond + // error returned by lock method when lock timeout is reached + errLockTimeout = errors.New("lock timeout") +) + +// lock protects parallel writes in Put and Delete methods for both +// node with provided address and for data with provided key. +func (s *GlobalStore) lock(addr common.Address, key []byte) (unlock func(), err error) { + start := time.Now() + nodeLockKey := addr.Hex() + for { + _, loaded := s.nodesLocks.LoadOrStore(nodeLockKey, struct{}{}) + if !loaded { + break + } + time.Sleep(lockCheckDelay) + if time.Since(start) > lockTimeout { + return nil, errLockTimeout + } + } + start = time.Now() + keyLockKey := common.Bytes2Hex(key) + for { + _, loaded := s.keysLocks.LoadOrStore(keyLockKey, struct{}{}) + if !loaded { + break + } + time.Sleep(lockCheckDelay) + if time.Since(start) > lockTimeout { + return nil, errLockTimeout + } + } + return func() { + s.nodesLocks.Delete(nodeLockKey) + s.keysLocks.Delete(keyLockKey) + }, nil +} + +const ( + // prefixes for different indexes + indexDataPrefix = 0 + indexForNodesWithHashesPrefix = 1 + indexForHashesByNodePrefix = 2 + indexForNodesPrefix = 3 + indexForHashesPrefix = 4 + + // keyTermByte splits keys and node addresses + // in database keys + keyTermByte = 0xff ) -// nodeDBKey constructs a database key for key/node mappings. -func nodeDBKey(addr common.Address, key []byte) []byte { - return nodeDBKeyHex(addr, common.Bytes2Hex(key)) +// indexForHashesPerNode constructs a database key to store keys used in +// NodeKeys method. +func indexForHashesPerNode(addr common.Address, key []byte) []byte { + return append(indexForHashesPerNodePrefix(addr), key...) +} + +// indexForHashesPerNodePrefix returns a prefix containing a node address used in +// NodeKeys method. Node address is hex encoded to be able to use keyTermByte +// for splitting node address and key. +func indexForHashesPerNodePrefix(addr common.Address) []byte { + return append([]byte{indexForNodesWithHashesPrefix}, append([]byte(addr.Hex()), keyTermByte)...) +} + +// indexForNodesWithHash constructs a database key to store keys used in +// KeyNodes method. +func indexForNodesWithHash(key []byte, addr common.Address) []byte { + return append(indexForNodesWithHashPrefix(key), addr[:]...) +} + +// indexForNodesWithHashPrefix returns a prefix containing a key used in +// KeyNodes method. Key is hex encoded to be able to use keyTermByte +// for splitting key and node address. +func indexForNodesWithHashPrefix(key []byte) []byte { + return append([]byte{indexForHashesByNodePrefix}, append([]byte(common.Bytes2Hex(key)), keyTermByte)...) +} + +// indexForNodes constructs a database key to store keys used in +// Nodes method. +func indexForNodes(addr common.Address) []byte { + return append([]byte{indexForNodesPrefix}, addr[:]...) } -// nodeDBKeyHex constructs a database key for key/node mappings -// using the hexadecimal string representation of the key. -func nodeDBKeyHex(addr common.Address, hexKey string) []byte { - return append(append(nodeKeyPrefix, []byte(hexKey+"-")...), addr[:]...) +// indexForHashes constructs a database key to store keys used in +// Keys method. +func indexForHashes(key []byte) []byte { + return append([]byte{indexForHashesPrefix}, key...) } -// dataDBkey constructs a database key for key/data storage. -func dataDBKey(key []byte) []byte { - return append(dataKeyPrefix, key...) +// indexDataKey constructs a database key for key/data storage. +func indexDataKey(key []byte) []byte { + return append([]byte{indexDataPrefix}, key...) } diff --git a/swarm/storage/mock/db/db_test.go b/swarm/storage/mock/db/db_test.go index 782faaf35..efbf942f6 100644 --- a/swarm/storage/mock/db/db_test.go +++ b/swarm/storage/mock/db/db_test.go @@ -1,5 +1,3 @@ -// +build go1.8 -// // Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -29,47 +27,49 @@ import ( // TestDBStore is running a test.MockStore tests // using test.MockStore function. func TestDBStore(t *testing.T) { - dir, err := ioutil.TempDir("", "mock_"+t.Name()) - if err != nil { - panic(err) - } - defer os.RemoveAll(dir) - - store, err := NewGlobalStore(dir) - if err != nil { - t.Fatal(err) - } - defer store.Close() + store, cleanup := newTestStore(t) + defer cleanup() test.MockStore(t, store, 100) } +// TestDBStoreListings is running test.MockStoreListings tests. +func TestDBStoreListings(t *testing.T) { + store, cleanup := newTestStore(t) + defer cleanup() + + test.MockStoreListings(t, store, 1000) +} + // TestImportExport is running a test.ImportExport tests // using test.MockStore function. func TestImportExport(t *testing.T) { - dir1, err := ioutil.TempDir("", "mock_"+t.Name()+"_exporter") - if err != nil { - panic(err) - } - defer os.RemoveAll(dir1) + store1, cleanup := newTestStore(t) + defer cleanup() - store1, err := NewGlobalStore(dir1) - if err != nil { - t.Fatal(err) - } - defer store1.Close() + store2, cleanup := newTestStore(t) + defer cleanup() + + test.ImportExport(t, store1, store2, 100) +} - dir2, err := ioutil.TempDir("", "mock_"+t.Name()+"_importer") +// newTestStore creates a temporary GlobalStore +// that will be closed and data deleted when +// calling returned cleanup function. +func newTestStore(t *testing.T) (s *GlobalStore, cleanup func()) { + dir, err := ioutil.TempDir("", "swarm-mock-db-") if err != nil { - panic(err) + t.Fatal(err) } - defer os.RemoveAll(dir2) - store2, err := NewGlobalStore(dir2) + s, err = NewGlobalStore(dir) if err != nil { + os.RemoveAll(dir) t.Fatal(err) } - defer store2.Close() - test.ImportExport(t, store1, store2, 100) + return s, func() { + s.Close() + os.RemoveAll(dir) + } } diff --git a/swarm/storage/mock/explorer/explorer.go b/swarm/storage/mock/explorer/explorer.go new file mode 100644 index 000000000..8fffff8fd --- /dev/null +++ b/swarm/storage/mock/explorer/explorer.go @@ -0,0 +1,257 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package explorer + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + "github.com/rs/cors" +) + +const jsonContentType = "application/json; charset=utf-8" + +// NewHandler constructs an http.Handler with router +// that servers requests required by chunk explorer. +// +// /api/has-key/{node}/{key} +// /api/keys?start={key}&node={node}&limit={int[0..1000]} +// /api/nodes?start={node}&key={key}&limit={int[0..1000]} +// +// Data from global store will be served and appropriate +// CORS headers will be sent if allowed origins are provided. +func NewHandler(store mock.GlobalStorer, corsOrigins []string) (handler http.Handler) { + mux := http.NewServeMux() + mux.Handle("/api/has-key/", newHasKeyHandler(store)) + mux.Handle("/api/keys", newKeysHandler(store)) + mux.Handle("/api/nodes", newNodesHandler(store)) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + jsonStatusResponse(w, http.StatusNotFound) + }) + handler = noCacheHandler(mux) + if corsOrigins != nil { + handler = cors.New(cors.Options{ + AllowedOrigins: corsOrigins, + AllowedMethods: []string{"GET"}, + MaxAge: 600, + }).Handler(handler) + } + return handler +} + +// newHasKeyHandler returns a new handler that serves +// requests for HasKey global store method. +// Possible responses are StatusResponse with +// status codes 200 or 404 if the chunk is found or not. +func newHasKeyHandler(store mock.GlobalStorer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + addr, key, ok := parseHasKeyPath(r.URL.Path) + if !ok { + jsonStatusResponse(w, http.StatusNotFound) + return + } + found := store.HasKey(addr, key) + if !found { + jsonStatusResponse(w, http.StatusNotFound) + return + } + jsonStatusResponse(w, http.StatusOK) + } +} + +// KeysResponse is a JSON-encoded response for global store +// Keys and NodeKeys methods. +type KeysResponse struct { + Keys []string `json:"keys"` + Next string `json:"next,omitempty"` +} + +// newKeysHandler returns a new handler that serves +// requests for Key global store method. +// HTTP response body will be JSON-encoded KeysResponse. +func newKeysHandler(store mock.GlobalStorer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + node := q.Get("node") + start, limit := listingPage(q) + + var keys mock.Keys + if node == "" { + var err error + keys, err = store.Keys(common.Hex2Bytes(start), limit) + if err != nil { + log.Error("chunk explorer: keys handler: get keys", "start", start, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } else { + var err error + keys, err = store.NodeKeys(common.HexToAddress(node), common.Hex2Bytes(start), limit) + if err != nil { + log.Error("chunk explorer: keys handler: get node keys", "node", node, "start", start, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } + ks := make([]string, len(keys.Keys)) + for i, k := range keys.Keys { + ks[i] = common.Bytes2Hex(k) + } + data, err := json.Marshal(KeysResponse{ + Keys: ks, + Next: common.Bytes2Hex(keys.Next), + }) + if err != nil { + log.Error("chunk explorer: keys handler: json marshal", "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", jsonContentType) + _, err = io.Copy(w, bytes.NewReader(data)) + if err != nil { + log.Error("chunk explorer: keys handler: write response", "err", err) + } + } +} + +// NodesResponse is a JSON-encoded response for global store +// Nodes and KeyNodes methods. +type NodesResponse struct { + Nodes []string `json:"nodes"` + Next string `json:"next,omitempty"` +} + +// newNodesHandler returns a new handler that serves +// requests for Nodes global store method. +// HTTP response body will be JSON-encoded NodesResponse. +func newNodesHandler(store mock.GlobalStorer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + key := q.Get("key") + var start *common.Address + queryStart, limit := listingPage(q) + if queryStart != "" { + s := common.HexToAddress(queryStart) + start = &s + } + + var nodes mock.Nodes + if key == "" { + var err error + nodes, err = store.Nodes(start, limit) + if err != nil { + log.Error("chunk explorer: nodes handler: get nodes", "start", queryStart, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } else { + var err error + nodes, err = store.KeyNodes(common.Hex2Bytes(key), start, limit) + if err != nil { + log.Error("chunk explorer: nodes handler: get key nodes", "key", key, "start", queryStart, "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + } + ns := make([]string, len(nodes.Addrs)) + for i, n := range nodes.Addrs { + ns[i] = n.Hex() + } + var next string + if nodes.Next != nil { + next = nodes.Next.Hex() + } + data, err := json.Marshal(NodesResponse{ + Nodes: ns, + Next: next, + }) + if err != nil { + log.Error("chunk explorer: nodes handler", "err", err) + jsonStatusResponse(w, http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", jsonContentType) + _, err = io.Copy(w, bytes.NewReader(data)) + if err != nil { + log.Error("chunk explorer: nodes handler: write response", "err", err) + } + } +} + +// parseHasKeyPath extracts address and key from HTTP request +// path for HasKey route: /api/has-key/{node}/{key}. +// If ok is false, the provided path is not matched. +func parseHasKeyPath(p string) (addr common.Address, key []byte, ok bool) { + p = strings.TrimPrefix(p, "/api/has-key/") + parts := strings.SplitN(p, "/", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return addr, nil, false + } + addr = common.HexToAddress(parts[0]) + key = common.Hex2Bytes(parts[1]) + return addr, key, true +} + +// listingPage returns start value and listing limit +// from url query values. +func listingPage(q url.Values) (start string, limit int) { + // if limit is not a valid integer (or blank string), + // ignore the error and use the returned 0 value + limit, _ = strconv.Atoi(q.Get("limit")) + return q.Get("start"), limit +} + +// StatusResponse is a standardized JSON-encoded response +// that contains information about HTTP response code +// for easier status identification. +type StatusResponse struct { + Message string `json:"message"` + Code int `json:"code"` +} + +// jsonStatusResponse writes to the response writer +// JSON-encoded StatusResponse based on the provided status code. +func jsonStatusResponse(w http.ResponseWriter, code int) { + w.Header().Set("Content-Type", jsonContentType) + w.WriteHeader(code) + err := json.NewEncoder(w).Encode(StatusResponse{ + Message: http.StatusText(code), + Code: code, + }) + if err != nil { + log.Error("chunk explorer: json status response", "err", err) + } +} + +// noCacheHandler sets required HTTP headers to prevent +// response caching at the client side. +func noCacheHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Set("Pragma", "no-cache") + w.Header().Set("Expires", "0") + h.ServeHTTP(w, r) + }) +} diff --git a/swarm/storage/mock/explorer/explorer_test.go b/swarm/storage/mock/explorer/explorer_test.go new file mode 100644 index 000000000..be2668426 --- /dev/null +++ b/swarm/storage/mock/explorer/explorer_test.go @@ -0,0 +1,471 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package explorer + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "sort" + "strconv" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + "github.com/ethereum/go-ethereum/swarm/storage/mock/db" + "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" +) + +// TestHandler_memGlobalStore runs a set of tests +// to validate handler with mem global store. +func TestHandler_memGlobalStore(t *testing.T) { + t.Parallel() + + globalStore := mem.NewGlobalStore() + + testHandler(t, globalStore) +} + +// TestHandler_dbGlobalStore runs a set of tests +// to validate handler with database global store. +func TestHandler_dbGlobalStore(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "swarm-mock-explorer-db-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + globalStore, err := db.NewGlobalStore(dir) + if err != nil { + t.Fatal(err) + } + defer globalStore.Close() + + testHandler(t, globalStore) +} + +// testHandler stores data distributed by node addresses +// and validates if this data is correctly retrievable +// by using the http.Handler returned by NewHandler function. +// This test covers all HTTP routes and various get parameters +// on them to check paginated results. +func testHandler(t *testing.T, globalStore mock.GlobalStorer) { + const ( + nodeCount = 350 + keyCount = 250 + keysOnNodeCount = 150 + ) + + // keys for every node + nodeKeys := make(map[string][]string) + + // a node address that is not present in global store + invalidAddr := "0x7b8b72938c254cf002c4e1e714d27e022be88d93" + + // a key that is not present in global store + invalidKey := "f9824192fb515cfb" + + for i := 1; i <= nodeCount; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + addr := common.BytesToAddress(b).Hex() + nodeKeys[addr] = make([]string, 0) + } + + for i := 1; i <= keyCount; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + + key := common.Bytes2Hex(b) + + var c int + for addr := range nodeKeys { + nodeKeys[addr] = append(nodeKeys[addr], key) + c++ + if c >= keysOnNodeCount { + break + } + } + } + + // sort keys for every node as they are expected to be + // sorted in HTTP responses + for _, keys := range nodeKeys { + sort.Strings(keys) + } + + // nodes for every key + keyNodes := make(map[string][]string) + + // construct a reverse mapping of nodes for every key + for addr, keys := range nodeKeys { + for _, key := range keys { + keyNodes[key] = append(keyNodes[key], addr) + } + } + + // sort node addresses with case insensitive sort, + // as hex letters in node addresses are in mixed caps + for _, addrs := range keyNodes { + sortCaseInsensitive(addrs) + } + + // find a key that is not stored at the address + var ( + unmatchedAddr string + unmatchedKey string + ) + for addr, keys := range nodeKeys { + for key := range keyNodes { + var found bool + for _, k := range keys { + if k == key { + found = true + break + } + } + if !found { + unmatchedAddr = addr + unmatchedKey = key + } + break + } + if unmatchedAddr != "" { + break + } + } + // check if unmatched key/address pair is found + if unmatchedAddr == "" || unmatchedKey == "" { + t.Fatalf("could not find a key that is not associated with a node") + } + + // store the data + for addr, keys := range nodeKeys { + for _, key := range keys { + err := globalStore.Put(common.HexToAddress(addr), common.Hex2Bytes(key), []byte("data")) + if err != nil { + t.Fatal(err) + } + } + } + + handler := NewHandler(globalStore, nil) + + // this subtest confirms that it has uploaded key and that it does not have invalid keys + t.Run("has key", func(t *testing.T) { + for addr, keys := range nodeKeys { + for _, key := range keys { + testStatusResponse(t, handler, "/api/has-key/"+addr+"/"+key, http.StatusOK) + testStatusResponse(t, handler, "/api/has-key/"+invalidAddr+"/"+key, http.StatusNotFound) + } + testStatusResponse(t, handler, "/api/has-key/"+addr+"/"+invalidKey, http.StatusNotFound) + } + testStatusResponse(t, handler, "/api/has-key/"+invalidAddr+"/"+invalidKey, http.StatusNotFound) + testStatusResponse(t, handler, "/api/has-key/"+unmatchedAddr+"/"+unmatchedKey, http.StatusNotFound) + }) + + // this subtest confirms that all keys are are listed in correct order with expected pagination + t.Run("keys", func(t *testing.T) { + var allKeys []string + for key := range keyNodes { + allKeys = append(allKeys, key) + } + sort.Strings(allKeys) + + t.Run("limit 0", testKeys(handler, allKeys, 0, "")) + t.Run("limit default", testKeys(handler, allKeys, mock.DefaultLimit, "")) + t.Run("limit 2x default", testKeys(handler, allKeys, 2*mock.DefaultLimit, "")) + t.Run("limit 0.5x default", testKeys(handler, allKeys, mock.DefaultLimit/2, "")) + t.Run("limit max", testKeys(handler, allKeys, mock.MaxLimit, "")) + t.Run("limit 2x max", testKeys(handler, allKeys, 2*mock.MaxLimit, "")) + t.Run("limit negative", testKeys(handler, allKeys, -10, "")) + }) + + // this subtest confirms that all keys are are listed for every node in correct order + // and that for one node different pagination options are correct + t.Run("node keys", func(t *testing.T) { + var limitCheckAddr string + + for addr, keys := range nodeKeys { + testKeys(handler, keys, 0, addr)(t) + if limitCheckAddr == "" { + limitCheckAddr = addr + } + } + testKeys(handler, nil, 0, invalidAddr)(t) + + limitCheckKeys := nodeKeys[limitCheckAddr] + t.Run("limit 0", testKeys(handler, limitCheckKeys, 0, limitCheckAddr)) + t.Run("limit default", testKeys(handler, limitCheckKeys, mock.DefaultLimit, limitCheckAddr)) + t.Run("limit 2x default", testKeys(handler, limitCheckKeys, 2*mock.DefaultLimit, limitCheckAddr)) + t.Run("limit 0.5x default", testKeys(handler, limitCheckKeys, mock.DefaultLimit/2, limitCheckAddr)) + t.Run("limit max", testKeys(handler, limitCheckKeys, mock.MaxLimit, limitCheckAddr)) + t.Run("limit 2x max", testKeys(handler, limitCheckKeys, 2*mock.MaxLimit, limitCheckAddr)) + t.Run("limit negative", testKeys(handler, limitCheckKeys, -10, limitCheckAddr)) + }) + + // this subtest confirms that all nodes are are listed in correct order with expected pagination + t.Run("nodes", func(t *testing.T) { + var allNodes []string + for addr := range nodeKeys { + allNodes = append(allNodes, addr) + } + sortCaseInsensitive(allNodes) + + t.Run("limit 0", testNodes(handler, allNodes, 0, "")) + t.Run("limit default", testNodes(handler, allNodes, mock.DefaultLimit, "")) + t.Run("limit 2x default", testNodes(handler, allNodes, 2*mock.DefaultLimit, "")) + t.Run("limit 0.5x default", testNodes(handler, allNodes, mock.DefaultLimit/2, "")) + t.Run("limit max", testNodes(handler, allNodes, mock.MaxLimit, "")) + t.Run("limit 2x max", testNodes(handler, allNodes, 2*mock.MaxLimit, "")) + t.Run("limit negative", testNodes(handler, allNodes, -10, "")) + }) + + // this subtest confirms that all nodes are are listed that contain a a particular key in correct order + // and that for one key different node pagination options are correct + t.Run("key nodes", func(t *testing.T) { + var limitCheckKey string + + for key, addrs := range keyNodes { + testNodes(handler, addrs, 0, key)(t) + if limitCheckKey == "" { + limitCheckKey = key + } + } + testNodes(handler, nil, 0, invalidKey)(t) + + limitCheckKeys := keyNodes[limitCheckKey] + t.Run("limit 0", testNodes(handler, limitCheckKeys, 0, limitCheckKey)) + t.Run("limit default", testNodes(handler, limitCheckKeys, mock.DefaultLimit, limitCheckKey)) + t.Run("limit 2x default", testNodes(handler, limitCheckKeys, 2*mock.DefaultLimit, limitCheckKey)) + t.Run("limit 0.5x default", testNodes(handler, limitCheckKeys, mock.DefaultLimit/2, limitCheckKey)) + t.Run("limit max", testNodes(handler, limitCheckKeys, mock.MaxLimit, limitCheckKey)) + t.Run("limit 2x max", testNodes(handler, limitCheckKeys, 2*mock.MaxLimit, limitCheckKey)) + t.Run("limit negative", testNodes(handler, limitCheckKeys, -10, limitCheckKey)) + }) +} + +// testsKeys returns a test function that validates wantKeys against a series of /api/keys +// HTTP responses with provided limit and node options. +func testKeys(handler http.Handler, wantKeys []string, limit int, node string) func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + + wantLimit := limit + if wantLimit <= 0 { + wantLimit = mock.DefaultLimit + } + if wantLimit > mock.MaxLimit { + wantLimit = mock.MaxLimit + } + wantKeysLen := len(wantKeys) + var i int + var startKey string + for { + var wantNext string + start := i * wantLimit + end := (i + 1) * wantLimit + if end < wantKeysLen { + wantNext = wantKeys[end] + } else { + end = wantKeysLen + } + testKeysResponse(t, handler, node, startKey, limit, KeysResponse{ + Keys: wantKeys[start:end], + Next: wantNext, + }) + if wantNext == "" { + break + } + startKey = wantNext + i++ + } + } +} + +// testNodes returns a test function that validates wantAddrs against a series of /api/nodes +// HTTP responses with provided limit and key options. +func testNodes(handler http.Handler, wantAddrs []string, limit int, key string) func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + + wantLimit := limit + if wantLimit <= 0 { + wantLimit = mock.DefaultLimit + } + if wantLimit > mock.MaxLimit { + wantLimit = mock.MaxLimit + } + wantAddrsLen := len(wantAddrs) + var i int + var startKey string + for { + var wantNext string + start := i * wantLimit + end := (i + 1) * wantLimit + if end < wantAddrsLen { + wantNext = wantAddrs[end] + } else { + end = wantAddrsLen + } + testNodesResponse(t, handler, key, startKey, limit, NodesResponse{ + Nodes: wantAddrs[start:end], + Next: wantNext, + }) + if wantNext == "" { + break + } + startKey = wantNext + i++ + } + } +} + +// testStatusResponse validates a response made on url if it matches +// the expected StatusResponse. +func testStatusResponse(t *testing.T, handler http.Handler, url string, code int) { + t.Helper() + + resp := httpGet(t, handler, url) + + if resp.StatusCode != code { + t.Errorf("got status code %v, want %v", resp.StatusCode, code) + } + if got := resp.Header.Get("Content-Type"); got != jsonContentType { + t.Errorf("got Content-Type header %q, want %q", got, jsonContentType) + } + var r StatusResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if r.Code != code { + t.Errorf("got response code %v, want %v", r.Code, code) + } + if r.Message != http.StatusText(code) { + t.Errorf("got response message %q, want %q", r.Message, http.StatusText(code)) + } +} + +// testKeysResponse validates response returned from handler on /api/keys +// with node, start and limit options against KeysResponse. +func testKeysResponse(t *testing.T, handler http.Handler, node, start string, limit int, want KeysResponse) { + t.Helper() + + u, err := url.Parse("/api/keys") + if err != nil { + t.Fatal(err) + } + q := u.Query() + if node != "" { + q.Set("node", node) + } + if start != "" { + q.Set("start", start) + } + if limit != 0 { + q.Set("limit", strconv.Itoa(limit)) + } + u.RawQuery = q.Encode() + + resp := httpGet(t, handler, u.String()) + + if resp.StatusCode != http.StatusOK { + t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK) + } + if got := resp.Header.Get("Content-Type"); got != jsonContentType { + t.Errorf("got Content-Type header %q, want %q", got, jsonContentType) + } + var r KeysResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if fmt.Sprint(r.Keys) != fmt.Sprint(want.Keys) { + t.Errorf("got keys %v, want %v", r.Keys, want.Keys) + } + if r.Next != want.Next { + t.Errorf("got next %s, want %s", r.Next, want.Next) + } +} + +// testNodesResponse validates response returned from handler on /api/nodes +// with key, start and limit options against NodesResponse. +func testNodesResponse(t *testing.T, handler http.Handler, key, start string, limit int, want NodesResponse) { + t.Helper() + + u, err := url.Parse("/api/nodes") + if err != nil { + t.Fatal(err) + } + q := u.Query() + if key != "" { + q.Set("key", key) + } + if start != "" { + q.Set("start", start) + } + if limit != 0 { + q.Set("limit", strconv.Itoa(limit)) + } + u.RawQuery = q.Encode() + + resp := httpGet(t, handler, u.String()) + + if resp.StatusCode != http.StatusOK { + t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK) + } + if got := resp.Header.Get("Content-Type"); got != jsonContentType { + t.Errorf("got Content-Type header %q, want %q", got, jsonContentType) + } + var r NodesResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + t.Fatal(err) + } + if fmt.Sprint(r.Nodes) != fmt.Sprint(want.Nodes) { + t.Errorf("got nodes %v, want %v", r.Nodes, want.Nodes) + } + if r.Next != want.Next { + t.Errorf("got next %s, want %s", r.Next, want.Next) + } +} + +// httpGet uses httptest recorder to provide a response on handler's url. +func httpGet(t *testing.T, handler http.Handler, url string) (r *http.Response) { + t.Helper() + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + t.Fatal(err) + } + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + return w.Result() +} + +// sortCaseInsensitive performs a case insensitive sort on a string slice. +func sortCaseInsensitive(s []string) { + sort.Slice(s, func(i, j int) bool { + return strings.ToLower(s[i]) < strings.ToLower(s[j]) + }) +} diff --git a/swarm/storage/mock/explorer/headers_test.go b/swarm/storage/mock/explorer/headers_test.go new file mode 100644 index 000000000..5b8e05ffd --- /dev/null +++ b/swarm/storage/mock/explorer/headers_test.go @@ -0,0 +1,163 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package explorer + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" +) + +// TestHandler_CORSOrigin validates that the correct Access-Control-Allow-Origin +// header is served with various allowed origin settings. +func TestHandler_CORSOrigin(t *testing.T) { + notAllowedOrigin := "http://not-allowed-origin.com/" + + for _, tc := range []struct { + name string + origins []string + }{ + { + name: "no origin", + origins: nil, + }, + { + name: "single origin", + origins: []string{"http://localhost/"}, + }, + { + name: "multiple origins", + origins: []string{"http://localhost/", "http://ethereum.org/"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + handler := NewHandler(mem.NewGlobalStore(), tc.origins) + + origins := tc.origins + if origins == nil { + // handle the "no origin" test case + origins = []string{""} + } + + for _, origin := range origins { + t.Run(fmt.Sprintf("get %q", origin), newTestCORSOrigin(handler, origin, origin)) + t.Run(fmt.Sprintf("preflight %q", origin), newTestCORSPreflight(handler, origin, origin)) + } + + t.Run(fmt.Sprintf("get %q", notAllowedOrigin), newTestCORSOrigin(handler, notAllowedOrigin, "")) + t.Run(fmt.Sprintf("preflight %q", notAllowedOrigin), newTestCORSPreflight(handler, notAllowedOrigin, "")) + }) + } + + t.Run("wildcard", func(t *testing.T) { + handler := NewHandler(mem.NewGlobalStore(), []string{"*"}) + + for _, origin := range []string{ + "http://example.com/", + "http://ethereum.org", + "http://localhost", + } { + t.Run(fmt.Sprintf("get %q", origin), newTestCORSOrigin(handler, origin, origin)) + t.Run(fmt.Sprintf("preflight %q", origin), newTestCORSPreflight(handler, origin, origin)) + } + }) +} + +// newTestCORSOrigin returns a test function that validates if wantOrigin CORS header is +// served by the handler for a GET request. +func newTestCORSOrigin(handler http.Handler, origin, wantOrigin string) func(t *testing.T) { + return func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, "/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Origin", origin) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + header := resp.Header.Get("Access-Control-Allow-Origin") + if header != wantOrigin { + t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, wantOrigin) + } + } +} + +// newTestCORSPreflight returns a test function that validates if wantOrigin CORS header is +// served by the handler for an OPTIONS CORS preflight request. +func newTestCORSPreflight(handler http.Handler, origin, wantOrigin string) func(t *testing.T) { + return func(t *testing.T) { + req, err := http.NewRequest(http.MethodOptions, "/", nil) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Origin", origin) + req.Header.Set("Access-Control-Request-Method", "GET") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + header := resp.Header.Get("Access-Control-Allow-Origin") + if header != wantOrigin { + t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, wantOrigin) + } + } +} + +// TestHandler_noCacheHeaders validates that no cache headers are server. +func TestHandler_noCacheHeaders(t *testing.T) { + handler := NewHandler(mem.NewGlobalStore(), nil) + + for _, tc := range []struct { + url string + }{ + { + url: "/", + }, + { + url: "/api/nodes", + }, + { + url: "/api/keys", + }, + } { + req, err := http.NewRequest(http.MethodGet, tc.url, nil) + if err != nil { + t.Fatal(err) + } + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + for header, want := range map[string]string{ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Pragma": "no-cache", + "Expires": "0", + } { + got := resp.Header.Get(header) + if got != want { + t.Errorf("got %q header %q for url %q, want %q", header, tc.url, got, want) + } + } + } +} diff --git a/swarm/storage/mock/explorer/swagger.yaml b/swarm/storage/mock/explorer/swagger.yaml new file mode 100644 index 000000000..2c014e927 --- /dev/null +++ b/swarm/storage/mock/explorer/swagger.yaml @@ -0,0 +1,176 @@ +swagger: '2.0' +info: + title: Swarm Global Store API + version: 0.1.0 +tags: + - name: Has Key + description: Checks if a Key is stored on a Node + - name: Keys + description: Lists Keys + - name: Nodes + description: Lists Node addresses + +paths: + '/api/has-key/{node}/{key}': + get: + tags: + - Has Key + summary: Checks if a Key is stored on a Node + operationId: hasKey + produces: + - application/json + + parameters: + - name: node + in: path + required: true + type: string + format: hex-endoded + description: Node address. + + - name: key + in: path + required: true + type: string + format: hex-endoded + description: Key. + + responses: + '200': + description: Key is stored on Node + schema: + $ref: '#/definitions/Status' + '404': + description: Key is not stored on Node + schema: + $ref: '#/definitions/Status' + '500': + description: Internal Server Error + schema: + $ref: '#/definitions/Status' + + '/api/keys': + get: + tags: + - Keys + summary: Lists Keys + operationId: keys + produces: + - application/json + + parameters: + - name: start + in: query + required: false + type: string + format: hex-encoded Key + description: A Key as the starting point for the returned list. It is usually a value from the returned "next" field in the Keys repsonse. + + - name: limit + in: query + required: false + type: integer + default: 100 + minimum: 1 + maximum: 1000 + description: Limits the number of Keys returned in on response. + + - name: node + in: query + required: false + type: string + format: hex-encoded Node address + description: If this parameter is provided, only Keys that are stored on this Node be returned in the response. If not, all known Keys will be returned. + + responses: + '200': + description: List of Keys + schema: + $ref: '#/definitions/Keys' + '500': + description: Internal Server Error + schema: + $ref: '#/definitions/Status' + + '/api/nodes': + get: + tags: + - Nodes + summary: Lists Node addresses + operationId: nodes + produces: + - application/json + + parameters: + - name: start + in: query + required: false + type: string + format: hex-encoded Node address + description: A Node address as the starting point for the returned list. It is usually a value from the returned "next" field in the Nodes repsonse. + + - name: limit + in: query + required: false + type: integer + default: 100 + minimum: 1 + maximum: 1000 + description: Limits the number of Node addresses returned in on response. + + - name: key + in: query + required: false + type: string + format: hex-encoded Key + description: If this parameter is provided, only addresses of Nodes that store this Key will be returned in the response. If not, all known Node addresses will be returned. + + responses: + '200': + description: List of Node addresses + schema: + $ref: '#/definitions/Nodes' + '500': + description: Internal Server Error + schema: + $ref: '#/definitions/Status' + +definitions: + + Status: + type: object + properties: + message: + type: string + description: HTTP Status Code name. + code: + type: integer + description: HTTP Status Code. + + Keys: + type: object + properties: + keys: + type: array + description: A list of Keys. + items: + type: string + format: hex-encoded Key + next: + type: string + format: hex-encoded Key + description: If present, the next Key in listing. Can be passed as "start" query parameter to continue the listing. If not present, the end of the listing is reached. + + Nodes: + type: object + properties: + nodes: + type: array + description: A list of Node addresses. + items: + type: string + format: hex-encoded Node address + next: + type: string + format: hex-encoded Node address + description: If present, the next Node address in listing. Can be passed as "start" query parameter to continue the listing. If not present, the end of the listing is reached. diff --git a/swarm/storage/mock/mem/mem.go b/swarm/storage/mock/mem/mem.go index 3a0a2beb8..38bf098df 100644 --- a/swarm/storage/mock/mem/mem.go +++ b/swarm/storage/mock/mem/mem.go @@ -25,6 +25,7 @@ import ( "encoding/json" "io" "io/ioutil" + "sort" "sync" "github.com/ethereum/go-ethereum/common" @@ -34,16 +35,27 @@ import ( // GlobalStore stores all chunk data and also keys and node addresses relations. // It implements mock.GlobalStore interface. type GlobalStore struct { - nodes map[string]map[common.Address]struct{} - data map[string][]byte - mu sync.Mutex + // holds a slice of keys per node + nodeKeys map[common.Address][][]byte + // holds which key is stored on which nodes + keyNodes map[string][]common.Address + // all node addresses + nodes []common.Address + // all keys + keys [][]byte + // all keys data + data map[string][]byte + mu sync.RWMutex } // NewGlobalStore creates a new instance of GlobalStore. func NewGlobalStore() *GlobalStore { return &GlobalStore{ - nodes: make(map[string]map[common.Address]struct{}), - data: make(map[string][]byte), + nodeKeys: make(map[common.Address][][]byte), + keyNodes: make(map[string][]common.Address), + nodes: make([]common.Address, 0), + keys: make([][]byte, 0), + data: make(map[string][]byte), } } @@ -56,10 +68,10 @@ func (s *GlobalStore) NewNodeStore(addr common.Address) *mock.NodeStore { // Get returns chunk data if the chunk with key exists for node // on address addr. func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err error) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() - if _, ok := s.nodes[string(key)][addr]; !ok { + if _, has := s.nodeKeyIndex(addr, key); !has { return nil, mock.ErrNotFound } @@ -75,11 +87,33 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { s.mu.Lock() defer s.mu.Unlock() - if _, ok := s.nodes[string(key)]; !ok { - s.nodes[string(key)] = make(map[common.Address]struct{}) + if i, found := s.nodeKeyIndex(addr, key); !found { + s.nodeKeys[addr] = append(s.nodeKeys[addr], nil) + copy(s.nodeKeys[addr][i+1:], s.nodeKeys[addr][i:]) + s.nodeKeys[addr][i] = key + } + + if i, found := s.keyNodeIndex(key, addr); !found { + k := string(key) + s.keyNodes[k] = append(s.keyNodes[k], addr) + copy(s.keyNodes[k][i+1:], s.keyNodes[k][i:]) + s.keyNodes[k][i] = addr + } + + if i, found := s.nodeIndex(addr); !found { + s.nodes = append(s.nodes, addr) + copy(s.nodes[i+1:], s.nodes[i:]) + s.nodes[i] = addr + } + + if i, found := s.keyIndex(key); !found { + s.keys = append(s.keys, nil) + copy(s.keys[i+1:], s.keys[i:]) + s.keys[i] = key } - s.nodes[string(key)][addr] = struct{}{} + s.data[string(key)] = data + return nil } @@ -88,24 +122,177 @@ func (s *GlobalStore) Delete(addr common.Address, key []byte) error { s.mu.Lock() defer s.mu.Unlock() - var count int - if _, ok := s.nodes[string(key)]; ok { - delete(s.nodes[string(key)], addr) - count = len(s.nodes[string(key)]) + if i, has := s.nodeKeyIndex(addr, key); has { + s.nodeKeys[addr] = append(s.nodeKeys[addr][:i], s.nodeKeys[addr][i+1:]...) + } + + k := string(key) + if i, on := s.keyNodeIndex(key, addr); on { + s.keyNodes[k] = append(s.keyNodes[k][:i], s.keyNodes[k][i+1:]...) } - if count == 0 { - delete(s.data, string(key)) + + if len(s.nodeKeys[addr]) == 0 { + if i, found := s.nodeIndex(addr); found { + s.nodes = append(s.nodes[:i], s.nodes[i+1:]...) + } + } + + if len(s.keyNodes[k]) == 0 { + if i, found := s.keyIndex(key); found { + s.keys = append(s.keys[:i], s.keys[i+1:]...) + } } return nil } // HasKey returns whether a node with addr contains the key. -func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { - s.mu.Lock() - defer s.mu.Unlock() +func (s *GlobalStore) HasKey(addr common.Address, key []byte) (yes bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + _, yes = s.nodeKeyIndex(addr, key) + return yes +} + +// keyIndex returns the index of a key in keys slice. +func (s *GlobalStore) keyIndex(key []byte) (index int, found bool) { + l := len(s.keys) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.keys[i], key) >= 0 + }) + found = index < l && bytes.Equal(s.keys[index], key) + return index, found +} + +// nodeIndex returns the index of a node address in nodes slice. +func (s *GlobalStore) nodeIndex(addr common.Address) (index int, found bool) { + l := len(s.nodes) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.nodes[i][:], addr[:]) >= 0 + }) + found = index < l && bytes.Equal(s.nodes[index][:], addr[:]) + return index, found +} + +// nodeKeyIndex returns the index of a key in nodeKeys slice. +func (s *GlobalStore) nodeKeyIndex(addr common.Address, key []byte) (index int, found bool) { + l := len(s.nodeKeys[addr]) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.nodeKeys[addr][i], key) >= 0 + }) + found = index < l && bytes.Equal(s.nodeKeys[addr][index], key) + return index, found +} + +// keyNodeIndex returns the index of a node address in keyNodes slice. +func (s *GlobalStore) keyNodeIndex(key []byte, addr common.Address) (index int, found bool) { + k := string(key) + l := len(s.keyNodes[k]) + index = sort.Search(l, func(i int) bool { + return bytes.Compare(s.keyNodes[k][i][:], addr[:]) >= 0 + }) + found = index < l && s.keyNodes[k][index] == addr + return index, found +} + +// Keys returns a paginated list of keys on all nodes. +func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startKey != nil { + i, _ = s.keyIndex(startKey) + } + total := len(s.keys) + max := maxIndex(i, limit, total) + keys.Keys = make([][]byte, 0, max-i) + for ; i < max; i++ { + keys.Keys = append(keys.Keys, append([]byte(nil), s.keys[i]...)) + } + if total > max { + keys.Next = s.keys[max] + } + return keys, nil +} + +// Nodes returns a paginated list of all known nodes. +func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startAddr != nil { + i, _ = s.nodeIndex(*startAddr) + } + total := len(s.nodes) + max := maxIndex(i, limit, total) + nodes.Addrs = make([]common.Address, 0, max-i) + for ; i < max; i++ { + nodes.Addrs = append(nodes.Addrs, s.nodes[i]) + } + if total > max { + nodes.Next = &s.nodes[max] + } + return nodes, nil +} + +// NodeKeys returns a paginated list of keys on a node with provided address. +func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var i int + if startKey != nil { + i, _ = s.nodeKeyIndex(addr, startKey) + } + total := len(s.nodeKeys[addr]) + max := maxIndex(i, limit, total) + keys.Keys = make([][]byte, 0, max-i) + for ; i < max; i++ { + keys.Keys = append(keys.Keys, append([]byte(nil), s.nodeKeys[addr][i]...)) + } + if total > max { + keys.Next = s.nodeKeys[addr][max] + } + return keys, nil +} + +// KeyNodes returns a paginated list of nodes that contain a particular key. +func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + s.mu.RLock() + defer s.mu.RUnlock() - _, ok := s.nodes[string(key)][addr] - return ok + var i int + if startAddr != nil { + i, _ = s.keyNodeIndex(key, *startAddr) + } + total := len(s.keyNodes[string(key)]) + max := maxIndex(i, limit, total) + nodes.Addrs = make([]common.Address, 0, max-i) + for ; i < max; i++ { + nodes.Addrs = append(nodes.Addrs, s.keyNodes[string(key)][i]) + } + if total > max { + nodes.Next = &s.keyNodes[string(key)][max] + } + return nodes, nil +} + +// maxIndex returns the end index for one page listing +// based on the start index, limit and total number of elements. +func maxIndex(start, limit, total int) (max int) { + if limit <= 0 { + limit = mock.DefaultLimit + } + if limit > mock.MaxLimit { + limit = mock.MaxLimit + } + max = total + if start+limit < max { + max = start + limit + } + return max } // Import reads tar archive from a reader that contains exported chunk data. @@ -135,14 +322,26 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) { return n, err } - addrs := make(map[common.Address]struct{}) - for _, a := range c.Addrs { - addrs[a] = struct{}{} + key := common.Hex2Bytes(hdr.Name) + s.keyNodes[string(key)] = c.Addrs + for _, addr := range c.Addrs { + if i, has := s.nodeKeyIndex(addr, key); !has { + s.nodeKeys[addr] = append(s.nodeKeys[addr], nil) + copy(s.nodeKeys[addr][i+1:], s.nodeKeys[addr][i:]) + s.nodeKeys[addr][i] = key + } + if i, found := s.nodeIndex(addr); !found { + s.nodes = append(s.nodes, addr) + copy(s.nodes[i+1:], s.nodes[i:]) + s.nodes[i] = addr + } } - - key := string(common.Hex2Bytes(hdr.Name)) - s.nodes[key] = addrs - s.data[key] = c.Data + if i, found := s.keyIndex(key); !found { + s.keys = append(s.keys, nil) + copy(s.keys[i+1:], s.keys[i:]) + s.keys[i] = key + } + s.data[string(key)] = c.Data n++ } return n, err @@ -151,23 +350,18 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) { // Export writes to a writer a tar archive with all chunk data from // the store. It returns the number of chunks exported and an error. func (s *GlobalStore) Export(w io.Writer) (n int, err error) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() tw := tar.NewWriter(w) defer tw.Close() buf := bytes.NewBuffer(make([]byte, 0, 1024)) encoder := json.NewEncoder(buf) - for key, addrs := range s.nodes { - al := make([]common.Address, 0, len(addrs)) - for a := range addrs { - al = append(al, a) - } - + for key, addrs := range s.keyNodes { buf.Reset() if err = encoder.Encode(mock.ExportedChunk{ - Addrs: al, + Addrs: addrs, Data: s.data[key], }); err != nil { return n, err diff --git a/swarm/storage/mock/mem/mem_test.go b/swarm/storage/mock/mem/mem_test.go index adcefaabb..d39aaef45 100644 --- a/swarm/storage/mock/mem/mem_test.go +++ b/swarm/storage/mock/mem/mem_test.go @@ -28,6 +28,12 @@ func TestGlobalStore(t *testing.T) { test.MockStore(t, NewGlobalStore(), 100) } +// TestGlobalStoreListings is running test for a GlobalStore +// using test.MockStoreListings function. +func TestGlobalStoreListings(t *testing.T) { + test.MockStoreListings(t, NewGlobalStore(), 1000) +} + // TestImportExport is running tests for importing and // exporting data between two GlobalStores // using test.ImportExport function. diff --git a/swarm/storage/mock/mock.go b/swarm/storage/mock/mock.go index 626ba3fe1..586112a98 100644 --- a/swarm/storage/mock/mock.go +++ b/swarm/storage/mock/mock.go @@ -39,6 +39,17 @@ import ( "github.com/ethereum/go-ethereum/common" ) +const ( + // DefaultLimit should be used as default limit for + // Keys, Nodes, NodeKeys and KeyNodes GlobarStorer + // methids implementations. + DefaultLimit = 100 + // MaxLimit should be used as the maximal returned number + // of items for Keys, Nodes, NodeKeys and KeyNodes GlobarStorer + // methids implementations, regardless of provided limit. + MaxLimit = 1000 +) + // ErrNotFound indicates that the chunk is not found. var ErrNotFound = errors.New("not found") @@ -76,6 +87,10 @@ func (n *NodeStore) Delete(key []byte) error { return n.store.Delete(n.addr, key) } +func (n *NodeStore) Keys(startKey []byte, limit int) (keys Keys, err error) { + return n.store.NodeKeys(n.addr, startKey, limit) +} + // GlobalStorer defines methods for mock db store // that stores chunk data for all swarm nodes. // It is used in tests to construct mock NodeStores @@ -85,12 +100,28 @@ type GlobalStorer interface { Put(addr common.Address, key []byte, data []byte) error Delete(addr common.Address, key []byte) error HasKey(addr common.Address, key []byte) bool + Keys(startKey []byte, limit int) (keys Keys, err error) + Nodes(startAddr *common.Address, limit int) (nodes Nodes, err error) + NodeKeys(addr common.Address, startKey []byte, limit int) (keys Keys, err error) + KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes Nodes, err error) // NewNodeStore creates an instance of NodeStore // to be used by a single swarm node with // address addr. NewNodeStore(addr common.Address) *NodeStore } +// Keys are returned results by Keys and NodeKeys GlobalStorer methods. +type Keys struct { + Keys [][]byte + Next []byte +} + +// Nodes are returned results by Nodes and KeyNodes GlobalStorer methods. +type Nodes struct { + Addrs []common.Address + Next *common.Address +} + // Importer defines method for importing mock store data // from an exported tar archive. type Importer interface { diff --git a/swarm/storage/mock/rpc/rpc.go b/swarm/storage/mock/rpc/rpc.go index 8cd6c83a7..8150ccff1 100644 --- a/swarm/storage/mock/rpc/rpc.go +++ b/swarm/storage/mock/rpc/rpc.go @@ -88,3 +88,27 @@ func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { } return has } + +// Keys returns a paginated list of keys on all nodes. +func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) { + err = s.client.Call(&keys, "mockStore_keys", startKey, limit) + return keys, err +} + +// Nodes returns a paginated list of all known nodes. +func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + err = s.client.Call(&nodes, "mockStore_nodes", startAddr, limit) + return nodes, err +} + +// NodeKeys returns a paginated list of keys on a node with provided address. +func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) { + err = s.client.Call(&keys, "mockStore_nodeKeys", addr, startKey, limit) + return keys, err +} + +// KeyNodes returns a paginated list of nodes that contain a particular key. +func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) { + err = s.client.Call(&nodes, "mockStore_keyNodes", key, startAddr, limit) + return nodes, err +} diff --git a/swarm/storage/mock/rpc/rpc_test.go b/swarm/storage/mock/rpc/rpc_test.go index f62340ede..6c4652355 100644 --- a/swarm/storage/mock/rpc/rpc_test.go +++ b/swarm/storage/mock/rpc/rpc_test.go @@ -27,6 +27,27 @@ import ( // TestDBStore is running test for a GlobalStore // using test.MockStore function. func TestRPCStore(t *testing.T) { + store, cleanup := newTestStore(t) + defer cleanup() + + test.MockStore(t, store, 30) +} + +// TestRPCStoreListings is running test for a GlobalStore +// using test.MockStoreListings function. +func TestRPCStoreListings(t *testing.T) { + store, cleanup := newTestStore(t) + defer cleanup() + + test.MockStoreListings(t, store, 1000) +} + +// newTestStore creates a temporary GlobalStore +// that will be closed when returned cleanup function +// is called. +func newTestStore(t *testing.T) (s *GlobalStore, cleanup func()) { + t.Helper() + serverStore := mem.NewGlobalStore() server := rpc.NewServer() @@ -35,7 +56,9 @@ func TestRPCStore(t *testing.T) { } store := NewGlobalStore(rpc.DialInProc(server)) - defer store.Close() - - test.MockStore(t, store, 30) + return store, func() { + if err := store.Close(); err != nil { + t.Error(err) + } + } } diff --git a/swarm/storage/mock/test/test.go b/swarm/storage/mock/test/test.go index 69828b144..cc837f0b7 100644 --- a/swarm/storage/mock/test/test.go +++ b/swarm/storage/mock/test/test.go @@ -20,6 +20,7 @@ package test import ( "bytes" + "encoding/binary" "fmt" "io" "strconv" @@ -170,6 +171,123 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) { }) } +// MockStoreListings tests global store methods Keys, Nodes, NodeKeys and KeyNodes. +// It uses a provided globalstore to put chunks for n number of node addresses +// and to validate that methods are returning the right responses. +func MockStoreListings(t *testing.T, globalStore mock.GlobalStorer, n int) { + addrs := make([]common.Address, n) + for i := 0; i < n; i++ { + addrs[i] = common.HexToAddress(strconv.FormatInt(int64(i)+1, 16)) + } + type chunk struct { + key []byte + data []byte + } + const chunksPerNode = 5 + keys := make([][]byte, n*chunksPerNode) + for i := 0; i < n*chunksPerNode; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + keys[i] = b + } + + // keep track of keys on every node + nodeKeys := make(map[common.Address][][]byte) + // keep track of nodes that store particular key + keyNodes := make(map[string][]common.Address) + for i := 0; i < chunksPerNode; i++ { + // put chunks for every address + for j := 0; j < n; j++ { + addr := addrs[j] + key := keys[(i*n)+j] + err := globalStore.Put(addr, key, []byte("data")) + if err != nil { + t.Fatal(err) + } + nodeKeys[addr] = append(nodeKeys[addr], key) + keyNodes[string(key)] = append(keyNodes[string(key)], addr) + } + + // test Keys method + var startKey []byte + var gotKeys [][]byte + for { + keys, err := globalStore.Keys(startKey, 0) + if err != nil { + t.Fatal(err) + } + gotKeys = append(gotKeys, keys.Keys...) + if keys.Next == nil { + break + } + startKey = keys.Next + } + wantKeys := keys[:(i+1)*n] + if fmt.Sprint(gotKeys) != fmt.Sprint(wantKeys) { + t.Fatalf("got #%v keys %v, want %v", i+1, gotKeys, wantKeys) + } + + // test Nodes method + var startNode *common.Address + var gotNodes []common.Address + for { + nodes, err := globalStore.Nodes(startNode, 0) + if err != nil { + t.Fatal(err) + } + gotNodes = append(gotNodes, nodes.Addrs...) + if nodes.Next == nil { + break + } + startNode = nodes.Next + } + wantNodes := addrs + if fmt.Sprint(gotNodes) != fmt.Sprint(wantNodes) { + t.Fatalf("got #%v nodes %v, want %v", i+1, gotNodes, wantNodes) + } + + // test NodeKeys method + for addr, wantKeys := range nodeKeys { + var startKey []byte + var gotKeys [][]byte + for { + keys, err := globalStore.NodeKeys(addr, startKey, 0) + if err != nil { + t.Fatal(err) + } + gotKeys = append(gotKeys, keys.Keys...) + if keys.Next == nil { + break + } + startKey = keys.Next + } + if fmt.Sprint(gotKeys) != fmt.Sprint(wantKeys) { + t.Fatalf("got #%v %s node keys %v, want %v", i+1, addr.Hex(), gotKeys, wantKeys) + } + } + + // test KeyNodes method + for key, wantNodes := range keyNodes { + var startNode *common.Address + var gotNodes []common.Address + for { + nodes, err := globalStore.KeyNodes([]byte(key), startNode, 0) + if err != nil { + t.Fatal(err) + } + gotNodes = append(gotNodes, nodes.Addrs...) + if nodes.Next == nil { + break + } + startNode = nodes.Next + } + if fmt.Sprint(gotNodes) != fmt.Sprint(wantNodes) { + t.Fatalf("got #%v %x key nodes %v, want %v", i+1, []byte(key), gotNodes, wantNodes) + } + } + } +} + // ImportExport saves chunks to the outStore, exports them to the tar archive, // imports tar archive to the inStore and checks if all chunks are imported correctly. func ImportExport(t *testing.T, outStore, inStore mock.GlobalStorer, n int) { |