diff options
Diffstat (limited to 'swarm')
63 files changed, 3616 insertions, 6046 deletions
diff --git a/swarm/OWNERS b/swarm/OWNERS index d4204e08c..4b9ca96eb 100644 --- a/swarm/OWNERS +++ b/swarm/OWNERS @@ -7,7 +7,6 @@ swarm ├── fuse ────────────────── @jmozah, @holisticode ├── grafana_dashboards ──── @nonsense ├── metrics ─────────────── @nonsense, @holisticode -├── multihash ───────────── @nolash ├── network ─────────────── ethersphere │ ├── bitvector ───────── @zelig, @janos, @gbalint │ ├── priorityqueue ───── @zelig, @janos, @gbalint diff --git a/swarm/api/api.go b/swarm/api/api.go index 7bb631967..33a8e3539 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -42,7 +42,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/multihash" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" @@ -417,7 +416,7 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage return reader, mimeType, status, nil, err } // get the data of the update - _, rsrcData, err := a.feed.GetContent(entry.Feed) + _, contentAddr, err := a.feed.GetContent(entry.Feed) if err != nil { apiGetNotFound.Inc(1) status = http.StatusNotFound @@ -425,23 +424,23 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage return reader, mimeType, status, nil, err } - // extract multihash - decodedMultihash, err := multihash.FromMultihash(rsrcData) - if err != nil { + // extract content hash + if len(contentAddr) != storage.AddressLength { apiGetInvalid.Inc(1) status = http.StatusUnprocessableEntity - log.Warn("invalid multihash in feed update", "err", err) - return reader, mimeType, status, nil, err + errorMessage := fmt.Sprintf("invalid swarm hash in feed update. Expected %d bytes. Got %d", storage.AddressLength, len(contentAddr)) + log.Warn(errorMessage) + return reader, mimeType, status, nil, errors.New(errorMessage) } - manifestAddr = storage.Address(decodedMultihash) - log.Trace("feed update contains multihash", "key", manifestAddr) + manifestAddr = storage.Address(contentAddr) + log.Trace("feed update contains swarm hash", "key", manifestAddr) - // get the manifest the multihash digest points to + // get the manifest the swarm hash points to trie, err := loadManifest(ctx, a.fileStore, manifestAddr, nil, NOOPDecrypt) if err != nil { apiGetNotFound.Inc(1) status = http.StatusNotFound - log.Warn(fmt.Sprintf("loadManifestTrie (feed update multihash) error: %v", err)) + log.Warn(fmt.Sprintf("loadManifestTrie (feed update) error: %v", err)) return reader, mimeType, status, nil, err } @@ -451,8 +450,8 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage if entry == nil { status = http.StatusNotFound apiGetNotFound.Inc(1) - err = fmt.Errorf("manifest (feed update multihash) entry for '%s' not found", path) - log.Trace("manifest (feed update multihash) entry not found", "key", manifestAddr, "path", path) + err = fmt.Errorf("manifest (feed update) entry for '%s' not found", path) + log.Trace("manifest (feed update) entry not found", "key", manifestAddr, "path", path) return reader, mimeType, status, nil, err } } @@ -472,7 +471,7 @@ func (a *API) Get(ctx context.Context, decrypt DecryptFunc, manifestAddr storage // no entry found status = http.StatusNotFound apiGetNotFound.Inc(1) - err = fmt.Errorf("manifest entry for '%s' not found", path) + err = fmt.Errorf("Not found: could not find resource '%s'", path) log.Trace("manifest entry not found", "key", contentAddr, "path", path) } return diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go index 76b349397..39f6e4797 100644 --- a/swarm/api/client/client_test.go +++ b/swarm/api/client/client_test.go @@ -25,13 +25,13 @@ import ( "sort" "testing" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/swarm/api" swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" - "github.com/ethereum/go-ethereum/swarm/multihash" "github.com/ethereum/go-ethereum/swarm/storage/feed" ) @@ -368,58 +368,99 @@ func newTestSigner() (*feed.GenericSigner, error) { return feed.NewGenericSigner(privKey), nil } -// test the transparent resolving of multihash feed updates with bzz:// scheme +// Test the transparent resolving of feed updates with bzz:// scheme // -// first upload data, and store the multihash to the resulting manifest in a feed update -// retrieving the update with the multihash should return the manifest pointing directly to the data +// First upload data to bzz:, and store the Swarm hash to the resulting manifest in a feed update. +// This effectively uses a feed to store a pointer to content rather than the content itself +// Retrieving the update with the Swarm hash should return the manifest pointing directly to the data // and raw retrieve of that hash should return the data -func TestClientCreateFeedMultihash(t *testing.T) { +func TestClientBzzWithFeed(t *testing.T) { signer, _ := newTestSigner() + // Initialize a Swarm test server srv := swarmhttp.NewTestSwarmServer(t, serverFunc, nil) - client := NewClient(srv.URL) + swarmClient := NewClient(srv.URL) defer srv.Close() - // add the data our multihash aliased manifest will point to - databytes := []byte("bar") - - swarmHash, err := client.UploadRaw(bytes.NewReader(databytes), int64(len(databytes)), false) + // put together some data for our test: + dataBytes := []byte(` + // + // Create some data our manifest will point to. Data that could be very big and wouldn't fit in a feed update. + // So what we are going to do is upload it to Swarm bzz:// and obtain a **manifest hash** pointing to it: + // + // MANIFEST HASH --> DATA + // + // Then, we store that **manifest hash** into a Swarm Feed update. Once we have done this, + // we can use the **feed manifest hash** in bzz:// instead, this way: bzz://feed-manifest-hash. + // + // FEED MANIFEST HASH --> MANIFEST HASH --> DATA + // + // Given that we can update the feed at any time with a new **manifest hash** but the **feed manifest hash** + // stays constant, we have effectively created a fixed address to changing content. (Applause) + // + // FEED MANIFEST HASH (the same) --> MANIFEST HASH(2) --> DATA(2) + // + `) + + // Create a virtual File out of memory containing the above data + f := &File{ + ReadCloser: ioutil.NopCloser(bytes.NewReader(dataBytes)), + ManifestEntry: api.ManifestEntry{ + ContentType: "text/plain", + Mode: 0660, + Size: int64(len(dataBytes)), + }, + } + + // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. + manifestAddressHex, err := swarmClient.Upload(f, "", false) if err != nil { - t.Fatalf("Error uploading raw test data: %s", err) + t.Fatalf("Error creating manifest: %s", err) } - s := common.FromHex(swarmHash) - mh := multihash.ToMultihash(s) + // convert the hex-encoded manifest hash to a 32-byte slice + manifestAddress := common.FromHex(manifestAddressHex) + + if len(manifestAddress) != storage.AddressLength { + t.Fatalf("Something went wrong. Got a hash of an unexpected length. Expected %d bytes. Got %d", storage.AddressLength, len(manifestAddress)) + } - // our feed topic - topic, _ := feed.NewTopic("foo.eth", nil) + // Now create a **feed manifest**. For that, we need a topic: + topic, _ := feed.NewTopic("interesting topic indeed", nil) - createRequest := feed.NewFirstRequest(topic) + // Build a feed request to update data + request := feed.NewFirstRequest(topic) - createRequest.SetData(mh) - if err := createRequest.Sign(signer); err != nil { + // Put the 32-byte address of the manifest into the feed update + request.SetData(manifestAddress) + + // Sign the update + if err := request.Sign(signer); err != nil { t.Fatalf("Error signing update: %s", err) } - feedManifestHash, err := client.CreateFeedWithManifest(createRequest) - + // Publish the update and at the same time request a **feed manifest** to be created + feedManifestAddressHex, err := swarmClient.CreateFeedWithManifest(request) if err != nil { t.Fatalf("Error creating feed manifest: %s", err) } - correctManifestAddrHex := "bb056a5264c295c2b0f613c8409b9c87ce9d71576ace02458160df4cc894210b" - if feedManifestHash != correctManifestAddrHex { - t.Fatalf("Response feed manifest mismatch, expected '%s', got '%s'", correctManifestAddrHex, feedManifestHash) + // Check we have received the exact **feed manifest** to be expected + // given the topic and user signing the updates: + correctFeedManifestAddrHex := "747c402e5b9dc715a25a4393147512167bab018a007fad7cdcd9adc7fce1ced2" + if feedManifestAddressHex != correctFeedManifestAddrHex { + t.Fatalf("Response feed manifest mismatch, expected '%s', got '%s'", correctFeedManifestAddrHex, feedManifestAddressHex) } // Check we get a not found error when trying to get feed updates with a made-up manifest - _, err = client.QueryFeed(nil, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + _, err = swarmClient.QueryFeed(nil, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") if err != ErrNoFeedUpdatesFound { t.Fatalf("Expected to receive ErrNoFeedUpdatesFound error. Got: %s", err) } - reader, err := client.QueryFeed(nil, correctManifestAddrHex) + // If we query the feed directly we should get **manifest hash** back: + reader, err := swarmClient.QueryFeed(nil, correctFeedManifestAddrHex) if err != nil { t.Fatalf("Error retrieving feed updates: %s", err) } @@ -428,10 +469,27 @@ func TestClientCreateFeedMultihash(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(mh, gotData) { - t.Fatalf("Expected: %v, got %v", mh, gotData) + + //Check that indeed the **manifest hash** is retrieved + if !bytes.Equal(manifestAddress, gotData) { + t.Fatalf("Expected: %v, got %v", manifestAddress, gotData) + } + + // Now the final test we were looking for: Use bzz://<feed-manifest> and that should resolve all manifests + // and return the original data directly: + f, err = swarmClient.Download(feedManifestAddressHex, "") + if err != nil { + t.Fatal(err) + } + gotData, err = ioutil.ReadAll(f) + if err != nil { + t.Fatal(err) } + // Check that we get back the original data: + if !bytes.Equal(dataBytes, gotData) { + t.Fatalf("Expected: %v, got %v", manifestAddress, gotData) + } } // TestClientCreateUpdateFeed will check that feeds can be created and updated via the HTTP client. diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index f5f70138b..115a00856 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -5,6 +5,7 @@ import ( "net/http" "runtime/debug" "strings" + "time" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" @@ -73,9 +74,13 @@ func ParseURI(h http.Handler) http.Handler { func InitLoggingResponseWriter(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(startTime) + writer := newLoggingResponseWriter(w) h.ServeHTTP(writer, r) log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).UpdateSince(startTime) }) } diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index 1ef3deece..e82762ce0 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -45,7 +45,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" swarm "github.com/ethereum/go-ethereum/swarm/api/client" - "github.com/ethereum/go-ethereum/swarm/multihash" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" @@ -69,60 +68,91 @@ func newTestSigner() (*feed.GenericSigner, error) { return feed.NewGenericSigner(privKey), nil } -// test the transparent resolving of multihash-containing feed updates with bzz:// scheme +// Test the transparent resolving of feed updates with bzz:// scheme // -// first upload data, and store the multihash to the resulting manifest in a feed update -// retrieving the update with the multihash should return the manifest pointing directly to the data +// First upload data to bzz:, and store the Swarm hash to the resulting manifest in a feed update. +// This effectively uses a feed to store a pointer to content rather than the content itself +// Retrieving the update with the Swarm hash should return the manifest pointing directly to the data // and raw retrieve of that hash should return the data -func TestBzzFeedMultihash(t *testing.T) { +func TestBzzWithFeed(t *testing.T) { signer, _ := newTestSigner() + // Initialize Swarm test server srv := NewTestSwarmServer(t, serverFunc, nil) defer srv.Close() - // add the data our multihash aliased manifest will point to - databytes := "bar" - testBzzUrl := fmt.Sprintf("%s/bzz:/", srv.URL) - resp, err := http.Post(testBzzUrl, "text/plain", bytes.NewReader([]byte(databytes))) + // put together some data for our test: + dataBytes := []byte(` + // + // Create some data our manifest will point to. Data that could be very big and wouldn't fit in a feed update. + // So what we are going to do is upload it to Swarm bzz:// and obtain a **manifest hash** pointing to it: + // + // MANIFEST HASH --> DATA + // + // Then, we store that **manifest hash** into a Swarm Feed update. Once we have done this, + // we can use the **feed manifest hash** in bzz:// instead, this way: bzz://feed-manifest-hash. + // + // FEED MANIFEST HASH --> MANIFEST HASH --> DATA + // + // Given that we can update the feed at any time with a new **manifest hash** but the **feed manifest hash** + // stays constant, we have effectively created a fixed address to changing content. (Applause) + // + // FEED MANIFEST HASH (the same) --> MANIFEST HASH(2) --> DATA(2) ... + // + `) + + // POST data to bzz and get back a content-addressed **manifest hash** pointing to it. + resp, err := http.Post(fmt.Sprintf("%s/bzz:/", srv.URL), "text/plain", bytes.NewReader([]byte(dataBytes))) if err != nil { t.Fatal(err) } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("err %s", resp.Status) } - b, err := ioutil.ReadAll(resp.Body) + manifestAddressHex, err := ioutil.ReadAll(resp.Body) if err != nil { t.Fatal(err) } - s := common.FromHex(string(b)) - mh := multihash.ToMultihash(s) - log.Info("added data", "manifest", string(b), "data", common.ToHex(mh)) + manifestAddress := common.FromHex(string(manifestAddressHex)) - topic, _ := feed.NewTopic("foo.eth", nil) + log.Info("added data", "manifest", string(manifestAddressHex)) + + // At this point we have uploaded the data and have a manifest pointing to it + // Now store that manifest address in a feed update. + // We also want a feed manifest, so we can use it to refer to the feed. + + // First, create a topic for our feed: + topic, _ := feed.NewTopic("interesting topic indeed", nil) + + // Create a feed update request: updateRequest := feed.NewFirstRequest(topic) - updateRequest.SetData(mh) + // Store the **manifest address** as data into the feed update. + updateRequest.SetData(manifestAddress) + // Sign the update if err := updateRequest.Sign(signer); err != nil { t.Fatal(err) } - log.Info("added data", "manifest", string(b), "data", common.ToHex(mh)) + log.Info("added data", "data", common.ToHex(manifestAddress)) - testUrl, err := url.Parse(fmt.Sprintf("%s/bzz-feed:/", srv.URL)) + // Build the feed update http request: + feedUpdateURL, err := url.Parse(fmt.Sprintf("%s/bzz-feed:/", srv.URL)) if err != nil { t.Fatal(err) } - query := testUrl.Query() + query := feedUpdateURL.Query() body := updateRequest.AppendValues(query) // this adds all query parameters and returns the data to be posted - query.Set("manifest", "1") // indicate we want a manifest back - testUrl.RawQuery = query.Encode() + query.Set("manifest", "1") // indicate we want a feed manifest back + feedUpdateURL.RawQuery = query.Encode() - // create the multihash update - resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) + // submit the feed update request to Swarm + resp, err = http.Post(feedUpdateURL.String(), "application/octet-stream", bytes.NewReader(body)) if err != nil { t.Fatal(err) } @@ -130,24 +160,25 @@ func TestBzzFeedMultihash(t *testing.T) { if resp.StatusCode != http.StatusOK { t.Fatalf("err %s", resp.Status) } - b, err = ioutil.ReadAll(resp.Body) + + feedManifestAddressHex, err := ioutil.ReadAll(resp.Body) if err != nil { t.Fatal(err) } - rsrcResp := &storage.Address{} - err = json.Unmarshal(b, rsrcResp) + feedManifestAddress := &storage.Address{} + err = json.Unmarshal(feedManifestAddressHex, feedManifestAddress) if err != nil { - t.Fatalf("data %s could not be unmarshaled: %v", b, err) + t.Fatalf("data %s could not be unmarshaled: %v", feedManifestAddressHex, err) } - correctManifestAddrHex := "bb056a5264c295c2b0f613c8409b9c87ce9d71576ace02458160df4cc894210b" - if rsrcResp.Hex() != correctManifestAddrHex { - t.Fatalf("Response feed manifest address mismatch, expected '%s', got '%s'", correctManifestAddrHex, rsrcResp.Hex()) + correctManifestAddrHex := "747c402e5b9dc715a25a4393147512167bab018a007fad7cdcd9adc7fce1ced2" + if feedManifestAddress.Hex() != correctManifestAddrHex { + t.Fatalf("Response feed manifest address mismatch, expected '%s', got '%s'", correctManifestAddrHex, feedManifestAddress.Hex()) } // get bzz manifest transparent feed update resolve - testBzzUrl = fmt.Sprintf("%s/bzz:/%s", srv.URL, rsrcResp) - resp, err = http.Get(testBzzUrl) + getBzzURL := fmt.Sprintf("%s/bzz:/%s", srv.URL, feedManifestAddress) + resp, err = http.Get(getBzzURL) if err != nil { t.Fatal(err) } @@ -155,12 +186,12 @@ func TestBzzFeedMultihash(t *testing.T) { if resp.StatusCode != http.StatusOK { t.Fatalf("err %s", resp.Status) } - b, err = ioutil.ReadAll(resp.Body) + retrievedData, err := ioutil.ReadAll(resp.Body) if err != nil { t.Fatal(err) } - if !bytes.Equal(b, []byte(databytes)) { - t.Fatalf("retrieved data mismatch, expected %x, got %x", databytes, b) + if !bytes.Equal(retrievedData, []byte(dataBytes)) { + t.Fatalf("retrieved data mismatch, expected %x, got %x", dataBytes, retrievedData) } } @@ -245,7 +276,8 @@ func TestBzzFeed(t *testing.T) { t.Fatalf("Expected manifest Feed '%s', got '%s'", correctFeedHex, manifest.Entries[0].Feed.Hex()) } - // get bzz manifest transparent feed update resolve + // take the chance to have bzz: crash on resolving a feed update that does not contain + // a swarm hash: testBzzUrl := fmt.Sprintf("%s/bzz:/%s", srv.URL, rsrcResp) resp, err = http.Get(testBzzUrl) if err != nil { @@ -253,7 +285,7 @@ func TestBzzFeed(t *testing.T) { } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { - t.Fatal("Expected error status since feed update does not contain multihash. Received 200 OK") + t.Fatal("Expected error status since feed update does not contain a Swarm hash. Received 200 OK") } _, err = ioutil.ReadAll(resp.Body) if err != nil { diff --git a/swarm/grafana_dashboards/ldbstore.json b/swarm/grafana_dashboards/ldbstore.json deleted file mode 100644 index 2d64380ba..000000000 --- a/swarm/grafana_dashboards/ldbstore.json +++ /dev/null @@ -1,2278 +0,0 @@ -{ - "annotations": { - "list": [ - { - "$$hashKey": "object:325", - "builtIn": 1, - "datasource": "-- Grafana --", - "enable": true, - "hide": true, - "iconColor": "rgba(0, 211, 255, 1)", - "name": "Annotations & Alerts", - "type": "dashboard" - } - ] - }, - "editable": true, - "gnetId": null, - "graphTooltip": 1, - "id": 5, - "iteration": 1527598894689, - "links": [], - "panels": [ - { - "collapsed": true, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 0 - }, - "id": 40, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 1 - }, - "id": 42, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.get.cachehit.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore get cachehit", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 1 - }, - "id": 43, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.get.cachemiss.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore get cachemiss", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 7 - }, - "id": 44, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.getorcreaterequest.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Total LocalStore.GetOrCreateRequest", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 7 - }, - "id": 47, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.getorcreaterequest.errfetching.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore GetOrCreateRequest ErrFetching", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 13 - }, - "id": 45, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.getorcreaterequest.hit.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore.GetOrCreateRequest hit", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 13 - }, - "id": 49, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.getorcreaterequest.miss.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore GetOrCreateRequest miss", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 19 - }, - "id": 48, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.get.error.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore get error", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 19 - }, - "id": 46, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.localstore.get.errfetching.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LocalStore get ErrFetching", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "title": "LocalStore", - "type": "row" - }, - { - "collapsed": false, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 1 - }, - "id": 27, - "panels": [], - "title": "LDBStore", - "type": "row" - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 2 - }, - "id": 29, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbstore.get.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBStore get", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 2 - }, - "id": 30, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbstore.put.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBStore put", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 8 - }, - "id": 31, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbstore.synciterator.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBStore SyncIterator", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 8 - }, - "id": 32, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbstore.synciterator.seek.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBStore SyncIterator Seek/Next", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 14 - }, - "id": 50, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbstore.collectgarbage.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBStore Collect Garbage", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 14 - }, - "id": 51, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbstore.collectgarbage.delete.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBStore Collect Garbage - Actual Deletes", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "collapsed": true, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 20 - }, - "id": 34, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 39 - }, - "id": 36, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbdatabase.get.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBDatabase get", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 39 - }, - "id": 37, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbdatabase.write.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBDatabase write", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 45 - }, - "id": 38, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.ldbdatabase.newiterator.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LDBDatabase NewIterator", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "title": "LDBDatabase", - "type": "row" - } - ], - "refresh": "10s", - "schemaVersion": 16, - "style": "dark", - "tags": [], - "templating": { - "list": [ - { - "auto": false, - "auto_count": 30, - "auto_min": "10s", - "current": { - "text": "10s", - "value": "10s" - }, - "hide": 0, - "label": "resolution", - "name": "myinterval", - "options": [ - { - "selected": false, - "text": "5s", - "value": "5s" - }, - { - "selected": true, - "text": "10s", - "value": "10s" - }, - { - "selected": false, - "text": "30s", - "value": "30s" - }, - { - "selected": false, - "text": "100s", - "value": "100s" - } - ], - "query": "5s,10s,30s,100s", - "refresh": 2, - "type": "interval" - }, - { - "allValue": null, - "current": { - "text": "swarm_30399 + swarm_30400 + swarm_30401", - "value": [ - "swarm_30399", - "swarm_30400", - "swarm_30401" - ] - }, - "datasource": "metrics", - "hide": 0, - "includeAll": true, - "label": null, - "multi": true, - "name": "host", - "options": [], - "query": "SHOW TAG VALUES WITH KEY = \"host\"", - "refresh": 1, - "regex": "", - "sort": 1, - "tagValuesQuery": "", - "tags": [], - "tagsQuery": "swarm.http.request.GET.time.span", - "type": "query", - "useTags": false - } - ] - }, - "time": { - "from": "now-15m", - "to": "now" - }, - "timepicker": { - "refresh_intervals": [ - "5s", - "10s", - "30s", - "1m", - "5m", - "15m", - "30m", - "1h", - "2h", - "1d" - ], - "time_options": [ - "5m", - "15m", - "1h", - "6h", - "12h", - "24h", - "2d", - "7d", - "30d" - ] - }, - "timezone": "", - "title": "LDBStore and LDBDatabase", - "uid": "zS6beG7iz", - "version": 28 -} diff --git a/swarm/grafana_dashboards/swarm.json b/swarm/grafana_dashboards/swarm.json deleted file mode 100644 index 3ee244d15..000000000 --- a/swarm/grafana_dashboards/swarm.json +++ /dev/null @@ -1,3198 +0,0 @@ -{ - "annotations": { - "list": [ - { - "$$hashKey": "object:147", - "builtIn": 1, - "datasource": "-- Grafana --", - "enable": true, - "hide": true, - "iconColor": "rgba(0, 211, 255, 1)", - "name": "Annotations & Alerts", - "type": "dashboard" - } - ] - }, - "editable": true, - "gnetId": null, - "graphTooltip": 1, - "id": 2, - "iteration": 1527598859072, - "links": [], - "panels": [ - { - "collapsed": false, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 0 - }, - "id": 34, - "panels": [], - "title": "P2P", - "type": "row" - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 9, - "w": 12, - "x": 0, - "y": 1 - }, - "id": 36, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.send.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "P2P Send() - messages sent", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 9, - "w": 12, - "x": 12, - "y": 1 - }, - "id": 37, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "p95($tag_host)", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.send_t.span", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "p95" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "P2P Send() timer - 95%ile", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "ns", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 9, - "w": 12, - "x": 0, - "y": 10 - }, - "id": 38, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "1 $tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.sendpriority.1.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - }, - { - "alias": "2 $tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.sendpriority.2.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "B", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - }, - { - "alias": "3 $tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.sendpriority.3.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "C", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "P2P SendPriority() - messages sent", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 9, - "w": 12, - "x": 12, - "y": 10 - }, - "id": 39, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "1 $tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.sendpriority_t.1.span", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "p95" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [] - }, - { - "alias": "2 $tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.sendpriority_t.2.span", - "orderByTime": "ASC", - "policy": "default", - "refId": "B", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "p95" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "P2P SendPriority() timer - 95%ile", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "ns", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 9, - "w": 12, - "x": 0, - "y": 19 - }, - "id": 40, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$__interval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "none" - ], - "type": "fill" - } - ], - "measurement": "swarm.registry.peers.gauge", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "last" - } - ] - ], - "tags": [] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Registry Peers", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "collapsed": true, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 28 - }, - "id": 32, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 2 - }, - "id": 14, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.stack.uptime.gauge", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Uptime", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "ns", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "title": "Uptime", - "type": "row" - }, - { - "collapsed": true, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 29 - }, - "id": 28, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 7 - }, - "id": 2, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "GET", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "measurement": "swarm.http.request.GET.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - }, - { - "alias": "POST", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "null" - ], - "type": "fill" - } - ], - "measurement": "swarm.http.request.POST.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "B", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Total HTTP Requests", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 7 - }, - "id": 26, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.http.request.GET.time.span", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "p95" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "HTTP GET requests 95% timer", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "ns", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 13 - }, - "id": 15, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.http.request.GET.time.span", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "p50" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "HTTP GET requests 50% timer", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "ns", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 13 - }, - "id": 8, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "POST", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.http.request.POST.time.span", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "p95" - ], - "type": "field" - }, - { - "params": [], - "type": "mean" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "HTTP POST requests 95% timer", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "ns", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "title": "HTTP", - "type": "row" - }, - { - "collapsed": true, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 30 - }, - "id": 30, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 5, - "w": 12, - "x": 0, - "y": 8 - }, - "id": 16, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.lazychunkreader.read.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LazyChunkReader read() calls", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 5, - "w": 12, - "x": 12, - "y": 8 - }, - "id": 18, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.lazychunkreader.read.err.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LazyChunkReader read errors", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 5, - "w": 12, - "x": 0, - "y": 13 - }, - "id": 17, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.lazychunkreader.read.bytes.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "LazyChunkReader bytes read", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "decbytes", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "title": "LazyChunkReader", - "type": "row" - }, - { - "collapsed": false, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 31 - }, - "id": 25, - "panels": [], - "title": "All measurements", - "type": "row" - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 32 - }, - "id": 3, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.api.get.count.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "API Get (BZZ)", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 32 - }, - "id": 13, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.network.stream.request_from_peers.count.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Request from peers", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 38 - }, - "id": 11, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.network.stream.received_chunks.count.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Received chunks", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 38 - }, - "id": 12, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.storage.cache.requests.size.gauge", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "max" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Requests cache entries", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 44 - }, - "id": 9, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.network.stream.handle_retrieve_request_msg.count.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Handle retrieve request msg", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 44 - }, - "id": 20, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.syncer.setnextbatch.iterator.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "syncer setnextbatch iterator calls", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 50 - }, - "id": 21, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.handlewantedhashesmsg.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "peer HandleWantedHashesMsg", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 50 - }, - "id": 22, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.handlesubscribemsg.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "peer HandleSubscribeMsg", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 56 - }, - "id": 23, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.handlewantedhashesmsg.actualget.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "peer HandleWantedHashesMsg actual get", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "metrics", - "fill": 1, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 56 - }, - "id": 19, - "legend": { - "alignAsTable": true, - "avg": false, - "current": false, - "max": true, - "min": false, - "rightSide": true, - "show": true, - "total": true, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "alias": "$tag_host", - "dsType": "influxdb", - "groupBy": [ - { - "params": [ - "$myinterval" - ], - "type": "time" - }, - { - "params": [ - "host" - ], - "type": "tag" - }, - { - "params": [ - "0" - ], - "type": "fill" - } - ], - "measurement": "swarm.peer.handleofferedhashes.count", - "orderByTime": "ASC", - "policy": "default", - "refId": "A", - "resultFormat": "time_series", - "select": [ - [ - { - "params": [ - "value" - ], - "type": "field" - }, - { - "params": [], - "type": "sum" - } - ] - ], - "tags": [ - { - "key": "host", - "operator": "=~", - "value": "/^$host$/" - } - ] - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "peer OfferedHashesMsg", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "refresh": "30s", - "schemaVersion": 16, - "style": "dark", - "tags": [], - "templating": { - "list": [ - { - "auto": false, - "auto_count": 30, - "auto_min": "10s", - "current": { - "text": "10s", - "value": "10s" - }, - "hide": 0, - "label": "resolution", - "name": "myinterval", - "options": [ - { - "selected": false, - "text": "5s", - "value": "5s" - }, - { - "selected": true, - "text": "10s", - "value": "10s" - }, - { - "selected": false, - "text": "30s", - "value": "30s" - }, - { - "selected": false, - "text": "100s", - "value": "100s" - } - ], - "query": "5s,10s,30s,100s", - "refresh": 2, - "type": "interval" - }, - { - "allValue": null, - "current": { - "text": "swarm_30399 + swarm_30400 + swarm_30401 + swarm_30402", - "value": [ - "swarm_30399", - "swarm_30400", - "swarm_30401", - "swarm_30402" - ] - }, - "datasource": "metrics", - "hide": 0, - "includeAll": true, - "label": null, - "multi": true, - "name": "host", - "options": [], - "query": "SHOW TAG VALUES WITH KEY = \"host\"", - "refresh": 1, - "regex": "", - "sort": 1, - "tagValuesQuery": "", - "tags": [], - "tagsQuery": "swarm.http.request.GET.time.span", - "type": "query", - "useTags": false - } - ] - }, - "time": { - "from": "now-15m", - "to": "now" - }, - "timepicker": { - "refresh_intervals": [ - "5s", - "10s", - "30s", - "1m", - "5m", - "15m", - "30m", - "1h", - "2h", - "1d" - ], - "time_options": [ - "5m", - "15m", - "1h", - "6h", - "12h", - "24h", - "2d", - "7d", - "30d" - ] - }, - "timezone": "", - "title": "Swarm", - "uid": "vmEtxxgmz", - "version": 138 -} diff --git a/swarm/multihash/multihash.go b/swarm/multihash/multihash.go deleted file mode 100644 index 3306e3a6d..000000000 --- a/swarm/multihash/multihash.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package multihash - -import ( - "bytes" - "encoding/binary" - "errors" - "fmt" -) - -const ( - defaultMultihashLength = 32 - defaultMultihashTypeCode = 0x1b -) - -var ( - multihashTypeCode uint8 - MultihashLength = defaultMultihashLength -) - -func init() { - multihashTypeCode = defaultMultihashTypeCode - MultihashLength = defaultMultihashLength -} - -// check if valid swarm multihash -func isSwarmMultihashType(code uint8) bool { - return code == multihashTypeCode -} - -// GetMultihashLength returns the digest length of the provided multihash -// It will fail if the multihash is not a valid swarm mulithash -func GetMultihashLength(data []byte) (int, int, error) { - cursor := 0 - typ, c := binary.Uvarint(data) - if c <= 0 { - return 0, 0, errors.New("unreadable hashtype field") - } - if !isSwarmMultihashType(uint8(typ)) { - return 0, 0, fmt.Errorf("hash code %x is not a swarm hashtype", typ) - } - cursor += c - hashlength, c := binary.Uvarint(data[cursor:]) - if c <= 0 { - return 0, 0, errors.New("unreadable length field") - } - cursor += c - - // we cheekily assume hashlength < maxint - inthashlength := int(hashlength) - if len(data[c:]) < inthashlength { - return 0, 0, errors.New("length mismatch") - } - return inthashlength, cursor, nil -} - -// FromMulithash returns the digest portion of the multihash -// It will fail if the multihash is not a valid swarm multihash -func FromMultihash(data []byte) ([]byte, error) { - hashLength, _, err := GetMultihashLength(data) - if err != nil { - return nil, err - } - return data[len(data)-hashLength:], nil -} - -// ToMulithash wraps the provided digest data with a swarm mulithash header -func ToMultihash(hashData []byte) []byte { - buf := bytes.NewBuffer(nil) - b := make([]byte, 8) - c := binary.PutUvarint(b, uint64(multihashTypeCode)) - buf.Write(b[:c]) - c = binary.PutUvarint(b, uint64(len(hashData))) - buf.Write(b[:c]) - buf.Write(hashData) - return buf.Bytes() -} diff --git a/swarm/multihash/multihash_test.go b/swarm/multihash/multihash_test.go deleted file mode 100644 index 85df741dd..000000000 --- a/swarm/multihash/multihash_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package multihash - -import ( - "bytes" - "math/rand" - "testing" -) - -// parse multihash, and check that invalid multihashes fail -func TestCheckMultihash(t *testing.T) { - hashbytes := make([]byte, 32) - c, err := rand.Read(hashbytes) - if err != nil { - t.Fatal(err) - } else if c < 32 { - t.Fatal("short read") - } - - expected := ToMultihash(hashbytes) - - l, hl, _ := GetMultihashLength(expected) - if l != 32 { - t.Fatalf("expected length %d, got %d", 32, l) - } else if hl != 2 { - t.Fatalf("expected header length %d, got %d", 2, hl) - } - if _, _, err := GetMultihashLength(expected[1:]); err == nil { - t.Fatal("expected failure on corrupt header") - } - if _, _, err := GetMultihashLength(expected[:len(expected)-2]); err == nil { - t.Fatal("expected failure on short content") - } - dh, _ := FromMultihash(expected) - if !bytes.Equal(dh, hashbytes) { - t.Fatalf("expected content hash %x, got %x", hashbytes, dh) - } -} diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 1aa1ae42a..ebef54592 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -165,8 +165,8 @@ func (h *Hive) Run(p *BzzPeer) error { // otherwise just send depth to new peer dp.NotifyDepth(depth) } + NotifyPeer(p.BzzAddr, h.Kademlia) } - NotifyPeer(p.BzzAddr, h.Kademlia) defer h.Off(dp) return dp.Run(dp.HandleMsg) } diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index cd94741be..a8ecaa4be 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -81,14 +81,15 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthC chan int // returned by DepthC function to signal neighbourhood depth change - addrCountC chan int // returned by AddrCountC function to signal peer count change + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthC chan int // returned by DepthC function to signal neighbourhood depth change + addrCountC chan int // returned by AddrCountC function to signal peer count change + Pof func(pot.Val, pot.Val, int) (int, bool) // function for calculating kademlia routing distance between two addresses } // NewKademlia creates a Kademlia table for base address addr @@ -103,6 +104,7 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia { KadParams: params, addrs: pot.NewPot(nil, 0), conns: pot.NewPot(nil, 0), + Pof: pof, } } @@ -175,7 +177,7 @@ func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) { k.lock.Lock() defer k.lock.Unlock() minsize := k.MinBinSize - depth := k.neighbourhoodDepth() + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) // if there is a callable neighbour within the current proxBin, connect // this makes sure nearest neighbour set is fully connected var ppo int @@ -289,6 +291,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { // neighbourhood depth on each change. // Not receiving from the returned channel will block On function // when the neighbourhood depth is changed. +// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one? func (k *Kademlia) NeighbourhoodDepthC() <-chan int { k.lock.Lock() defer k.lock.Unlock() @@ -305,7 +308,7 @@ func (k *Kademlia) sendNeighbourhoodDepthChange() { // It provides signaling of neighbourhood depth change. // This part of the code is sending new neighbourhood depth to nDepthC if that condition is met. if k.nDepthC != nil { - nDepth := k.neighbourhoodDepth() + nDepth := depthForPot(k.conns, k.MinProxBinSize, k.base) if nDepth != k.nDepth { k.nDepth = nDepth k.nDepthC <- nDepth @@ -361,7 +364,7 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con var startPo int var endPo int - kadDepth := k.neighbourhoodDepth() + kadDepth := depthForPot(k.conns, k.MinProxBinSize, k.base) k.conns.EachBin(base, pof, o, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool { if startPo > 0 && endPo != k.MaxProxDisplay { @@ -395,7 +398,7 @@ func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) { if len(base) == 0 { base = k.base } - depth := k.neighbourhoodDepth() + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) k.conns.EachNeighbour(base, pof, func(val pot.Val, po int) bool { if po > o { return true @@ -417,7 +420,7 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool if len(base) == 0 { base = k.base } - depth := k.neighbourhoodDepth() + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) k.addrs.EachNeighbour(base, pof, func(val pot.Val, po int) bool { if po > o { return true @@ -426,21 +429,72 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool }) } -// neighbourhoodDepth returns the proximity order that defines the distance of +func (k *Kademlia) NeighbourhoodDepth() (depth int) { + k.lock.RLock() + defer k.lock.RUnlock() + return depthForPot(k.conns, k.MinProxBinSize, k.base) +} + +// depthForPot returns the proximity order that defines the distance of // the nearest neighbour set with cardinality >= MinProxBinSize // if there is altogether less than MinProxBinSize peers it returns 0 // caller must hold the lock -func (k *Kademlia) neighbourhoodDepth() (depth int) { - if k.conns.Size() < k.MinProxBinSize { +func depthForPot(p *pot.Pot, minProxBinSize int, pivotAddr []byte) (depth int) { + if p.Size() <= minProxBinSize { return 0 } + + // total number of peers in iteration var size int + + // true if iteration has all prox peers + var b bool + + // last po recorded in iteration + var lastPo int + f := func(v pot.Val, i int) bool { + // po == 256 means that addr is the pivot address(self) + if i == 256 { + return true + } size++ - depth = i - return size < k.MinProxBinSize + + // this means we have all nn-peers. + // depth is by default set to the bin of the farthest nn-peer + if size == minProxBinSize { + b = true + depth = i + return true + } + + // if there are empty bins between farthest nn and current node, + // the depth should recalculated to be + // the farthest of those empty bins + // + // 0 abac ccde + // 1 2a2a + // 2 589f <--- nearest non-nn + // ============ DEPTH 3 =========== + // 3 <--- don't count as empty bins + // 4 <--- don't count as empty bins + // 5 cbcb cdcd <---- furthest nn + // 6 a1a2 b3c4 + if b && i < depth { + depth = i + 1 + lastPo = i + return false + } + lastPo = i + return true + } + p.EachNeighbour(pivotAddr, pof, f) + + // cover edge case where more than one farthest nn + // AND we only have nn-peers + if lastPo == depth { + depth = 0 } - k.conns.EachNeighbour(k.base, pof, f) return depth } @@ -500,7 +554,7 @@ func (k *Kademlia) string() string { liverows := make([]string, k.MaxProxDisplay) peersrows := make([]string, k.MaxProxDisplay) - depth := k.neighbourhoodDepth() + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) rest := k.conns.Size() k.conns.EachBin(k.base, pof, 0, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool { var rowlen int @@ -570,6 +624,7 @@ type PeerPot struct { // as hexadecimal representations of the address. // used for testing only func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { + // create a table of all nodes for health check np := pot.NewPot(nil, 0) for _, addr := range addrs { @@ -578,34 +633,47 @@ func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { ppmap := make(map[string]*PeerPot) for i, a := range addrs { - pl := 256 - prev := 256 + + // actual kademlia depth + depth := depthForPot(np, kadMinProxSize, a) + + // upon entering a new iteration + // this will hold the value the po should be + // if it's one higher than the po in the last iteration + prevPo := 256 + + // all empty bins which are outside neighbourhood depth var emptyBins []int + + // all nn-peers var nns [][]byte - np.EachNeighbour(addrs[i], pof, func(val pot.Val, po int) bool { - a := val.([]byte) + + np.EachNeighbour(a, pof, func(val pot.Val, po int) bool { + addr := val.([]byte) + // po == 256 means that addr is the pivot address(self) if po == 256 { return true } - if pl == 256 || pl == po { - nns = append(nns, a) - } - if pl == 256 && len(nns) >= kadMinProxSize { - pl = po - prev = po + + // iterate through the neighbours, going from the closest to the farthest + // we calculate the nearest neighbours that should be in the set + // depth in this case equates to: + // 1. Within all bins that are higher or equal than depth there are + // at least minProxBinSize peers connected + // 2. depth-1 bin is not empty + if po >= depth { + nns = append(nns, addr) + prevPo = depth - 1 + return true } - if prev < pl { - for j := prev; j > po; j-- { - emptyBins = append(emptyBins, j) - } + for j := prevPo; j > po; j-- { + emptyBins = append(emptyBins, j) } - prev = po - 1 + prevPo = po - 1 return true }) - for j := prev; j >= 0; j-- { - emptyBins = append(emptyBins, j) - } - log.Trace(fmt.Sprintf("%x NNS: %s", addrs[i][:4], LogAddrs(nns))) + + log.Trace(fmt.Sprintf("%x NNS: %s, emptyBins: %s", addrs[i][:4], LogAddrs(nns), logEmptyBins(emptyBins))) ppmap[common.Bytes2Hex(a)] = &PeerPot{nns, emptyBins} } return ppmap @@ -620,7 +688,7 @@ func (k *Kademlia) saturation(n int) int { prev++ return prev == po && size >= n }) - depth := k.neighbourhoodDepth() + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) if depth < prev { return depth } @@ -633,8 +701,11 @@ func (k *Kademlia) full(emptyBins []int) (full bool) { prev := 0 e := len(emptyBins) ok := true - depth := k.neighbourhoodDepth() + depth := depthForPot(k.conns, k.MinProxBinSize, k.base) k.conns.EachBin(k.base, pof, 0, func(po, _ int, _ func(func(val pot.Val, i int) bool) bool) bool { + if po >= depth { + return false + } if prev == depth+1 { return true } diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index d2e051f45..184a2d942 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -25,6 +25,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/swarm/pot" ) @@ -73,6 +76,76 @@ func Register(k *Kademlia, regs ...string) { } } +// tests the validity of neighborhood depth calculations +// +// in particular, it tests that if there are one or more consecutive +// empty bins above the farthest "nearest neighbor-peer" then +// the depth should be set at the farthest of those empty bins +// +// TODO: Make test adapt to change in MinProxBinSize +func TestNeighbourhoodDepth(t *testing.T) { + baseAddressBytes := RandomAddr().OAddr + kad := NewKademlia(baseAddressBytes, NewKadParams()) + + baseAddress := pot.NewAddressFromBytes(baseAddressBytes) + + closerAddress := pot.RandomAddressAt(baseAddress, 7) + closerPeer := newTestDiscoveryPeer(closerAddress, kad) + kad.On(closerPeer) + depth := kad.NeighbourhoodDepth() + if depth != 0 { + t.Fatalf("expected depth 0, was %d", depth) + } + + sameAddress := pot.RandomAddressAt(baseAddress, 7) + samePeer := newTestDiscoveryPeer(sameAddress, kad) + kad.On(samePeer) + depth = kad.NeighbourhoodDepth() + if depth != 0 { + t.Fatalf("expected depth 0, was %d", depth) + } + + midAddress := pot.RandomAddressAt(baseAddress, 4) + midPeer := newTestDiscoveryPeer(midAddress, kad) + kad.On(midPeer) + depth = kad.NeighbourhoodDepth() + if depth != 5 { + t.Fatalf("expected depth 5, was %d", depth) + } + + kad.Off(midPeer) + depth = kad.NeighbourhoodDepth() + if depth != 0 { + t.Fatalf("expected depth 0, was %d", depth) + } + + fartherAddress := pot.RandomAddressAt(baseAddress, 1) + fartherPeer := newTestDiscoveryPeer(fartherAddress, kad) + kad.On(fartherPeer) + depth = kad.NeighbourhoodDepth() + if depth != 2 { + t.Fatalf("expected depth 2, was %d", depth) + } + + midSameAddress := pot.RandomAddressAt(baseAddress, 4) + midSamePeer := newTestDiscoveryPeer(midSameAddress, kad) + kad.Off(closerPeer) + kad.On(midPeer) + kad.On(midSamePeer) + depth = kad.NeighbourhoodDepth() + if depth != 2 { + t.Fatalf("expected depth 2, was %d", depth) + } + + kad.Off(fartherPeer) + log.Trace(kad.string()) + time.Sleep(time.Millisecond) + depth = kad.NeighbourhoodDepth() + if depth != 0 { + t.Fatalf("expected depth 0, was %d", depth) + } +} + func testSuggestPeer(k *Kademlia, expAddr string, expPo int, expWant bool) error { addr, o, want := k.SuggestPeer() if binStr(addr) != expAddr { @@ -376,7 +449,7 @@ func TestKademliaHiveString(t *testing.T) { Register(k, "10000000", "10000001") k.MaxProxDisplay = 8 h := k.String() - expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), MinProxBinSize: 2, MinBinSize: 1, MaxBinSize: 4\n000 0 | 2 8100 (0) 8000 (0)\n============ DEPTH: 1 ==========================================\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" + expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), MinProxBinSize: 2, MinBinSize: 1, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" if expH[104:] != h[104:] { t.Fatalf("incorrect hive output. expected %v, got %v", expH, h) } @@ -644,3 +717,17 @@ func TestKademliaCase5(t *testing.T) { "78fafa0809929a1279ece089a51d12457c2d8416dff859aeb2ccc24bb50df5ec", "1dd39b1257e745f147cbbc3cadd609ccd6207c41056dbc4254bba5d2527d3ee5", "5f61dd66d4d94aec8fcc3ce0e7885c7edf30c43143fa730e2841c5d28e3cd081", "8aa8b0472cb351d967e575ad05c4b9f393e76c4b01ef4b3a54aac5283b78abc9", "4502f385152a915b438a6726ce3ea9342e7a6db91a23c2f6bee83a885ed7eb82", "718677a504249db47525e959ef1784bed167e1c46f1e0275b9c7b588e28a3758", "7c54c6ed1f8376323896ed3a4e048866410de189e9599dd89bf312ca4adb96b5", "18e03bd3378126c09e799a497150da5c24c895aedc84b6f0dbae41fc4bac081a", "23db76ac9e6e58d9f5395ca78252513a7b4118b4155f8462d3d5eec62486cadc", "40ae0e8f065e96c7adb7fa39505136401f01780481e678d718b7f6dbb2c906ec", "c1539998b8bae19d339d6bbb691f4e9daeb0e86847545229e80fe0dffe716e92", "ed139d73a2699e205574c08722ca9f030ad2d866c662f1112a276b91421c3cb9", "5bdb19584b7a36d09ca689422ef7e6bb681b8f2558a6b2177a8f7c812f631022", "636c9de7fe234ffc15d67a504c69702c719f626c17461d3f2918e924cd9d69e2", "de4455413ff9335c440d52458c6544191bd58a16d85f700c1de53b62773064ea", "de1963310849527acabc7885b6e345a56406a8f23e35e436b6d9725e69a79a83", "a80a50a467f561210a114cba6c7fb1489ed43a14d61a9edd70e2eb15c31f074d", "7804f12b8d8e6e4b375b242058242068a3809385e05df0e64973cde805cf729c", "60f9aa320c02c6f2e6370aa740cf7cea38083fa95fca8c99552cda52935c1520", "d8da963602390f6c002c00ce62a84b514edfce9ebde035b277a957264bb54d21", "8463d93256e026fe436abad44697152b9a56ac8e06a0583d318e9571b83d073c", "9a3f78fcefb9a05e40a23de55f6153d7a8b9d973ede43a380bf46bb3b3847de1", "e3bb576f4b3760b9ca6bff59326f4ebfc4a669d263fb7d67ab9797adea54ed13", "4d5cdbd6dcca5bdf819a0fe8d175dc55cc96f088d37462acd5ea14bc6296bdbe", "5a0ed28de7b5258c727cb85447071c74c00a5fbba9e6bc0393bc51944d04ab2a", "61e4ddb479c283c638f4edec24353b6cc7a3a13b930824aad016b0996ca93c47", "7e3610868acf714836cafaaa7b8c009a9ac6e3a6d443e5586cf661530a204ee2", "d74b244d4345d2c86e30a097105e4fb133d53c578320285132a952cdaa64416e", "cfeed57d0f935bfab89e3f630a7c97e0b1605f0724d85a008bbfb92cb47863a8", "580837af95055670e20d494978f60c7f1458dc4b9e389fc7aa4982b2aca3bce3", "df55c0c49e6c8a83d82dfa1c307d3bf6a20e18721c80d8ec4f1f68dc0a137ced", "5f149c51ce581ba32a285439a806c063ced01ccd4211cd024e6a615b8f216f95", "1eb76b00aeb127b10dd1b7cd4c3edeb4d812b5a658f0feb13e85c4d2b7c6fe06", "7a56ba7c3fb7cbfb5561a46a75d95d7722096b45771ec16e6fa7bbfab0b35dfe", "4bae85ad88c28470f0015246d530adc0cd1778bdd5145c3c6b538ee50c4e04bd", "afd1892e2a7145c99ec0ebe9ded0d3fec21089b277a68d47f45961ec5e39e7e0", "953138885d7b36b0ef79e46030f8e61fd7037fbe5ce9e0a94d728e8c8d7eab86", "de761613ef305e4f628cb6bf97d7b7dc69a9d513dc233630792de97bcda777a6", "3f3087280063d09504c084bbf7fdf984347a72b50d097fd5b086ffabb5b3fb4c", "7d18a94bb1ebfdef4d3e454d2db8cb772f30ca57920dd1e402184a9e598581a0", "a7d6fbdc9126d9f10d10617f49fb9f5474ffe1b229f76b7dd27cebba30eccb5d", "fad0246303618353d1387ec10c09ee991eb6180697ed3470ed9a6b377695203d", "1cf66e09ea51ee5c23df26615a9e7420be2ac8063f28f60a3bc86020e94fe6f3", "8269cdaa153da7c358b0b940791af74d7c651cd4d3f5ed13acfe6d0f2c539e7f", "90d52eaaa60e74bf1c79106113f2599471a902d7b1c39ac1f55b20604f453c09", "9788fd0c09190a3f3d0541f68073a2f44c2fcc45bb97558a7c319f36c25a75b3", "10b68fc44157ecfdae238ee6c1ce0333f906ad04d1a4cb1505c8e35c3c87fbb0", "e5284117fdf3757920475c786e0004cb00ba0932163659a89b36651a01e57394", "403ad51d911e113dcd5f9ff58c94f6d278886a2a4da64c3ceca2083282c92de3", ) } + +func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer { + rw := &p2p.MsgPipeRW{} + p := p2p.NewPeer(enode.ID{}, "foo", []p2p.Cap{}) + pp := protocols.NewPeer(p, rw, &protocols.Spec{}) + bp := &BzzPeer{ + Peer: pp, + BzzAddr: &BzzAddr{ + OAddr: addr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", addr[:])), + }, + } + return NewPeer(bp, kad) +} diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 66ae94a88..4b9b28cdc 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -44,7 +44,7 @@ const ( // BzzSpec is the spec of the generic swarm handshake var BzzSpec = &protocols.Spec{ Name: "bzz", - Version: 7, + Version: 8, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ HandshakeMsg{}, @@ -54,7 +54,7 @@ var BzzSpec = &protocols.Spec{ // DiscoverySpec is the spec for the bzz discovery subprotocols var DiscoverySpec = &protocols.Spec{ Name: "hive", - Version: 6, + Version: 8, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ peersMsg{}, diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go index f0d266628..53ceda744 100644 --- a/swarm/network/protocol_test.go +++ b/swarm/network/protocol_test.go @@ -31,7 +31,7 @@ import ( ) const ( - TestProtocolVersion = 7 + TestProtocolVersion = 8 TestProtocolNetworkID = 3 ) diff --git a/swarm/network/simulation/example_test.go b/swarm/network/simulation/example_test.go index bacc64d53..7b6204617 100644 --- a/swarm/network/simulation/example_test.go +++ b/swarm/network/simulation/example_test.go @@ -33,6 +33,10 @@ import ( // BucketKeyKademlia key. This allows to use WaitTillHealthy to block until // all nodes have the their Kadmlias healthy. func ExampleSimulation_WaitTillHealthy() { + + log.Error("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") + return + sim := simulation.New(map[string]simulation.ServiceFunc{ "bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { addr := network.NewAddr(ctx.Config.Node()) diff --git a/swarm/network/simulation/kademlia.go b/swarm/network/simulation/kademlia.go index f895181d9..7982810ca 100644 --- a/swarm/network/simulation/kademlia.go +++ b/swarm/network/simulation/kademlia.go @@ -33,6 +33,7 @@ var BucketKeyKademlia BucketKey = "kademlia" // WaitTillHealthy is blocking until the health of all kademlias is true. // If error is not nil, a map of kademlia that was found not healthy is returned. +// TODO: Check correctness since change in kademlia depth calculation logic func (s *Simulation) WaitTillHealthy(ctx context.Context, kadMinProxSize int) (ill map[enode.ID]*network.Kademlia, err error) { // Prepare PeerPot map for checking Kademlia health var ppmap map[string]*network.PeerPot diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go index 285644a0f..f02b0e541 100644 --- a/swarm/network/simulation/kademlia_test.go +++ b/swarm/network/simulation/kademlia_test.go @@ -28,11 +28,11 @@ import ( ) func TestWaitTillHealthy(t *testing.T) { + sim := New(map[string]ServiceFunc{ "bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { addr := network.NewAddr(ctx.Config.Node()) hp := network.NewHiveParams() - hp.Discovery = false config := &network.BzzConfig{ OverlayAddr: addr.Over(), UnderlayAddr: addr.Under(), diff --git a/swarm/network/simulation/node_test.go b/swarm/network/simulation/node_test.go index 086ab606f..01346ef14 100644 --- a/swarm/network/simulation/node_test.go +++ b/swarm/network/simulation/node_test.go @@ -160,6 +160,41 @@ func TestAddNodeWithService(t *testing.T) { } } +func TestAddNodeMultipleServices(t *testing.T) { + sim := New(map[string]ServiceFunc{ + "noop1": noopServiceFunc, + "noop2": noopService2Func, + }) + defer sim.Close() + + id, err := sim.AddNode() + if err != nil { + t.Fatal(err) + } + + n := sim.Net.GetNode(id).Node.(*adapters.SimNode) + if n.Service("noop1") == nil { + t.Error("service noop1 not found on node") + } + if n.Service("noop2") == nil { + t.Error("service noop2 not found on node") + } +} + +func TestAddNodeDuplicateServiceError(t *testing.T) { + sim := New(map[string]ServiceFunc{ + "noop1": noopServiceFunc, + "noop2": noopServiceFunc, + }) + defer sim.Close() + + wantErr := "duplicate service: *simulation.noopService" + _, err := sim.AddNode() + if err.Error() != wantErr { + t.Errorf("got error %q, want %q", err, wantErr) + } +} + func TestAddNodes(t *testing.T) { sim := New(noopServiceFuncMap) defer sim.Close() diff --git a/swarm/network/simulation/simulation.go b/swarm/network/simulation/simulation.go index f6d3ce229..e5435b9f0 100644 --- a/swarm/network/simulation/simulation.go +++ b/swarm/network/simulation/simulation.go @@ -68,6 +68,10 @@ type ServiceFunc func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Se // New creates a new Simulation instance with new // simulations.Network initialized with provided services. +// Services map must have unique keys as service names and +// every ServiceFunc must return a node.Service of the unique type. +// This restriction is required by node.Node.Start() function +// which is used to start node.Service returned by ServiceFunc. func New(services map[string]ServiceFunc) (s *Simulation) { s = &Simulation{ buckets: make(map[enode.ID]*sync.Map), @@ -76,6 +80,9 @@ func New(services map[string]ServiceFunc) (s *Simulation) { adapterServices := make(map[string]adapters.ServiceFunc, len(services)) for name, serviceFunc := range services { + // Scope this variables correctly + // as they will be in the adapterServices[name] function accessed later. + name, serviceFunc := name, serviceFunc s.serviceNames = append(s.serviceNames, name) adapterServices[name] = func(ctx *adapters.ServiceContext) (node.Service, error) { b := new(sync.Map) diff --git a/swarm/network/simulation/simulation_test.go b/swarm/network/simulation/simulation_test.go index eed09bf50..ca8599d7c 100644 --- a/swarm/network/simulation/simulation_test.go +++ b/swarm/network/simulation/simulation_test.go @@ -205,3 +205,16 @@ func (t *noopService) Start(server *p2p.Server) error { func (t *noopService) Stop() error { return nil } + +// a helper function for most basic noop service +// of a different type then noopService to test +// multiple services on one node. +func noopService2Func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { + return new(noopService2), nil, nil +} + +// noopService2 is the service that does not do anything +// but implements node.Service interface. +type noopService2 struct { + noopService +} diff --git a/swarm/network/simulations/overlay.go b/swarm/network/simulations/overlay.go index caf7ff1f2..284ae6398 100644 --- a/swarm/network/simulations/overlay.go +++ b/swarm/network/simulations/overlay.go @@ -64,12 +64,12 @@ func init() { type Simulation struct { mtx sync.Mutex - stores map[enode.ID]*state.InmemoryStore + stores map[enode.ID]state.Store } func NewSimulation() *Simulation { return &Simulation{ - stores: make(map[enode.ID]*state.InmemoryStore), + stores: make(map[enode.ID]state.Store), } } diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index c5f1fa176..e0a7f7e12 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" ) @@ -69,21 +68,6 @@ func init() { log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) } -func createGlobalStore() (string, *mockdb.GlobalStore, error) { - var globalStore *mockdb.GlobalStore - globalStoreDir, err := ioutil.TempDir("", "global.store") - if err != nil { - log.Error("Error initiating global store temp directory!", "err", err) - return "", nil, err - } - globalStore, err = mockdb.NewGlobalStore(globalStoreDir) - if err != nil { - log.Error("Error initiating global store!", "err", err) - return "", nil, err - } - return globalStoreDir, globalStore, nil -} - func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 0109fbdef..c73298d9a 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -39,6 +39,7 @@ const ( var ( processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil) handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil) + retrieveChunkFail = metrics.NewRegisteredCounter("network.stream.retrieve_chunks_fail.count", nil) requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil) requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil) @@ -169,7 +170,8 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * go func() { chunk, err := d.chunkStore.Get(ctx, req.Addr) if err != nil { - log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) + retrieveChunkFail.Inc(1) + log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) return } if req.SkipCheck { @@ -255,7 +257,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( } sp = d.getPeer(id) if sp == nil { - log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) + //log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) return true } spID = &id diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index a6173a389..f69f80499 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -453,6 +453,8 @@ func TestDeliveryFromNodes(t *testing.T) { } func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { node := ctx.Config.Node() diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index defb6df50..668cf586c 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -52,6 +52,8 @@ func TestIntervalsLiveAndHistory(t *testing.T) { } func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") nodes := 2 chunkCount := dataChunkCount externalStreamName := "externalStream" diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 5ea0b1511..932e28b32 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -246,6 +246,7 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ func runRetrievalTest(chunkCount int, nodeCount int) error { + sim := simulation.New(retrievalSimServiceMap) defer sim.Close() diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 6b92c32ae..4a632c8c9 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -35,7 +35,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -181,6 +182,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic } func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") sim := simulation.New(simServiceMap) defer sim.Close() @@ -268,20 +271,9 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - var gDir string - var globalStore *mockdb.GlobalStore + var globalStore mock.GlobalStorer if *useMockStore { - gDir, globalStore, err = createGlobalStore() - if err != nil { - return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") - } - defer func() { - os.RemoveAll(gDir) - err := globalStore.Close() - if err != nil { - log.Error("Error closing global store! %v", "err", err) - } - }() + globalStore = mockmem.NewGlobalStore() } REPEAT: for { @@ -339,6 +331,8 @@ assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. */ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { n := ctx.Config.Node() @@ -476,14 +470,9 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) return err } - var gDir string - var globalStore *mockdb.GlobalStore + var globalStore mock.GlobalStorer if *useMockStore { - gDir, globalStore, err = createGlobalStore() - if err != nil { - return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") - } - defer os.RemoveAll(gDir) + globalStore = mockmem.NewGlobalStore() } // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index fe20bab26..3e3cee18d 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -35,7 +35,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" + "github.com/ethereum/go-ethereum/swarm/storage/mock" + mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -48,7 +49,7 @@ func TestSyncerSimulation(t *testing.T) { testSyncBetweenNodes(t, 16, 1, dataChunkCount, true, 1) } -func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) { +func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) { address := common.BytesToAddress(id.Bytes()) mockStore := globalStore.NewNodeStore(address) params := storage.NewDefaultLocalStoreParams() @@ -67,11 +68,12 @@ func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network } func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool, po uint8) { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { var store storage.ChunkStore - var globalStore *mockdb.GlobalStore - var gDir, datadir string + var datadir string node := ctx.Config.Node() addr := network.NewAddr(node) @@ -79,11 +81,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck addr.OAddr[0] = byte(0) if *useMockStore { - gDir, globalStore, err = createGlobalStore() - if err != nil { - return nil, nil, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") - } - store, datadir, err = createMockStore(globalStore, node.ID(), addr) + store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr) } else { store, datadir, err = createTestLocalStorageForID(node.ID(), addr) } @@ -94,13 +92,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck cleanup = func() { store.Close() os.RemoveAll(datadir) - if *useMockStore { - err := globalStore.Close() - if err != nil { - log.Error("Error closing global store! %v", "err", err) - } - os.RemoveAll(gDir) - } } localStore := store.(*storage.LocalStore) netStore, err := storage.NewNetStore(localStore, nil) @@ -243,3 +234,170 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck t.Fatal(result.Error) } } + +//TestSameVersionID just checks that if the version is not changed, +//then streamer peers see each other +func TestSameVersionID(t *testing.T) { + //test version ID + v := uint(1) + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + var store storage.ChunkStore + var datadir string + + node := ctx.Config.Node() + addr := network.NewAddr(node) + + store, datadir, err = createTestLocalStorageForID(node.ID(), addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + store.Close() + os.RemoveAll(datadir) + } + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyDB, netStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + bucket.Store(bucketKeyDelivery, delivery) + + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, + }, nil) + //assign to each node the same version ID + r.spec.Version = v + + bucket.Store(bucketKeyRegistry, r) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + //connect just two nodes + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(2) + if err != nil { + t.Fatal(err) + } + + log.Info("Starting simulation") + ctx := context.Background() + //make sure they have time to connect + time.Sleep(200 * time.Millisecond) + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + //get the pivot node's filestore + nodes := sim.UpNodeIDs() + + item, ok := sim.NodeItem(nodes[0], bucketKeyRegistry) + if !ok { + return fmt.Errorf("No filestore") + } + registry := item.(*Registry) + + //the peers should connect, thus getting the peer should not return nil + if registry.getPeer(nodes[1]) == nil { + t.Fatal("Expected the peer to not be nil, but it is") + } + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) + } + log.Info("Simulation ended") +} + +//TestDifferentVersionID proves that if the streamer protocol version doesn't match, +//then the peers are not connected at streamer level +func TestDifferentVersionID(t *testing.T) { + //create a variable to hold the version ID + v := uint(0) + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + var store storage.ChunkStore + var datadir string + + node := ctx.Config.Node() + addr := network.NewAddr(node) + + store, datadir, err = createTestLocalStorageForID(node.ID(), addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + store.Close() + os.RemoveAll(datadir) + } + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyDB, netStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + + bucket.Store(bucketKeyDelivery, delivery) + + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, + }, nil) + + //increase the version ID for each node + v++ + r.spec.Version = v + + bucket.Store(bucketKeyRegistry, r) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + //connect the nodes + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(2) + if err != nil { + t.Fatal(err) + } + + log.Info("Starting simulation") + ctx := context.Background() + //make sure they have time to connect + time.Sleep(200 * time.Millisecond) + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + //get the pivot node's filestore + nodes := sim.UpNodeIDs() + + item, ok := sim.NodeItem(nodes[0], bucketKeyRegistry) + if !ok { + return fmt.Errorf("No filestore") + } + registry := item.(*Registry) + + //getting the other peer should fail due to the different version numbers + if registry.getPeer(nodes[1]) != nil { + t.Fatal("Expected the peer to be nil, but it is not") + } + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) + } + log.Info("Simulation ended") + +} diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go index 437c17e5e..f6d618020 100644 --- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go +++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go @@ -84,6 +84,8 @@ func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) //This test requests bogus hashes into the network func TestNonExistingHashesWithServer(t *testing.T) { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") nodeCount, _, sim := setupSim(retrievalSimServiceMap) defer sim.Close() @@ -143,6 +145,7 @@ func sendSimTerminatedEvent(sim *simulation.Simulation) { //can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg func TestSnapshotSyncWithServer(t *testing.T) { + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") nodeCount, chunkCount, sim := setupSim(simServiceMap) defer sim.Close() diff --git a/swarm/network_test.go b/swarm/network_test.go index d84f28147..41993dfc6 100644 --- a/swarm/network_test.go +++ b/swarm/network_test.go @@ -259,6 +259,8 @@ type testSwarmNetworkOptions struct { // - May wait for Kademlia on every node to be healthy. // - Checking if a file is retrievable from all nodes. func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwarmNetworkStep) { + + t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") if o == nil { o = new(testSwarmNetworkOptions) } diff --git a/swarm/pss/api.go b/swarm/pss/api.go index eba7bb722..587382d72 100644 --- a/swarm/pss/api.go +++ b/swarm/pss/api.go @@ -51,7 +51,7 @@ func NewAPI(ps *Pss) *API { // // All incoming messages to the node matching this topic will be encapsulated in the APIMsg // struct and sent to the subscriber -func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, error) { +func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, fmt.Errorf("Subscribe not supported") @@ -59,7 +59,7 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, psssub := notifier.CreateSubscription() - handler := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + hndlr := NewHandler(func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { apimsg := &APIMsg{ Msg: hexutil.Bytes(msg), Asymmetric: asymmetric, @@ -69,9 +69,15 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg)) } return nil + }) + if raw { + hndlr.caps.raw = true + } + if prox { + hndlr.caps.prox = true } - deregf := pssapi.Register(&topic, handler) + deregf := pssapi.Register(&topic, hndlr) go func() { defer deregf() select { @@ -158,6 +164,10 @@ func (pssapi *API) SendSym(symkeyhex string, topic Topic, msg hexutil.Bytes) err return pssapi.Pss.SendSym(symkeyhex, topic, msg[:]) } +func (pssapi *API) SendRaw(addr hexutil.Bytes, topic Topic, msg hexutil.Bytes) error { + return pssapi.Pss.SendRaw(PssAddress(addr), topic, msg[:]) +} + func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]Topic, error) { topics, _, err := pssapi.Pss.GetPublickeyPeers(pubkeyhex) return topics, err diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go index d541081d3..5ee387aa7 100644 --- a/swarm/pss/client/client.go +++ b/swarm/pss/client/client.go @@ -236,7 +236,7 @@ func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { topichex := topicobj.String() msgC := make(chan pss.APIMsg) c.peerPool[topicobj] = make(map[string]*pssRPCRW) - sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex) + sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false, false) if err != nil { return fmt.Errorf("pss event subscription failed: %v", err) } diff --git a/swarm/pss/handshake.go b/swarm/pss/handshake.go index e3ead77d0..5486abafa 100644 --- a/swarm/pss/handshake.go +++ b/swarm/pss/handshake.go @@ -486,7 +486,7 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus // Activate handshake functionality on a topic func (api *HandshakeAPI) AddHandshake(topic Topic) error { - api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, api.ctrl.handler) + api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler)) return nil } diff --git a/swarm/pss/notify/notify.go b/swarm/pss/notify/notify.go index 3731fb9db..d3c89058b 100644 --- a/swarm/pss/notify/notify.go +++ b/swarm/pss/notify/notify.go @@ -113,7 +113,7 @@ func NewController(ps *pss.Pss) *Controller { notifiers: make(map[string]*notifier), subscriptions: make(map[string]*subscription), } - ctrl.pss.Register(&controlTopic, ctrl.Handler) + ctrl.pss.Register(&controlTopic, pss.NewHandler(ctrl.Handler)) return ctrl } @@ -336,7 +336,7 @@ func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error { // \TODO keep track of and add actual address updaterAddr := pss.PssAddress([]byte{}) c.pss.SetSymmetricKey(symkey, topic, &updaterAddr, true) - c.pss.Register(&topic, c.Handler) + c.pss.Register(&topic, pss.NewHandler(c.Handler)) return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload[:len(msg.Payload)-symKeyLength]) } diff --git a/swarm/pss/notify/notify_test.go b/swarm/pss/notify/notify_test.go index d4d383a6b..6100195b0 100644 --- a/swarm/pss/notify/notify_test.go +++ b/swarm/pss/notify/notify_test.go @@ -121,7 +121,7 @@ func TestStart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() rmsgC := make(chan *pss.APIMsg) - rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic) + rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false, false) if err != nil { t.Fatal(err) } @@ -174,7 +174,7 @@ func TestStart(t *testing.T) { t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload)) } - rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic) + rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false, false) if err != nil { t.Fatal(err) } diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index 4ef3e90a0..520c48a20 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -92,7 +92,7 @@ func testProtocol(t *testing.T) { lmsgC := make(chan APIMsg) lctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func testProtocol(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -130,6 +130,7 @@ func testProtocol(t *testing.T) { log.Debug("lnode ok") case cerr := <-lctx.Done(): t.Fatalf("test message timed out: %v", cerr) + return } select { case <-rmsgC: diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index e1e24e1f5..d0986d280 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -23,11 +23,13 @@ import ( "crypto/rand" "errors" "fmt" + "hash" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -136,10 +138,10 @@ type Pss struct { symKeyDecryptCacheCapacity int // max amount of symkeys to keep. // message handling - handlers map[Topic]map[*Handler]bool // topic and version based pss payload handlers. See pss.Handle() - handlersMu sync.RWMutex - allowRaw bool - hashPool sync.Pool + handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() + handlersMu sync.RWMutex + hashPool sync.Pool + topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go) // process quitC chan struct{} @@ -180,11 +182,12 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity), symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity, - handlers: make(map[Topic]map[*Handler]bool), - allowRaw: params.AllowRaw, + handlers: make(map[Topic]map[*handler]bool), + topicHandlerCaps: make(map[Topic]*handlerCaps), + hashPool: sync.Pool{ New: func() interface{} { - return storage.MakeHashFunc(storage.DefaultHash)() + return sha3.NewKeccak256() }, }, } @@ -313,30 +316,54 @@ func (p *Pss) PublicKey() *ecdsa.PublicKey { // // Returns a deregister function which needs to be called to // deregister the handler, -func (p *Pss) Register(topic *Topic, handler Handler) func() { +func (p *Pss) Register(topic *Topic, hndlr *handler) func() { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] if handlers == nil { - handlers = make(map[*Handler]bool) + handlers = make(map[*handler]bool) p.handlers[*topic] = handlers + log.Debug("registered handler", "caps", hndlr.caps) + } + if hndlr.caps == nil { + hndlr.caps = &handlerCaps{} + } + handlers[hndlr] = true + if _, ok := p.topicHandlerCaps[*topic]; !ok { + p.topicHandlerCaps[*topic] = &handlerCaps{} } - handlers[&handler] = true - return func() { p.deregister(topic, &handler) } + if hndlr.caps.raw { + p.topicHandlerCaps[*topic].raw = true + } + if hndlr.caps.prox { + p.topicHandlerCaps[*topic].prox = true + } + return func() { p.deregister(topic, hndlr) } } -func (p *Pss) deregister(topic *Topic, h *Handler) { +func (p *Pss) deregister(topic *Topic, hndlr *handler) { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] - if len(handlers) == 1 { + if len(handlers) > 1 { delete(p.handlers, *topic) + // topic caps might have changed now that a handler is gone + caps := &handlerCaps{} + for h := range handlers { + if h.caps.raw { + caps.raw = true + } + if h.caps.prox { + caps.prox = true + } + } + p.topicHandlerCaps[*topic] = caps return } - delete(handlers, h) + delete(handlers, hndlr) } // get all registered handlers for respective topics -func (p *Pss) getHandlers(topic Topic) map[*Handler]bool { +func (p *Pss) getHandlers(topic Topic) map[*handler]bool { p.handlersMu.RLock() defer p.handlersMu.RUnlock() return p.handlers[topic] @@ -348,12 +375,11 @@ func (p *Pss) getHandlers(topic Topic) map[*Handler]bool { // Only passes error to pss protocol handler if payload is not valid pssmsg func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1) - pssmsg, ok := msg.(*PssMsg) - if !ok { return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg) } + log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:])) if int64(pssmsg.Expire) < time.Now().Unix() { metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1) log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To)) @@ -365,13 +391,34 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { } p.addFwdCache(pssmsg) - if !p.isSelfPossibleRecipient(pssmsg) { - log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr())) + psstopic := Topic(pssmsg.Payload.Topic) + + // raw is simplest handler contingency to check, so check that first + var isRaw bool + if pssmsg.isRaw() { + if !p.topicHandlerCaps[psstopic].raw { + log.Debug("No handler for raw message", "topic", psstopic) + return nil + } + isRaw = true + } + + // check if we can be recipient: + // - no prox handler on message and partial address matches + // - prox handler on message and we are in prox regardless of partial address match + // store this result so we don't calculate again on every handler + var isProx bool + if _, ok := p.topicHandlerCaps[psstopic]; ok { + isProx = p.topicHandlerCaps[psstopic].prox + } + isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx) + if !isRecipient { + log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) return p.enqueue(pssmsg) } - log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr())) - if err := p.process(pssmsg); err != nil { + log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:])) + if err := p.process(pssmsg, isRaw, isProx); err != nil { qerr := p.enqueue(pssmsg) if qerr != nil { return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr) @@ -384,7 +431,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // Entry point to processing a message for which the current node can be the intended recipient. // Attempts symmetric and asymmetric decryption with stored keys. // Dispatches message to all handlers matching the message topic -func (p *Pss) process(pssmsg *PssMsg) error { +func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { metrics.GetOrRegisterCounter("pss.process", nil).Inc(1) var err error @@ -397,10 +444,8 @@ func (p *Pss) process(pssmsg *PssMsg) error { envelope := pssmsg.Payload psstopic := Topic(envelope.Topic) - if pssmsg.isRaw() { - if !p.allowRaw { - return errors.New("raw message support disabled") - } + + if raw { payload = pssmsg.Payload.Data } else { if pssmsg.isSym() { @@ -422,19 +467,27 @@ func (p *Pss) process(pssmsg *PssMsg) error { return err } } - p.executeHandlers(psstopic, payload, from, asymmetric, keyid) + p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid) return nil } -func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, asymmetric bool, keyid string) { +func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) - for f := range handlers { - err := (*f)(payload, peer, asymmetric, keyid) + for h := range handlers { + if !h.caps.raw && raw { + log.Warn("norawhandler") + continue + } + if !h.caps.prox && prox { + log.Warn("noproxhandler") + continue + } + err := (h.f)(payload, peer, asymmetric, keyid) if err != nil { - log.Warn("Pss handler %p failed: %v", f, err) + log.Warn("Pss handler failed", "err", err) } } } @@ -445,9 +498,23 @@ func (p *Pss) isSelfRecipient(msg *PssMsg) bool { } // test match of leftmost bytes in given message to node's Kademlia address -func (p *Pss) isSelfPossibleRecipient(msg *PssMsg) bool { +func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { local := p.Kademlia.BaseAddr() - return bytes.Equal(msg.To, local[:len(msg.To)]) + + // if a partial address matches we are possible recipient regardless of prox + // if not and prox is not set, we are surely not + if bytes.Equal(msg.To, local[:len(msg.To)]) { + + return true + } else if !prox { + return false + } + + depth := p.Kademlia.NeighbourhoodDepth() + po, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) + log.Trace("selfpossible", "po", po, "depth", depth) + + return depth <= po } ///////////////////////////////////////////////////////////////////// @@ -684,9 +751,6 @@ func (p *Pss) enqueue(msg *PssMsg) error { // // Will fail if raw messages are disallowed func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { - if !p.allowRaw { - return errors.New("Raw messages not enabled") - } pssMsgParams := &msgParams{ raw: true, } @@ -699,7 +763,17 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = payload p.addFwdCache(pssMsg) - return p.enqueue(pssMsg) + err := p.enqueue(pssMsg) + if err != nil { + return err + } + + // if we have a proxhandler on this topic + // also deliver message to ourselves + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { + return p.process(pssMsg, true, true) + } + return nil } // Send a message using symmetric encryption @@ -800,7 +874,16 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by pssMsg.To = to pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = envelope - return p.enqueue(pssMsg) + err = p.enqueue(pssMsg) + if err != nil { + return err + } + if _, ok := p.topicHandlerCaps[topic]; ok { + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { + return p.process(pssMsg, true, true) + } + } + return nil } // Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct @@ -895,6 +978,10 @@ func (p *Pss) cleanFwdCache() { } } +func label(b []byte) string { + return fmt.Sprintf("%04x", b[:2]) +} + // add a message to the cache func (p *Pss) addFwdCache(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1) @@ -934,10 +1021,14 @@ func (p *Pss) checkFwdCache(msg *PssMsg) bool { // Digest of message func (p *Pss) digest(msg *PssMsg) pssDigest { - hasher := p.hashPool.Get().(storage.SwarmHash) + return p.digestBytes(msg.serialize()) +} + +func (p *Pss) digestBytes(msg []byte) pssDigest { + hasher := p.hashPool.Get().(hash.Hash) defer p.hashPool.Put(hasher) hasher.Reset() - hasher.Write(msg.serialize()) + hasher.Write(msg) digest := pssDigest{} key := hasher.Sum(nil) copy(digest[:], key[:digestLength]) diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 66a90be62..72f62acd9 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -48,20 +48,23 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" ) var ( - initOnce = sync.Once{} - debugdebugflag = flag.Bool("vv", false, "veryverbose") - debugflag = flag.Bool("v", false, "verbose") - longrunning = flag.Bool("longrunning", false, "do run long-running tests") - w *whisper.Whisper - wapi *whisper.PublicWhisperAPI - psslogmain log.Logger - pssprotocols map[string]*protoCtrl - useHandshake bool + initOnce = sync.Once{} + loglevel = flag.Int("loglevel", 2, "logging verbosity") + longrunning = flag.Bool("longrunning", false, "do run long-running tests") + w *whisper.Whisper + wapi *whisper.PublicWhisperAPI + psslogmain log.Logger + pssprotocols map[string]*protoCtrl + useHandshake bool + noopHandlerFunc = func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + return nil + } ) func init() { @@ -75,16 +78,9 @@ func init() { func initTest() { initOnce.Do( func() { - loglevel := log.LvlInfo - if *debugflag { - loglevel = log.LvlDebug - } else if *debugdebugflag { - loglevel = log.LvlTrace - } - psslogmain = log.New("psslog", "*") hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true)) - hf := log.LvlFilterHandler(loglevel, hs) + hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs) h := log.CallerFileHandler(hf) log.Root().SetHandler(h) @@ -280,15 +276,14 @@ func TestAddressMatch(t *testing.T) { } pssmsg := &PssMsg{ - To: remoteaddr, - Payload: &whisper.Envelope{}, + To: remoteaddr, } // differ from first byte if ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr) } - if ps.isSelfPossibleRecipient(pssmsg) { + if ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8]) } @@ -297,7 +292,7 @@ func TestAddressMatch(t *testing.T) { if ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr) } - if !ps.isSelfPossibleRecipient(pssmsg) { + if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } @@ -306,13 +301,342 @@ func TestAddressMatch(t *testing.T) { if !ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr) } - if !ps.isSelfPossibleRecipient(pssmsg) { + if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } + } -// -func TestHandlerConditions(t *testing.T) { +// test that message is handled by sender if a prox handler exists and sender is in prox of message +func TestProxShortCircuit(t *testing.T) { + + // sender node address + localAddr := network.RandomAddr().Over() + localPotAddr := pot.NewAddressFromBytes(localAddr) + + // set up kademlia + kadParams := network.NewKadParams() + kad := network.NewKademlia(localAddr, kadParams) + peerCount := kad.MinBinSize + 1 + + // set up pss + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + + // create kademlia peers, so we have peers both inside and outside minproxlimit + var peers []*network.Peer + proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes() + distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes() + + for i := 0; i < peerCount; i++ { + rw := &p2p.MsgPipeRW{} + ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{}) + protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) + peerAddr := pot.RandomAddressAt(localPotAddr, i) + bzzPeer := &network.BzzPeer{ + Peer: protoPeer, + BzzAddr: &network.BzzAddr{ + OAddr: peerAddr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), + }, + } + peer := network.NewPeer(bzzPeer, kad) + kad.On(peer) + peers = append(peers, peer) + } + + // register it marking prox capability + delivered := make(chan struct{}) + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + delivered <- struct{}{} + return nil + } + topic := BytesToTopic([]byte{0x2a}) + hndlrProxDereg := ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + prox: true, + }, + }) + defer hndlrProxDereg() + + // send message too far away for sender to be in prox + // reception of this message should time out + errC := make(chan error) + go func() { + err := ps.SendRaw(distantMessageAddress, topic, []byte("foo")) + if err != nil { + errC <- err + } + }() + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + t.Fatal("raw distant message delivered") + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + } + + // send message that should be within sender prox + // this message should be delivered + go func() { + err := ps.SendRaw(proxMessageAddress, topic, []byte("bar")) + if err != nil { + errC <- err + } + }() + + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("raw timeout") + } + + // try the same prox message with sym and asym send + proxAddrPss := PssAddress(proxMessageAddress) + symKeyId, err := ps.GenerateSymmetricKey(topic, &proxAddrPss, true) + go func() { + err := ps.SendSym(symKeyId, topic, []byte("baz")) + if err != nil { + errC <- err + } + }() + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("sym timeout") + } + + err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &proxAddrPss) + if err != nil { + t.Fatal(err) + } + pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey)) + go func() { + err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy")) + if err != nil { + errC <- err + } + }() + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("asym timeout") + } +} + +// verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it +// note that in these tests we use the raw capability on handlers for convenience +func TestAddressMatchProx(t *testing.T) { + + // recipient node address + localAddr := network.RandomAddr().Over() + localPotAddr := pot.NewAddressFromBytes(localAddr) + + // set up kademlia + kadparams := network.NewKadParams() + kad := network.NewKademlia(localAddr, kadparams) + nnPeerCount := kad.MinBinSize + peerCount := nnPeerCount + 2 + + // set up pss + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + + // create kademlia peers, so we have peers both inside and outside minproxlimit + var peers []*network.Peer + for i := 0; i < peerCount; i++ { + rw := &p2p.MsgPipeRW{} + ptpPeer := p2p.NewPeer(enode.ID{}, "362436 call me anytime", []p2p.Cap{}) + protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) + peerAddr := pot.RandomAddressAt(localPotAddr, i) + bzzPeer := &network.BzzPeer{ + Peer: protoPeer, + BzzAddr: &network.BzzAddr{ + OAddr: peerAddr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), + }, + } + peer := network.NewPeer(bzzPeer, kad) + kad.On(peer) + peers = append(peers, peer) + } + + // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers + // meanwhile test regression for kademlia since we are compiling the test parameters from different packages + var proxes int + var conns int + kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool { + conns++ + if prox { + proxes++ + } + log.Trace("kadconn", "po", po, "peer", p, "prox", prox) + return true + }) + if proxes != nnPeerCount { + t.Fatalf("expected %d proxpeers, have %d", nnPeerCount, proxes) + } else if conns != peerCount { + t.Fatalf("expected %d peers total, have %d", peerCount, proxes) + } + + // remote address distances from localAddr to try and the expected outcomes if we use prox handler + remoteDistances := []int{ + 255, + nnPeerCount + 1, + nnPeerCount, + nnPeerCount - 1, + 0, + } + expects := []bool{ + true, + true, + true, + false, + false, + } + + // first the unit test on the method that calculates possible receipient using prox + for i, distance := range remoteDistances { + pssMsg := newPssMsg(&msgParams{}) + pssMsg.To = make([]byte, len(localAddr)) + copy(pssMsg.To, localAddr) + var byteIdx = distance / 8 + pssMsg.To[byteIdx] ^= 1 << uint(7-(distance%8)) + log.Trace(fmt.Sprintf("addrmatch %v", bytes.Equal(pssMsg.To, localAddr))) + if ps.isSelfPossibleRecipient(pssMsg, true) != expects[i] { + t.Fatalf("expected distance %d to be %v", distance, expects[i]) + } + } + + // we move up to higher level and test the actual message handler + // for each distance check if we are possible recipient when prox variant is used is set + + // this handler will increment a counter for every message that gets passed to the handler + var receives int + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + receives++ + return nil + } + + // register it marking prox capability + topic := BytesToTopic([]byte{0x2a}) + hndlrProxDereg := ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + prox: true, + }, + }) + + // test the distances + var prevReceive int + for i, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + var data [32]byte + rand.Read(data[:]) + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: data[:], + } + + log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { + t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) + } + prevReceive = receives + } + + // now add a non prox-capable handler and test + ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + }, + }) + receives = 0 + prevReceive = 0 + for i, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + var data [32]byte + rand.Read(data[:]) + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: data[:], + } + + log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { + t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) + } + prevReceive = receives + } + + // now deregister the prox capable handler, now none of the messages will be handled + hndlrProxDereg() + receives = 0 + + for _, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: []byte(remotePotAddr.String()), + } + + log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if receives != 0 { + t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance) + } + + } +} + +// verify that message queueing happens when it should, and that expired and corrupt messages are dropped +func TestMessageProcessing(t *testing.T) { t.Skip("Disabled due to probable faulty logic for outbox expectations") // setup @@ -326,13 +650,12 @@ func TestHandlerConditions(t *testing.T) { ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams()) // message should pass - msg := &PssMsg{ - To: addr, - Expire: uint32(time.Now().Add(time.Second * 60).Unix()), - Payload: &whisper.Envelope{ - Topic: [4]byte{}, - Data: []byte{0x66, 0x6f, 0x6f}, - }, + msg := newPssMsg(&msgParams{}) + msg.To = addr + msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) + msg.Payload = &whisper.Envelope{ + Topic: [4]byte{}, + Data: []byte{0x66, 0x6f, 0x6f}, } if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) @@ -498,6 +821,7 @@ func TestKeys(t *testing.T) { } } +// check that we can retrieve previously added public key entires per topic and peer func TestGetPublickeyEntries(t *testing.T) { privkey, err := crypto.GenerateKey() @@ -557,7 +881,7 @@ OUTER: } // forwarding should skip peers that do not have matching pss capabilities -func TestMismatch(t *testing.T) { +func TestPeerCapabilityMismatch(t *testing.T) { // create privkey for forwarder node privkey, err := crypto.GenerateKey() @@ -615,6 +939,76 @@ func TestMismatch(t *testing.T) { } +// verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed +func TestRawAllow(t *testing.T) { + + // set up pss like so many times before + privKey, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + baseAddr := network.RandomAddr() + kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams()) + ps := newTestPss(privKey, kad, nil) + topic := BytesToTopic([]byte{0x2a}) + + // create handler innards that increments every time a message hits it + var receives int + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + receives++ + return nil + } + + // wrap this handler function with a handler without raw capability and register it + hndlrNoRaw := &handler{ + f: rawHandlerFunc, + } + ps.Register(&topic, hndlrNoRaw) + + // test it with a raw message, should be poo-poo + pssMsg := newPssMsg(&msgParams{ + raw: true, + }) + pssMsg.To = baseAddr.OAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + } + ps.handlePssMsg(context.TODO(), pssMsg) + if receives > 0 { + t.Fatalf("Expected handler not to be executed with raw cap off") + } + + // now wrap the same handler function with raw capabilities and register it + hndlrRaw := &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + }, + } + deregRawHandler := ps.Register(&topic, hndlrRaw) + + // should work now + pssMsg.Payload.Data = []byte("Raw Deal") + ps.handlePssMsg(context.TODO(), pssMsg) + if receives == 0 { + t.Fatalf("Expected handler to be executed with raw cap on") + } + + // now deregister the raw capable handler + prevReceives := receives + deregRawHandler() + + // check that raw messages fail again + pssMsg.Payload.Data = []byte("Raw Trump") + ps.handlePssMsg(context.TODO(), pssMsg) + if receives != prevReceives { + t.Fatalf("Expected handler not to be executed when raw handler is retracted") + } +} + +// verifies that nodes can send and receive raw (verbatim) messages func TestSendRaw(t *testing.T) { t.Run("32", testSendRaw) t.Run("8", testSendRaw) @@ -658,19 +1052,19 @@ func testSendRaw(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() // send and verify delivery lmsg := []byte("plugh") - err = clients[1].Call(nil, "pss_sendRaw", loaddrhex, topic, lmsg) + err = clients[1].Call(nil, "pss_sendRaw", loaddrhex, topic, hexutil.Encode(lmsg)) if err != nil { t.Fatal(err) } @@ -683,7 +1077,7 @@ func testSendRaw(t *testing.T) { t.Fatalf("test message (left) timed out: %v", cerr) } rmsg := []byte("xyzzy") - err = clients[0].Call(nil, "pss_sendRaw", roaddrhex, topic, rmsg) + err = clients[0].Call(nil, "pss_sendRaw", roaddrhex, topic, hexutil.Encode(rmsg)) if err != nil { t.Fatal(err) } @@ -757,13 +1151,13 @@ func testSendSym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -872,13 +1266,13 @@ func testSendAsym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1037,7 +1431,7 @@ func testNetwork(t *testing.T) { msgC := make(chan APIMsg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic) + sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -1209,7 +1603,7 @@ func TestDeduplication(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1392,8 +1786,8 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { if err != nil { b.Fatalf("could not generate whisper envelope: %v", err) } - ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil + ps.Register(&topic, &handler{ + f: noopHandlerFunc, }) pssmsgs = append(pssmsgs, &PssMsg{ To: to, @@ -1402,7 +1796,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]); err != nil { + if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], false, false); err != nil { b.Fatalf("pss processing failed: %v", err) } } @@ -1476,15 +1870,15 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { if err != nil { b.Fatalf("could not generate whisper envelope: %v", err) } - ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil + ps.Register(&topic, &handler{ + f: noopHandlerFunc, }) pssmsg := &PssMsg{ To: addr[len(addr)-1][:], Payload: env, } for i := 0; i < b.N; i++ { - if err := ps.process(pssmsg); err != nil { + if err := ps.process(pssmsg, false, false); err != nil { b.Fatalf("pss processing failed: %v", err) } } @@ -1581,7 +1975,12 @@ func newServices(allowRaw bool) adapters.Services { if useHandshake { SetHandshakeController(ps, NewHandshakeParams()) } - ps.Register(&PingTopic, pp.Handle) + ps.Register(&PingTopic, &handler{ + f: pp.Handle, + caps: &handlerCaps{ + raw: true, + }, + }) ps.addAPI(rpc.API{ Namespace: "psstest", Version: "0.3", diff --git a/swarm/pss/types.go b/swarm/pss/types.go index 56c2c51dc..ba963067c 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -159,9 +159,39 @@ func (msg *PssMsg) String() string { } // Signature for a message handler function for a PssMsg -// // Implementations of this type are passed to Pss.Register together with a topic, -type Handler func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error +type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error + +type handlerCaps struct { + raw bool + prox bool +} + +// Handler defines code to be executed upon reception of content. +type handler struct { + f HandlerFunc + caps *handlerCaps +} + +// NewHandler returns a new message handler +func NewHandler(f HandlerFunc) *handler { + return &handler{ + f: f, + caps: &handlerCaps{}, + } +} + +// WithRaw is a chainable method that allows raw messages to be handled. +func (h *handler) WithRaw() *handler { + h.caps.raw = true + return h +} + +// WithProxBin is a chainable method that allows sending messages with full addresses to neighbourhoods using the kademlia depth as reference +func (h *handler) WithProxBin() *handler { + h.caps.prox = true + return h +} // the stateStore handles saving and loading PSS peers and their corresponding keys // it is currently unimplemented diff --git a/swarm/shed/db.go b/swarm/shed/db.go new file mode 100644 index 000000000..e128b8cbc --- /dev/null +++ b/swarm/shed/db.go @@ -0,0 +1,130 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package shed provides a simple abstraction components to compose +// more complex operations on storage data organized in fields and indexes. +// +// Only type which holds logical information about swarm storage chunks data +// and metadata is IndexItem. This part is not generalized mostly for +// performance reasons. +package shed + +import ( + "github.com/ethereum/go-ethereum/metrics" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +// The limit for LevelDB OpenFilesCacheCapacity. +const openFileLimit = 128 + +// DB provides abstractions over LevelDB in order to +// implement complex structures using fields and ordered indexes. +// It provides a schema functionality to store fields and indexes +// information about naming and types. +type DB struct { + ldb *leveldb.DB +} + +// NewDB constructs a new DB and validates the schema +// if it exists in database on the given path. +func NewDB(path string) (db *DB, err error) { + ldb, err := leveldb.OpenFile(path, &opt.Options{ + OpenFilesCacheCapacity: openFileLimit, + }) + if err != nil { + return nil, err + } + db = &DB{ + ldb: ldb, + } + + if _, err = db.getSchema(); err != nil { + if err == leveldb.ErrNotFound { + // save schema with initialized default fields + if err = db.putSchema(schema{ + Fields: make(map[string]fieldSpec), + Indexes: make(map[byte]indexSpec), + }); err != nil { + return nil, err + } + } else { + return nil, err + } + } + return db, nil +} + +// Put wraps LevelDB Put method to increment metrics counter. +func (db *DB) Put(key []byte, value []byte) (err error) { + err = db.ldb.Put(key, value, nil) + if err != nil { + metrics.GetOrRegisterCounter("DB.putFail", nil).Inc(1) + return err + } + metrics.GetOrRegisterCounter("DB.put", nil).Inc(1) + return nil +} + +// Get wraps LevelDB Get method to increment metrics counter. +func (db *DB) Get(key []byte) (value []byte, err error) { + value, err = db.ldb.Get(key, nil) + if err != nil { + if err == leveldb.ErrNotFound { + metrics.GetOrRegisterCounter("DB.getNotFound", nil).Inc(1) + } else { + metrics.GetOrRegisterCounter("DB.getFail", nil).Inc(1) + } + return nil, err + } + metrics.GetOrRegisterCounter("DB.get", nil).Inc(1) + return value, nil +} + +// Delete wraps LevelDB Delete method to increment metrics counter. +func (db *DB) Delete(key []byte) (err error) { + err = db.ldb.Delete(key, nil) + if err != nil { + metrics.GetOrRegisterCounter("DB.deleteFail", nil).Inc(1) + return err + } + metrics.GetOrRegisterCounter("DB.delete", nil).Inc(1) + return nil +} + +// NewIterator wraps LevelDB NewIterator method to increment metrics counter. +func (db *DB) NewIterator() iterator.Iterator { + metrics.GetOrRegisterCounter("DB.newiterator", nil).Inc(1) + + return db.ldb.NewIterator(nil, nil) +} + +// WriteBatch wraps LevelDB Write method to increment metrics counter. +func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) { + err = db.ldb.Write(batch, nil) + if err != nil { + metrics.GetOrRegisterCounter("DB.writebatchFail", nil).Inc(1) + return err + } + metrics.GetOrRegisterCounter("DB.writebatch", nil).Inc(1) + return nil +} + +// Close closes LevelDB database. +func (db *DB) Close() (err error) { + return db.ldb.Close() +} diff --git a/swarm/shed/db_test.go b/swarm/shed/db_test.go new file mode 100644 index 000000000..45325beeb --- /dev/null +++ b/swarm/shed/db_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "io/ioutil" + "os" + "testing" +) + +// TestNewDB constructs a new DB +// and validates if the schema is initialized properly. +func TestNewDB(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + s, err := db.getSchema() + if err != nil { + t.Fatal(err) + } + if s.Fields == nil { + t.Error("schema fields are empty") + } + if len(s.Fields) != 0 { + t.Errorf("got schema fields length %v, want %v", len(s.Fields), 0) + } + if s.Indexes == nil { + t.Error("schema indexes are empty") + } + if len(s.Indexes) != 0 { + t.Errorf("got schema indexes length %v, want %v", len(s.Indexes), 0) + } +} + +// TestDB_persistence creates one DB, saves a field and closes that DB. +// Then, it constructs another DB and trues to retrieve the saved value. +func TestDB_persistence(t *testing.T) { + dir, err := ioutil.TempDir("", "shed-test-persistence") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + db, err := NewDB(dir) + if err != nil { + t.Fatal(err) + } + stringField, err := db.NewStringField("preserve-me") + if err != nil { + t.Fatal(err) + } + want := "persistent value" + err = stringField.Put(want) + if err != nil { + t.Fatal(err) + } + err = db.Close() + if err != nil { + t.Fatal(err) + } + + db2, err := NewDB(dir) + if err != nil { + t.Fatal(err) + } + stringField2, err := db2.NewStringField("preserve-me") + if err != nil { + t.Fatal(err) + } + got, err := stringField2.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } +} + +// newTestDB is a helper function that constructs a +// temporary database and returns a cleanup function that must +// be called to remove the data. +func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) { + t.Helper() + + dir, err := ioutil.TempDir("", "shed-test") + if err != nil { + t.Fatal(err) + } + cleanupFunc = func() { os.RemoveAll(dir) } + db, err = NewDB(dir) + if err != nil { + cleanupFunc() + t.Fatal(err) + } + return db, cleanupFunc +} diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go new file mode 100644 index 000000000..2ed0be141 --- /dev/null +++ b/swarm/shed/example_store_test.go @@ -0,0 +1,332 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed_test + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io/ioutil" + "log" + "os" + "time" + + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// Store holds fields and indexes (including their encoding functions) +// and defines operations on them by composing data from them. +// It implements storage.ChunkStore interface. +// It is just an example without any support for parallel operations +// or real world implementation. +type Store struct { + db *shed.DB + + // fields and indexes + schemaName shed.StringField + sizeCounter shed.Uint64Field + accessCounter shed.Uint64Field + retrievalIndex shed.Index + accessIndex shed.Index + gcIndex shed.Index +} + +// New returns new Store. All fields and indexes are initialized +// and possible conflicts with schema from existing database is checked +// automatically. +func New(path string) (s *Store, err error) { + db, err := shed.NewDB(path) + if err != nil { + return nil, err + } + s = &Store{ + db: db, + } + // Identify current storage schema by arbitrary name. + s.schemaName, err = db.NewStringField("schema-name") + if err != nil { + return nil, err + } + // Global ever incrementing index of chunk accesses. + s.accessCounter, err = db.NewUint64Field("access-counter") + if err != nil { + return nil, err + } + // Index storing actual chunk address, data and store timestamp. + s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.Data = value[8:] + return e, nil + }, + }) + if err != nil { + return nil, err + } + // Index storing access timestamp for a particular address. + // It is needed in order to update gc index keys for iteration order. + s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) + return b, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) + return e, nil + }, + }) + if err != nil { + return nil, err + } + // Index with keys ordered by access timestamp for garbage collection prioritization. + s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + b := make([]byte, 16, 16+len(fields.Address)) + binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + key = append(b, fields.Address...) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) + e.Address = key[16:] + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + return s, nil +} + +// Put stores the chunk and sets it store timestamp. +func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) { + return s.retrievalIndex.Put(shed.IndexItem{ + Address: ch.Address(), + Data: ch.Data(), + StoreTimestamp: time.Now().UTC().UnixNano(), + }) +} + +// Get retrieves a chunk with the provided address. +// It updates access and gc indexes by removing the previous +// items from them and adding new items as keys of index entries +// are changed. +func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, err error) { + batch := new(leveldb.Batch) + + // Get the chunk data and storage timestamp. + item, err := s.retrievalIndex.Get(shed.IndexItem{ + Address: addr, + }) + if err != nil { + if err == leveldb.ErrNotFound { + return nil, storage.ErrChunkNotFound + } + return nil, err + } + + // Get the chunk access timestamp. + accessItem, err := s.accessIndex.Get(shed.IndexItem{ + Address: addr, + }) + switch err { + case nil: + // Remove gc index entry if access timestamp is found. + err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{ + Address: item.Address, + StoreTimestamp: accessItem.AccessTimestamp, + AccessTimestamp: item.StoreTimestamp, + }) + if err != nil { + return nil, err + } + case leveldb.ErrNotFound: + // Access timestamp is not found. Do not do anything. + // This is the firs get request. + default: + return nil, err + } + + // Specify new access timestamp + accessTimestamp := time.Now().UTC().UnixNano() + + // Put new access timestamp in access index. + err = s.accessIndex.PutInBatch(batch, shed.IndexItem{ + Address: addr, + AccessTimestamp: accessTimestamp, + }) + if err != nil { + return nil, err + } + + // Put new access timestamp in gc index. + err = s.gcIndex.PutInBatch(batch, shed.IndexItem{ + Address: item.Address, + AccessTimestamp: accessTimestamp, + StoreTimestamp: item.StoreTimestamp, + }) + if err != nil { + return nil, err + } + + // Increment access counter. + // Currently this information is not used anywhere. + _, err = s.accessCounter.IncInBatch(batch) + if err != nil { + return nil, err + } + + // Write the batch. + err = s.db.WriteBatch(batch) + if err != nil { + return nil, err + } + + // Return the chunk. + return storage.NewChunk(item.Address, item.Data), nil +} + +// CollectGarbage is an example of index iteration. +// It provides no reliable garbage collection functionality. +func (s *Store) CollectGarbage() (err error) { + const maxTrashSize = 100 + maxRounds := 10 // arbitrary number, needs to be calculated + + // Run a few gc rounds. + for roundCount := 0; roundCount < maxRounds; roundCount++ { + var garbageCount int + // New batch for a new cg round. + trash := new(leveldb.Batch) + // Iterate through all index items and break when needed. + err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) { + // Remove the chunk. + err = s.retrievalIndex.DeleteInBatch(trash, item) + if err != nil { + return false, err + } + // Remove the element in gc index. + err = s.gcIndex.DeleteInBatch(trash, item) + if err != nil { + return false, err + } + // Remove the relation in access index. + err = s.accessIndex.DeleteInBatch(trash, item) + if err != nil { + return false, err + } + garbageCount++ + if garbageCount >= maxTrashSize { + return true, nil + } + return false, nil + }) + if err != nil { + return err + } + if garbageCount == 0 { + return nil + } + err = s.db.WriteBatch(trash) + if err != nil { + return err + } + } + return nil +} + +// GetSchema is an example of retrieveing the most simple +// string from a database field. +func (s *Store) GetSchema() (name string, err error) { + name, err = s.schemaName.Get() + if err == leveldb.ErrNotFound { + return "", nil + } + return name, err +} + +// GetSchema is an example of storing the most simple +// string in a database field. +func (s *Store) PutSchema(name string) (err error) { + return s.schemaName.Put(name) +} + +// Close closes the underlying database. +func (s *Store) Close() error { + return s.db.Close() +} + +// Example_store constructs a simple storage implementation using shed package. +func Example_store() { + dir, err := ioutil.TempDir("", "ephemeral") + if err != nil { + log.Fatal(err) + } + defer os.RemoveAll(dir) + + s, err := New(dir) + if err != nil { + log.Fatal(err) + } + defer s.Close() + + ch := storage.GenerateRandomChunk(1024) + err = s.Put(context.Background(), ch) + if err != nil { + log.Fatal(err) + } + + got, err := s.Get(context.Background(), ch.Address()) + if err != nil { + log.Fatal(err) + } + + fmt.Println(bytes.Equal(got.Data(), ch.Data())) + + //Output: true +} diff --git a/swarm/shed/field_string.go b/swarm/shed/field_string.go new file mode 100644 index 000000000..a7e8f0c75 --- /dev/null +++ b/swarm/shed/field_string.go @@ -0,0 +1,66 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +// StringField is the most simple field implementation +// that stores an arbitrary string under a specific LevelDB key. +type StringField struct { + db *DB + key []byte +} + +// NewStringField retruns a new Instance of StringField. +// It validates its name and type against the database schema. +func (db *DB) NewStringField(name string) (f StringField, err error) { + key, err := db.schemaFieldKey(name, "string") + if err != nil { + return f, err + } + return StringField{ + db: db, + key: key, + }, nil +} + +// Get returns a string value from database. +// If the value is not found, an empty string is returned +// an no error. +func (f StringField) Get() (val string, err error) { + b, err := f.db.Get(f.key) + if err != nil { + if err == leveldb.ErrNotFound { + return "", nil + } + return "", err + } + return string(b), nil +} + +// Put stores a string in the database. +func (f StringField) Put(val string) (err error) { + return f.db.Put(f.key, []byte(val)) +} + +// PutInBatch stores a string in a batch that can be +// saved later in database. +func (f StringField) PutInBatch(batch *leveldb.Batch, val string) { + batch.Put(f.key, []byte(val)) +} diff --git a/swarm/shed/field_string_test.go b/swarm/shed/field_string_test.go new file mode 100644 index 000000000..4215075bc --- /dev/null +++ b/swarm/shed/field_string_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "testing" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestStringField validates put and get operations +// of the StringField. +func TestStringField(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + simpleString, err := db.NewStringField("simple-string") + if err != nil { + t.Fatal(err) + } + + t.Run("get empty", func(t *testing.T) { + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + want := "" + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + }) + + t.Run("put", func(t *testing.T) { + want := "simple string value" + err = simpleString.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + want := "overwritten string value" + err = simpleString.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + }) + }) + + t.Run("put in batch", func(t *testing.T) { + batch := new(leveldb.Batch) + want := "simple string batch value" + simpleString.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + batch := new(leveldb.Batch) + want := "overwritten string batch value" + simpleString.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + }) + }) +} diff --git a/swarm/shed/field_struct.go b/swarm/shed/field_struct.go new file mode 100644 index 000000000..90daee7fc --- /dev/null +++ b/swarm/shed/field_struct.go @@ -0,0 +1,71 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" +) + +// StructField is a helper to store complex structure by +// encoding it in RLP format. +type StructField struct { + db *DB + key []byte +} + +// NewStructField returns a new StructField. +// It validates its name and type against the database schema. +func (db *DB) NewStructField(name string) (f StructField, err error) { + key, err := db.schemaFieldKey(name, "struct-rlp") + if err != nil { + return f, err + } + return StructField{ + db: db, + key: key, + }, nil +} + +// Get unmarshals data from the database to a provided val. +// If the data is not found leveldb.ErrNotFound is returned. +func (f StructField) Get(val interface{}) (err error) { + b, err := f.db.Get(f.key) + if err != nil { + return err + } + return rlp.DecodeBytes(b, val) +} + +// Put marshals provided val and saves it to the database. +func (f StructField) Put(val interface{}) (err error) { + b, err := rlp.EncodeToBytes(val) + if err != nil { + return err + } + return f.db.Put(f.key, b) +} + +// PutInBatch marshals provided val and puts it into the batch. +func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error) { + b, err := rlp.EncodeToBytes(val) + if err != nil { + return err + } + batch.Put(f.key, b) + return nil +} diff --git a/swarm/shed/field_struct_test.go b/swarm/shed/field_struct_test.go new file mode 100644 index 000000000..cc0be0186 --- /dev/null +++ b/swarm/shed/field_struct_test.go @@ -0,0 +1,127 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "testing" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestStructField validates put and get operations +// of the StructField. +func TestStructField(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + complexField, err := db.NewStructField("complex-field") + if err != nil { + t.Fatal(err) + } + + type complexStructure struct { + A string + } + + t.Run("get empty", func(t *testing.T) { + var s complexStructure + err := complexField.Get(&s) + if err != leveldb.ErrNotFound { + t.Fatalf("got error %v, want %v", err, leveldb.ErrNotFound) + } + want := "" + if s.A != want { + t.Errorf("got string %q, want %q", s.A, want) + } + }) + + t.Run("put", func(t *testing.T) { + want := complexStructure{ + A: "simple string value", + } + err = complexField.Put(want) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err = complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got.A, want.A) + } + + t.Run("overwrite", func(t *testing.T) { + want := complexStructure{ + A: "overwritten string value", + } + err = complexField.Put(want) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err = complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got.A, want.A) + } + }) + }) + + t.Run("put in batch", func(t *testing.T) { + batch := new(leveldb.Batch) + want := complexStructure{ + A: "simple string batch value", + } + complexField.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err := complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + batch := new(leveldb.Batch) + want := complexStructure{ + A: "overwritten string batch value", + } + complexField.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err := complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got, want) + } + }) + }) +} diff --git a/swarm/shed/field_uint64.go b/swarm/shed/field_uint64.go new file mode 100644 index 000000000..80e0069ae --- /dev/null +++ b/swarm/shed/field_uint64.go @@ -0,0 +1,108 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "encoding/binary" + + "github.com/syndtr/goleveldb/leveldb" +) + +// Uint64Field provides a way to have a simple counter in the database. +// It transparently encodes uint64 type value to bytes. +type Uint64Field struct { + db *DB + key []byte +} + +// NewUint64Field returns a new Uint64Field. +// It validates its name and type against the database schema. +func (db *DB) NewUint64Field(name string) (f Uint64Field, err error) { + key, err := db.schemaFieldKey(name, "uint64") + if err != nil { + return f, err + } + return Uint64Field{ + db: db, + key: key, + }, nil +} + +// Get retrieves a uint64 value from the database. +// If the value is not found in the database a 0 value +// is returned and no error. +func (f Uint64Field) Get() (val uint64, err error) { + b, err := f.db.Get(f.key) + if err != nil { + if err == leveldb.ErrNotFound { + return 0, nil + } + return 0, err + } + return binary.BigEndian.Uint64(b), nil +} + +// Put encodes uin64 value and stores it in the database. +func (f Uint64Field) Put(val uint64) (err error) { + return f.db.Put(f.key, encodeUint64(val)) +} + +// PutInBatch stores a uint64 value in a batch +// that can be saved later in the database. +func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64) { + batch.Put(f.key, encodeUint64(val)) +} + +// Inc increments a uint64 value in the database. +// This operation is not goroutine save. +func (f Uint64Field) Inc() (val uint64, err error) { + val, err = f.Get() + if err != nil { + if err == leveldb.ErrNotFound { + val = 0 + } else { + return 0, err + } + } + val++ + return val, f.Put(val) +} + +// IncInBatch increments a uint64 value in the batch +// by retreiving a value from the database, not the same batch. +// This operation is not goroutine save. +func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) { + val, err = f.Get() + if err != nil { + if err == leveldb.ErrNotFound { + val = 0 + } else { + return 0, err + } + } + val++ + f.PutInBatch(batch, val) + return val, nil +} + +// encode transforms uint64 to 8 byte long +// slice in big endian encoding. +func encodeUint64(val uint64) (b []byte) { + b = make([]byte, 8) + binary.BigEndian.PutUint64(b, val) + return b +} diff --git a/swarm/shed/field_uint64_test.go b/swarm/shed/field_uint64_test.go new file mode 100644 index 000000000..69ade71ba --- /dev/null +++ b/swarm/shed/field_uint64_test.go @@ -0,0 +1,194 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "testing" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestUint64Field validates put and get operations +// of the Uint64Field. +func TestUint64Field(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + counter, err := db.NewUint64Field("counter") + if err != nil { + t.Fatal(err) + } + + t.Run("get empty", func(t *testing.T) { + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + var want uint64 + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + }) + + t.Run("put", func(t *testing.T) { + var want uint64 = 42 + err = counter.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + var want uint64 = 84 + err = counter.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + }) + }) + + t.Run("put in batch", func(t *testing.T) { + batch := new(leveldb.Batch) + var want uint64 = 42 + counter.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + batch := new(leveldb.Batch) + var want uint64 = 84 + counter.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + }) + }) +} + +// TestUint64Field_Inc validates Inc operation +// of the Uint64Field. +func TestUint64Field_Inc(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + counter, err := db.NewUint64Field("counter") + if err != nil { + t.Fatal(err) + } + + var want uint64 = 1 + got, err := counter.Inc() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + want = 2 + got, err = counter.Inc() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } +} + +// TestUint64Field_IncInBatch validates IncInBatch operation +// of the Uint64Field. +func TestUint64Field_IncInBatch(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + counter, err := db.NewUint64Field("counter") + if err != nil { + t.Fatal(err) + } + + batch := new(leveldb.Batch) + var want uint64 = 1 + got, err := counter.IncInBatch(batch) + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err = counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + batch2 := new(leveldb.Batch) + want = 2 + got, err = counter.IncInBatch(batch2) + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + err = db.WriteBatch(batch2) + if err != nil { + t.Fatal(err) + } + got, err = counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } +} diff --git a/swarm/shed/index.go b/swarm/shed/index.go new file mode 100644 index 000000000..ba803e3c2 --- /dev/null +++ b/swarm/shed/index.go @@ -0,0 +1,264 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +// IndexItem holds fields relevant to Swarm Chunk data and metadata. +// All information required for swarm storage and operations +// on that storage must be defined here. +// This structure is logically connected to swarm storage, +// the only part of this package that is not generalized, +// mostly for performance reasons. +// +// IndexItem is a type that is used for retrieving, storing and encoding +// chunk data and metadata. It is passed as an argument to Index encoding +// functions, get function and put function. +// But it is also returned with additional data from get function call +// and as the argument in iterator function definition. +type IndexItem struct { + Address []byte + Data []byte + AccessTimestamp int64 + StoreTimestamp int64 + // UseMockStore is a pointer to identify + // an unset state of the field in Join function. + UseMockStore *bool +} + +// Merge is a helper method to construct a new +// IndexItem by filling up fields with default values +// of a particular IndexItem with values from another one. +func (i IndexItem) Merge(i2 IndexItem) (new IndexItem) { + if i.Address == nil { + i.Address = i2.Address + } + if i.Data == nil { + i.Data = i2.Data + } + if i.AccessTimestamp == 0 { + i.AccessTimestamp = i2.AccessTimestamp + } + if i.StoreTimestamp == 0 { + i.StoreTimestamp = i2.StoreTimestamp + } + if i.UseMockStore == nil { + i.UseMockStore = i2.UseMockStore + } + return i +} + +// Index represents a set of LevelDB key value pairs that have common +// prefix. It holds functions for encoding and decoding keys and values +// to provide transparent actions on saved data which inclide: +// - getting a particular IndexItem +// - saving a particular IndexItem +// - iterating over a sorted LevelDB keys +// It implements IndexIteratorInterface interface. +type Index struct { + db *DB + prefix []byte + encodeKeyFunc func(fields IndexItem) (key []byte, err error) + decodeKeyFunc func(key []byte) (e IndexItem, err error) + encodeValueFunc func(fields IndexItem) (value []byte, err error) + decodeValueFunc func(value []byte) (e IndexItem, err error) +} + +// IndexFuncs structure defines functions for encoding and decoding +// LevelDB keys and values for a specific index. +type IndexFuncs struct { + EncodeKey func(fields IndexItem) (key []byte, err error) + DecodeKey func(key []byte) (e IndexItem, err error) + EncodeValue func(fields IndexItem) (value []byte, err error) + DecodeValue func(value []byte) (e IndexItem, err error) +} + +// NewIndex returns a new Index instance with defined name and +// encoding functions. The name must be unique and will be validated +// on database schema for a key prefix byte. +func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) { + id, err := db.schemaIndexPrefix(name) + if err != nil { + return f, err + } + prefix := []byte{id} + return Index{ + db: db, + prefix: prefix, + // This function adjusts Index LevelDB key + // by appending the provided index id byte. + // This is needed to avoid collisions between keys of different + // indexes as all index ids are unique. + encodeKeyFunc: func(e IndexItem) (key []byte, err error) { + key, err = funcs.EncodeKey(e) + if err != nil { + return nil, err + } + return append(append(make([]byte, 0, len(key)+1), prefix...), key...), nil + }, + // This function reverses the encodeKeyFunc constructed key + // to transparently work with index keys without their index ids. + // It assumes that index keys are prefixed with only one byte. + decodeKeyFunc: func(key []byte) (e IndexItem, err error) { + return funcs.DecodeKey(key[1:]) + }, + encodeValueFunc: funcs.EncodeValue, + decodeValueFunc: funcs.DecodeValue, + }, nil +} + +// Get accepts key fields represented as IndexItem to retrieve a +// value from the index and return maximum available information +// from the index represented as another IndexItem. +func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) { + key, err := f.encodeKeyFunc(keyFields) + if err != nil { + return out, err + } + value, err := f.db.Get(key) + if err != nil { + return out, err + } + out, err = f.decodeValueFunc(value) + if err != nil { + return out, err + } + return out.Merge(keyFields), nil +} + +// Put accepts IndexItem to encode information from it +// and save it to the database. +func (f Index) Put(i IndexItem) (err error) { + key, err := f.encodeKeyFunc(i) + if err != nil { + return err + } + value, err := f.encodeValueFunc(i) + if err != nil { + return err + } + return f.db.Put(key, value) +} + +// PutInBatch is the same as Put method, but it just +// saves the key/value pair to the batch instead +// directly to the database. +func (f Index) PutInBatch(batch *leveldb.Batch, i IndexItem) (err error) { + key, err := f.encodeKeyFunc(i) + if err != nil { + return err + } + value, err := f.encodeValueFunc(i) + if err != nil { + return err + } + batch.Put(key, value) + return nil +} + +// Delete accepts IndexItem to remove a key/value pair +// from the database based on its fields. +func (f Index) Delete(keyFields IndexItem) (err error) { + key, err := f.encodeKeyFunc(keyFields) + if err != nil { + return err + } + return f.db.Delete(key) +} + +// DeleteInBatch is the same as Delete just the operation +// is performed on the batch instead on the database. +func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields IndexItem) (err error) { + key, err := f.encodeKeyFunc(keyFields) + if err != nil { + return err + } + batch.Delete(key) + return nil +} + +// IndexIterFunc is a callback on every IndexItem that is decoded +// by iterating on an Index keys. +// By returning a true for stop variable, iteration will +// stop, and by returning the error, that error will be +// propagated to the called iterator method on Index. +type IndexIterFunc func(item IndexItem) (stop bool, err error) + +// IterateAll iterates over all keys of the Index. +func (f Index) IterateAll(fn IndexIterFunc) (err error) { + it := f.db.NewIterator() + defer it.Release() + + for ok := it.Seek(f.prefix); ok; ok = it.Next() { + key := it.Key() + if key[0] != f.prefix[0] { + break + } + keyIndexItem, err := f.decodeKeyFunc(key) + if err != nil { + return err + } + valueIndexItem, err := f.decodeValueFunc(it.Value()) + if err != nil { + return err + } + stop, err := fn(keyIndexItem.Merge(valueIndexItem)) + if err != nil { + return err + } + if stop { + break + } + } + return it.Error() +} + +// IterateFrom iterates over Index keys starting from the key +// encoded from the provided IndexItem. +func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) { + startKey, err := f.encodeKeyFunc(start) + if err != nil { + return err + } + it := f.db.NewIterator() + defer it.Release() + + for ok := it.Seek(startKey); ok; ok = it.Next() { + key := it.Key() + if key[0] != f.prefix[0] { + break + } + keyIndexItem, err := f.decodeKeyFunc(key) + if err != nil { + return err + } + valueIndexItem, err := f.decodeValueFunc(it.Value()) + if err != nil { + return err + } + stop, err := fn(keyIndexItem.Merge(valueIndexItem)) + if err != nil { + return err + } + if stop { + break + } + } + return it.Error() +} diff --git a/swarm/shed/index_test.go b/swarm/shed/index_test.go new file mode 100644 index 000000000..ba82216df --- /dev/null +++ b/swarm/shed/index_test.go @@ -0,0 +1,426 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "bytes" + "encoding/binary" + "fmt" + "sort" + "testing" + "time" + + "github.com/syndtr/goleveldb/leveldb" +) + +// Index functions for the index that is used in tests in this file. +var retrievalIndexFuncs = IndexFuncs{ + EncodeKey: func(fields IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields IndexItem) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e IndexItem, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.Data = value[8:] + return e, nil + }, +} + +// TestIndex validates put, get and delete functions of the Index implementation. +func TestIndex(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + index, err := db.NewIndex("retrieval", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + t.Run("put", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err := index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + t.Run("overwrite", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-hash"), + Data: []byte("New DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err = index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + }) + }) + + t.Run("put in batch", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-in-batch-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + batch := new(leveldb.Batch) + index.PutInBatch(batch, want) + err := db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + t.Run("overwrite", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-in-batch-hash"), + Data: []byte("New DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + batch := new(leveldb.Batch) + index.PutInBatch(batch, want) + db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + }) + }) + + t.Run("put in batch twice", func(t *testing.T) { + // ensure that the last item of items with the same db keys + // is actually saved + batch := new(leveldb.Batch) + address := []byte("put-in-batch-twice-hash") + + // put the first item + index.PutInBatch(batch, IndexItem{ + Address: address, + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + }) + + want := IndexItem{ + Address: address, + Data: []byte("New DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + // then put the item that will produce the same key + // but different value in the database + index.PutInBatch(batch, want) + db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + }) + + t.Run("delete", func(t *testing.T) { + want := IndexItem{ + Address: []byte("delete-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err := index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + err = index.Delete(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + + wantErr := leveldb.ErrNotFound + got, err = index.Get(IndexItem{ + Address: want.Address, + }) + if err != wantErr { + t.Fatalf("got error %v, want %v", err, wantErr) + } + }) + + t.Run("delete in batch", func(t *testing.T) { + want := IndexItem{ + Address: []byte("delete-in-batch-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err := index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + batch := new(leveldb.Batch) + index.DeleteInBatch(batch, IndexItem{ + Address: want.Address, + }) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + + wantErr := leveldb.ErrNotFound + got, err = index.Get(IndexItem{ + Address: want.Address, + }) + if err != wantErr { + t.Fatalf("got error %v, want %v", err, wantErr) + } + }) +} + +// TestIndex_iterate validates index iterator functions for correctness. +func TestIndex_iterate(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + index, err := db.NewIndex("retrieval", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + items := []IndexItem{ + { + Address: []byte("iterate-hash-01"), + Data: []byte("data80"), + }, + { + Address: []byte("iterate-hash-03"), + Data: []byte("data22"), + }, + { + Address: []byte("iterate-hash-05"), + Data: []byte("data41"), + }, + { + Address: []byte("iterate-hash-02"), + Data: []byte("data84"), + }, + { + Address: []byte("iterate-hash-06"), + Data: []byte("data1"), + }, + } + batch := new(leveldb.Batch) + for _, i := range items { + index.PutInBatch(batch, i) + } + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + item04 := IndexItem{ + Address: []byte("iterate-hash-04"), + Data: []byte("data0"), + } + err = index.Put(item04) + if err != nil { + t.Fatal(err) + } + items = append(items, item04) + + sort.SliceStable(items, func(i, j int) bool { + return bytes.Compare(items[i].Address, items[j].Address) < 0 + }) + + t.Run("all", func(t *testing.T) { + var i int + err := index.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("from", func(t *testing.T) { + startIndex := 2 + i := startIndex + err := index.IterateFrom(items[startIndex], func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("stop", func(t *testing.T) { + var i int + stopIndex := 3 + var count int + err := index.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + count++ + if i == stopIndex { + return true, nil + } + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + wantItemsCount := stopIndex + 1 + if count != wantItemsCount { + t.Errorf("got %v items, expected %v", count, wantItemsCount) + } + }) + + t.Run("no overflow", func(t *testing.T) { + secondIndex, err := db.NewIndex("second-index", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + secondIndexItem := IndexItem{ + Address: []byte("iterate-hash-10"), + Data: []byte("data-second"), + } + err = secondIndex.Put(secondIndexItem) + if err != nil { + t.Fatal(err) + } + + var i int + err = index.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + + i = 0 + err = secondIndex.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > 1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + checkIndexItem(t, item, secondIndexItem) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + }) +} + +// checkIndexItem is a test helper function that compares if two Index items are the same. +func checkIndexItem(t *testing.T, got, want IndexItem) { + t.Helper() + + if !bytes.Equal(got.Address, want.Address) { + t.Errorf("got hash %q, expected %q", string(got.Address), string(want.Address)) + } + if !bytes.Equal(got.Data, want.Data) { + t.Errorf("got data %q, expected %q", string(got.Data), string(want.Data)) + } + if got.StoreTimestamp != want.StoreTimestamp { + t.Errorf("got store timestamp %v, expected %v", got.StoreTimestamp, want.StoreTimestamp) + } + if got.AccessTimestamp != want.AccessTimestamp { + t.Errorf("got access timestamp %v, expected %v", got.AccessTimestamp, want.AccessTimestamp) + } +} diff --git a/swarm/shed/schema.go b/swarm/shed/schema.go new file mode 100644 index 000000000..cfb7c6d64 --- /dev/null +++ b/swarm/shed/schema.go @@ -0,0 +1,134 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "encoding/json" + "errors" + "fmt" +) + +var ( + // LevelDB key value for storing the schema. + keySchema = []byte{0} + // LevelDB key prefix for all field type. + // LevelDB keys will be constructed by appending name values to this prefix. + keyPrefixFields byte = 1 + // LevelDB key prefix from which indexing keys start. + // Every index has its own key prefix and this value defines the first one. + keyPrefixIndexStart byte = 2 // Q: or maybe a higher number like 7, to have more space for potential specific perfixes +) + +// schema is used to serialize known database structure information. +type schema struct { + Fields map[string]fieldSpec `json:"fields"` // keys are field names + Indexes map[byte]indexSpec `json:"indexes"` // keys are index prefix bytes +} + +// fieldSpec holds information about a particular field. +// It does not need Name field as it is contained in the +// schema.Field map key. +type fieldSpec struct { + Type string `json:"type"` +} + +// indxSpec holds information about a particular index. +// It does not contain index type, as indexes do not have type. +type indexSpec struct { + Name string `json:"name"` +} + +// schemaFieldKey retrives the complete LevelDB key for +// a particular field form the schema definition. +func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) { + if name == "" { + return nil, errors.New("field name can not be blank") + } + if fieldType == "" { + return nil, errors.New("field type can not be blank") + } + s, err := db.getSchema() + if err != nil { + return nil, err + } + var found bool + for n, f := range s.Fields { + if n == name { + if f.Type != fieldType { + return nil, fmt.Errorf("field %q of type %q stored as %q in db", name, fieldType, f.Type) + } + break + } + } + if !found { + s.Fields[name] = fieldSpec{ + Type: fieldType, + } + err := db.putSchema(s) + if err != nil { + return nil, err + } + } + return append([]byte{keyPrefixFields}, []byte(name)...), nil +} + +// schemaIndexID retrieves the complete LevelDB prefix for +// a particular index. +func (db *DB) schemaIndexPrefix(name string) (id byte, err error) { + if name == "" { + return 0, errors.New("index name can not be blank") + } + s, err := db.getSchema() + if err != nil { + return 0, err + } + nextID := keyPrefixIndexStart + for i, f := range s.Indexes { + if i >= nextID { + nextID = i + 1 + } + if f.Name == name { + return i, nil + } + } + id = nextID + s.Indexes[id] = indexSpec{ + Name: name, + } + return id, db.putSchema(s) +} + +// getSchema retrieves the complete schema from +// the database. +func (db *DB) getSchema() (s schema, err error) { + b, err := db.Get(keySchema) + if err != nil { + return s, err + } + err = json.Unmarshal(b, &s) + return s, err +} + +// putSchema stores the complete schema to +// the database. +func (db *DB) putSchema(s schema) (err error) { + b, err := json.Marshal(s) + if err != nil { + return err + } + return db.Put(keySchema, b) +} diff --git a/swarm/shed/schema_test.go b/swarm/shed/schema_test.go new file mode 100644 index 000000000..a0c1838c8 --- /dev/null +++ b/swarm/shed/schema_test.go @@ -0,0 +1,126 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package shed + +import ( + "bytes" + "testing" +) + +// TestDB_schemaFieldKey validates correctness of schemaFieldKey. +func TestDB_schemaFieldKey(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + t.Run("empty name or type", func(t *testing.T) { + _, err := db.schemaFieldKey("", "") + if err == nil { + t.Errorf("error not returned, but expected") + } + _, err = db.schemaFieldKey("", "type") + if err == nil { + t.Errorf("error not returned, but expected") + } + + _, err = db.schemaFieldKey("test", "") + if err == nil { + t.Errorf("error not returned, but expected") + } + }) + + t.Run("same field", func(t *testing.T) { + key1, err := db.schemaFieldKey("test", "undefined") + if err != nil { + t.Fatal(err) + } + + key2, err := db.schemaFieldKey("test", "undefined") + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(key1, key2) { + t.Errorf("schema keys for the same field name are not the same: %q, %q", string(key1), string(key2)) + } + }) + + t.Run("different fields", func(t *testing.T) { + key1, err := db.schemaFieldKey("test1", "undefined") + if err != nil { + t.Fatal(err) + } + + key2, err := db.schemaFieldKey("test2", "undefined") + if err != nil { + t.Fatal(err) + } + + if bytes.Equal(key1, key2) { + t.Error("schema keys for the same field name are the same, but must not be") + } + }) + + t.Run("same field name different types", func(t *testing.T) { + _, err := db.schemaFieldKey("the-field", "one-type") + if err != nil { + t.Fatal(err) + } + + _, err = db.schemaFieldKey("the-field", "another-type") + if err == nil { + t.Errorf("error not returned, but expected") + } + }) +} + +// TestDB_schemaIndexPrefix validates correctness of schemaIndexPrefix. +func TestDB_schemaIndexPrefix(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + t.Run("same name", func(t *testing.T) { + id1, err := db.schemaIndexPrefix("test") + if err != nil { + t.Fatal(err) + } + + id2, err := db.schemaIndexPrefix("test") + if err != nil { + t.Fatal(err) + } + + if id1 != id2 { + t.Errorf("schema keys for the same field name are not the same: %v, %v", id1, id2) + } + }) + + t.Run("different names", func(t *testing.T) { + id1, err := db.schemaIndexPrefix("test1") + if err != nil { + t.Fatal(err) + } + + id2, err := db.schemaIndexPrefix("test2") + if err != nil { + t.Fatal(err) + } + + if id1 == id2 { + t.Error("schema ids for the same index name are the same, but must not be") + } + }) +} diff --git a/swarm/state/dbstore.go b/swarm/state/dbstore.go index b0aa92e27..fc5dd8f7c 100644 --- a/swarm/state/dbstore.go +++ b/swarm/state/dbstore.go @@ -22,6 +22,7 @@ import ( "errors" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/storage" ) // ErrNotFound is returned when no results are returned from the database @@ -30,6 +31,15 @@ var ErrNotFound = errors.New("ErrorNotFound") // ErrInvalidArgument is returned when the argument type does not match the expected type var ErrInvalidArgument = errors.New("ErrorInvalidArgument") +// Store defines methods required to get, set, delete values for different keys +// and close the underlying resources. +type Store interface { + Get(key string, i interface{}) (err error) + Put(key string, i interface{}) (err error) + Delete(key string) (err error) + Close() error +} + // DBStore uses LevelDB to store values. type DBStore struct { db *leveldb.DB @@ -46,6 +56,17 @@ func NewDBStore(path string) (s *DBStore, err error) { }, nil } +// NewInmemoryStore returns a new instance of DBStore. To be used only in tests and simulations. +func NewInmemoryStore() *DBStore { + db, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + panic(err) + } + return &DBStore{ + db: db, + } +} + // Get retrieves a persisted value for a specific key. If there is no results // ErrNotFound is returned. The provided parameter should be either a byte slice or // a struct that implements the encoding.BinaryUnmarshaler interface diff --git a/swarm/state/inmemorystore.go b/swarm/state/inmemorystore.go deleted file mode 100644 index 3ba48592b..000000000 --- a/swarm/state/inmemorystore.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package state - -import ( - "encoding" - "encoding/json" - "sync" -) - -// InmemoryStore is the reference implementation of Store interface that is supposed -// to be used in tests. -type InmemoryStore struct { - db map[string][]byte - mu sync.RWMutex -} - -// NewInmemoryStore returns a new instance of InmemoryStore. -func NewInmemoryStore() *InmemoryStore { - return &InmemoryStore{ - db: make(map[string][]byte), - } -} - -// Get retrieves a value stored for a specific key. If there is no value found, -// ErrNotFound is returned. -func (s *InmemoryStore) Get(key string, i interface{}) (err error) { - s.mu.RLock() - defer s.mu.RUnlock() - - bytes, ok := s.db[key] - if !ok { - return ErrNotFound - } - - unmarshaler, ok := i.(encoding.BinaryUnmarshaler) - if !ok { - return json.Unmarshal(bytes, i) - } - - return unmarshaler.UnmarshalBinary(bytes) -} - -// Put stores a value for a specific key. -func (s *InmemoryStore) Put(key string, i interface{}) (err error) { - s.mu.Lock() - defer s.mu.Unlock() - var bytes []byte - - marshaler, ok := i.(encoding.BinaryMarshaler) - if !ok { - if bytes, err = json.Marshal(i); err != nil { - return err - } - } else { - if bytes, err = marshaler.MarshalBinary(); err != nil { - return err - } - } - - s.db[key] = bytes - return nil -} - -// Delete removes value stored under a specific key. -func (s *InmemoryStore) Delete(key string) (err error) { - s.mu.Lock() - defer s.mu.Unlock() - - if _, ok := s.db[key]; !ok { - return ErrNotFound - } - delete(s.db, key) - return nil -} - -// Close does not do anything. -func (s *InmemoryStore) Close() error { - return nil -} diff --git a/swarm/state/store.go b/swarm/state/store.go deleted file mode 100644 index fb7fe258f..000000000 --- a/swarm/state/store.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package state - -// Store defines methods required to get, set, delete values for different keys -// and close the underlying resources. -type Store interface { - Get(key string, i interface{}) (err error) - Put(key string, i interface{}) (err error) - Delete(key string) (err error) - Close() error -} diff --git a/swarm/storage/mock/db/db.go b/swarm/storage/mock/db/db.go index 43bfa24f0..73ae199e8 100644 --- a/swarm/storage/mock/db/db.go +++ b/swarm/storage/mock/db/db.go @@ -86,6 +86,13 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { return s.db.Write(batch, nil) } +// Delete removes the chunk reference to node with address addr. +func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + batch := new(leveldb.Batch) + batch.Delete(nodeDBKey(addr, key)) + return s.db.Write(batch, nil) +} + // HasKey returns whether a node with addr contains the key. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { has, err := s.db.Has(nodeDBKey(addr, key), nil) diff --git a/swarm/storage/mock/mem/mem.go b/swarm/storage/mock/mem/mem.go index 8878309d0..3a0a2beb8 100644 --- a/swarm/storage/mock/mem/mem.go +++ b/swarm/storage/mock/mem/mem.go @@ -83,6 +83,22 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { return nil } +// Delete removes the chunk data for node with address addr. +func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + + var count int + if _, ok := s.nodes[string(key)]; ok { + delete(s.nodes[string(key)], addr) + count = len(s.nodes[string(key)]) + } + if count == 0 { + delete(s.data, string(key)) + } + return nil +} + // HasKey returns whether a node with addr contains the key. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { s.mu.Lock() diff --git a/swarm/storage/mock/mock.go b/swarm/storage/mock/mock.go index 81340f927..1fb71b70a 100644 --- a/swarm/storage/mock/mock.go +++ b/swarm/storage/mock/mock.go @@ -70,6 +70,12 @@ func (n *NodeStore) Put(key []byte, data []byte) error { return n.store.Put(n.addr, key, data) } +// Delete removes chunk data for a key for a node that has the address +// provided on NodeStore initialization. +func (n *NodeStore) Delete(key []byte) error { + return n.store.Delete(n.addr, key) +} + // GlobalStorer defines methods for mock db store // that stores chunk data for all swarm nodes. // It is used in tests to construct mock NodeStores @@ -77,6 +83,7 @@ func (n *NodeStore) Put(key []byte, data []byte) error { type GlobalStorer interface { Get(addr common.Address, key []byte) (data []byte, err error) Put(addr common.Address, key []byte, data []byte) error + Delete(addr common.Address, key []byte) error HasKey(addr common.Address, key []byte) bool // NewNodeStore creates an instance of NodeStore // to be used by a single swarm node with diff --git a/swarm/storage/mock/rpc/rpc.go b/swarm/storage/mock/rpc/rpc.go index 6e735f698..8cd6c83a7 100644 --- a/swarm/storage/mock/rpc/rpc.go +++ b/swarm/storage/mock/rpc/rpc.go @@ -73,6 +73,12 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { return err } +// Delete calls a Delete method to RPC server. +func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + err := s.client.Call(nil, "mockStore_delete", addr, key) + return err +} + // HasKey calls a HasKey method to RPC server. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { var has bool diff --git a/swarm/storage/mock/test/test.go b/swarm/storage/mock/test/test.go index 02da3af55..10180985f 100644 --- a/swarm/storage/mock/test/test.go +++ b/swarm/storage/mock/test/test.go @@ -72,6 +72,31 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) { } } } + t.Run("delete", func(t *testing.T) { + chunkAddr := storage.Address([]byte("1234567890abcd")) + for _, addr := range addrs { + err := globalStore.Put(addr, chunkAddr, []byte("data")) + if err != nil { + t.Fatalf("put data to store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + firstNodeAddr := addrs[0] + if err := globalStore.Delete(firstNodeAddr, chunkAddr); err != nil { + t.Fatalf("delete from store %s key %s: %v", firstNodeAddr.Hex(), chunkAddr.Hex(), err) + } + for i, addr := range addrs { + _, err := globalStore.Get(addr, chunkAddr) + if i == 0 { + if err != mock.ErrNotFound { + t.Errorf("get data from store %s key %s: expected mock.ErrNotFound error, got %v", addr.Hex(), chunkAddr.Hex(), err) + } + } else { + if err != nil { + t.Errorf("get data from store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + } + }) }) t.Run("NodeStore", func(t *testing.T) { @@ -114,6 +139,34 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) { } } } + t.Run("delete", func(t *testing.T) { + chunkAddr := storage.Address([]byte("1234567890abcd")) + var chosenStore *mock.NodeStore + for addr, store := range nodes { + if chosenStore == nil { + chosenStore = store + } + err := store.Put(chunkAddr, []byte("data")) + if err != nil { + t.Fatalf("put data to store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + if err := chosenStore.Delete(chunkAddr); err != nil { + t.Fatalf("delete key %s: %v", chunkAddr.Hex(), err) + } + for addr, store := range nodes { + _, err := store.Get(chunkAddr) + if store == chosenStore { + if err != mock.ErrNotFound { + t.Errorf("get data from store %s key %s: expected mock.ErrNotFound error, got %v", addr.Hex(), chunkAddr.Hex(), err) + } + } else { + if err != nil { + t.Errorf("get data from store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + } + }) }) } diff --git a/swarm/swap/swap.go b/swarm/swap/swap.go index 137eb141d..5d636dc20 100644 --- a/swarm/swap/swap.go +++ b/swarm/swap/swap.go @@ -91,3 +91,8 @@ func (s *Swap) loadState(peer *protocols.Peer) (err error) { } return } + +//Clean up Swap +func (swap *Swap) Close() { + swap.stateStore.Close() +} diff --git a/swarm/swarm.go b/swarm/swarm.go index dc3756d3a..a4ff94051 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -66,20 +66,22 @@ var ( // the swarm stack type Swarm struct { - config *api.Config // swarm configuration - api *api.API // high level api layer (fs/manifest) - dns api.Resolver // DNS registrar - fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support - streamer *stream.Registry - bzz *network.Bzz // the logistic manager - backend chequebook.Backend // simple blockchain Backend - privateKey *ecdsa.PrivateKey - corsString string - swapEnabled bool - netStore *storage.NetStore - sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit - ps *pss.Pss - swap *swap.Swap + config *api.Config // swarm configuration + api *api.API // high level api layer (fs/manifest) + dns api.Resolver // DNS registrar + fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support + streamer *stream.Registry + bzz *network.Bzz // the logistic manager + backend chequebook.Backend // simple blockchain Backend + privateKey *ecdsa.PrivateKey + corsString string + swapEnabled bool + netStore *storage.NetStore + sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit + ps *pss.Pss + swap *swap.Swap + stateStore *state.DBStore + accountingMetrics *protocols.AccountingMetrics tracerClose io.Closer } @@ -134,7 +136,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e LightNode: config.LightNodeEnabled, } - stateStore, err := state.NewDBStore(filepath.Join(config.Path, "state-store.db")) + self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db")) if err != nil { return } @@ -179,6 +181,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e return nil, err } self.swap = swap.New(balancesStore) + self.accountingMetrics = protocols.SetupAccountingMetrics(10*time.Second, filepath.Join(config.Path, "metrics.db")) } var nodeID enode.ID @@ -203,7 +206,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, } - self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions, self.swap) + self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams) @@ -226,7 +229,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e log.Debug("Setup local storage") - self.bzz = network.NewBzz(bzzconfig, to, stateStore, self.streamer.GetSpec(), self.streamer.Run) + self.bzz = network.NewBzz(bzzconfig, to, self.stateStore, self.streamer.GetSpec(), self.streamer.Run) // Pss = postal service over swarm (devp2p over bzz) self.ps, err = pss.NewPss(to, config.Pss) @@ -446,14 +449,24 @@ func (self *Swarm) Stop() error { ch.Stop() ch.Save() } - + if self.swap != nil { + self.swap.Close() + } + if self.accountingMetrics != nil { + self.accountingMetrics.Close() + } if self.netStore != nil { self.netStore.Close() } self.sfs.Stop() stopCounter.Inc(1) self.streamer.Stop() - return self.bzz.Stop() + + err := self.bzz.Stop() + if self.stateStore != nil { + self.stateStore.Close() + } + return err } // implements the node.Service interface diff --git a/swarm/version/version.go b/swarm/version/version.go index 17ef34f5f..57ac05a86 100644 --- a/swarm/version/version.go +++ b/swarm/version/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 0 // Major version component of the current release VersionMinor = 3 // Minor version component of the current release - VersionPatch = 7 // Patch version component of the current release + VersionPatch = 8 // Patch version component of the current release VersionMeta = "unstable" // Version metadata to append to the version string ) |