diff options
| author | Elad <theman@elad.im> | 2019-05-09 16:54:06 +0800 |
|---|---|---|
| committer | Anton Evangelatov <anton.evangelatov@gmail.com> | 2019-05-10 18:29:22 +0800 |
| commit | 84dfaea246dea179319db90a63afc1189cd09246 (patch) | |
| tree | c97fcdbb43e16c43e9670056f16c3c407ada6521 /swarm/network | |
| parent | 3e9ba576694e7df018b3c9fa2c1d3aa7d55031fe (diff) | |
| download | go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.tar go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.tar.gz go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.tar.bz2 go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.tar.lz go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.tar.xz go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.tar.zst go-tangerine-84dfaea246dea179319db90a63afc1189cd09246.zip | |
swarm: instrument setNextBatch
swarm/storage/localstore: add gc metrics, disable flaky test
Diffstat (limited to 'swarm/network')
| -rw-r--r-- | swarm/network/simulation/kademlia_test.go | 1 | ||||
| -rw-r--r-- | swarm/network/stream/syncer.go | 37 |
2 files changed, 28 insertions, 10 deletions
diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go index 0ac1e7803..4d7dc6240 100644 --- a/swarm/network/simulation/kademlia_test.go +++ b/swarm/network/simulation/kademlia_test.go @@ -156,6 +156,7 @@ func createSimServiceMap(discovery bool) map[string]ServiceFunc { // Call WaitTillSnapshotRecreated() function and wait until it returns // Iterate the nodes and check if all the connections are successfully recreated func TestWaitTillSnapshotRecreated(t *testing.T) { + t.Skip("test is flaky. disabling until underlying problem is addressed") var err error sim := New(createSimServiceMap(true)) _, err = sim.AddNodesAndConnectRing(16) diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 043192903..47320e860 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -21,7 +21,9 @@ import ( "strconv" "time" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -34,27 +36,29 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - netStore *storage.NetStore - quit chan struct{} + correlateId string //used for logging + po uint8 + netStore *storage.NetStore + quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - netStore: netStore, - quit: make(chan struct{}), + correlateId: correlateId, + po: po, + netStore: netStore, + quit: make(chan struct{}), }, nil } func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { - streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { + streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(po, netStore) + return NewSwarmSyncerServer(po, netStore, p.ID().String()+"|"+string(po)) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -92,7 +96,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 if from > 0 { from-- } - + batchStart := time.Now() descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() @@ -106,7 +110,10 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 timer *time.Timer timerC <-chan time.Time ) + defer func() { + metrics.GetOrRegisterResettingTimer("syncer.set-next-batch.total-time", nil).UpdateSince(batchStart) + metrics.GetOrRegisterCounter("syncer.set-next-batch.batch-size", nil).Inc(int64(batchSize)) if timer != nil { timer.Stop() } @@ -125,6 +132,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // validating that the chunk is successfully stored by the peer. err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address) if err != nil { + metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1) + log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err) return nil, 0, 0, nil, err } batchSize++ @@ -136,13 +145,17 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 batchEndID = d.BinID if batchSize >= BatchSize { iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1) + log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } if timer == nil { timer = time.NewTimer(batchTimeout) } else { + log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId) if !timer.Stop() { <-timer.C } + log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId) timer.Reset(batchTimeout) } timerC = timer.C @@ -150,8 +163,12 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // return batch if new chunks are not // received after some time iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1) + log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) case <-s.quit: iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.quit-sig", nil).Inc(1) + log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } } if batchStartID == nil { |
