aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore/subscription_push.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/localstore/subscription_push.go')
-rw-r--r--swarm/storage/localstore/subscription_push.go29
1 files changed, 29 insertions, 0 deletions
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6f..f2463af2a 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -19,10 +19,15 @@ package localstore
import (
"context"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
)
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
@@ -30,6 +35,9 @@ import (
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
+ metricName := "localstore.SubscribePush"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
chunks := make(chan chunk.Chunk)
trigger := make(chan struct{}, 1)
@@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
var stopChanOnce sync.Once
go func() {
+ defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
@@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// - last index Item is reached
// - subscription stop is called
// - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+
+ iterStart := time.Now()
+ var count int
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// get chunk data
dataItem, err := db.retrievalDataIndex.Get(item)
@@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
@@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// iterator call, skip it in this one
SkipStartFromItem: true,
})
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore push subscription iteration", "err", err)
return
}