aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore/subscription_push.go
diff options
context:
space:
mode:
authorJanoš Guljaš <janos@users.noreply.github.com>2019-04-25 16:22:57 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2019-05-10 18:26:33 +0800
commitc1213bd00c2a84a9dfc218e44cc2f85902f91128 (patch)
tree2f3123b72dbe448ec28fd5175e146c191112c7c9 /swarm/storage/localstore/subscription_push.go
parent993b145f25845e50e8af41ffb1116eaee381d693 (diff)
downloadgo-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.tar
go-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.tar.gz
go-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.tar.bz2
go-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.tar.lz
go-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.tar.xz
go-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.tar.zst
go-tangerine-c1213bd00c2a84a9dfc218e44cc2f85902f91128.zip
swarm: LocalStore metrics
* swarm/shed: remove metrics fields from DB struct * swarm/schunk: add String methods to modes * swarm/storage/localstore: add metrics and traces * swarm/chunk: unknown modes without spaces in String methods * swarm/storage/localstore: remove bin number from pull subscription metrics * swarm/storage/localstore: add resetting time metrics and code improvements
Diffstat (limited to 'swarm/storage/localstore/subscription_push.go')
-rw-r--r--swarm/storage/localstore/subscription_push.go29
1 files changed, 29 insertions, 0 deletions
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6f..f2463af2a 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -19,10 +19,15 @@ package localstore
import (
"context"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
)
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
@@ -30,6 +35,9 @@ import (
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
+ metricName := "localstore.SubscribePush"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
chunks := make(chan chunk.Chunk)
trigger := make(chan struct{}, 1)
@@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
var stopChanOnce sync.Once
go func() {
+ defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
@@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// - last index Item is reached
// - subscription stop is called
// - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+
+ iterStart := time.Now()
+ var count int
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// get chunk data
dataItem, err := db.retrievalDataIndex.Get(item)
@@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
@@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// iterator call, skip it in this one
SkipStartFromItem: true,
})
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore push subscription iteration", "err", err)
return
}