From b02958b9c57f35d0df085ed8e7057a52131373a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 10 Jun 2019 14:21:02 +0300 Subject: core, ethdb, metrics, p2p: expose various counter metrics for grafana --- core/blockchain.go | 20 +++++- core/headerchain.go | 5 +- core/rawdb/freezer.go | 7 +- core/rawdb/freezer_table.go | 44 ++++++++++-- core/rawdb/freezer_table_test.go | 68 +++++++++--------- core/tx_list.go | 4 +- core/tx_pool.go | 151 ++++++++++++++++++++++++++------------- eth/downloader/statesync.go | 4 +- ethdb/leveldb/leveldb.go | 16 +++-- metrics/cpu.go | 36 ++++++++++ metrics/cpu_syscall.go | 35 +++++++++ metrics/cpu_windows.go | 23 ++++++ metrics/metrics.go | 23 ++++-- p2p/metrics.go | 17 +++-- p2p/server.go | 5 +- 15 files changed, 341 insertions(+), 117 deletions(-) create mode 100644 metrics/cpu.go create mode 100644 metrics/cpu_syscall.go create mode 100644 metrics/cpu_windows.go diff --git a/core/blockchain.go b/core/blockchain.go index 2355f0ea3..a1f3b68c5 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -46,6 +46,10 @@ import ( ) var ( + headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil) + headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil) + headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil) + accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil) accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil) accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil) @@ -332,6 +336,7 @@ func (bc *BlockChain) loadLastState() error { } // Everything seems to be fine, set as the head block bc.currentBlock.Store(currentBlock) + headBlockGauge.Update(int64(currentBlock.NumberU64())) // Restore the last known head header currentHeader := currentBlock.Header() @@ -344,12 +349,14 @@ func (bc *BlockChain) loadLastState() error { // Restore the last known head fast block bc.currentFastBlock.Store(currentBlock) + headFastBlockGauge.Update(int64(currentBlock.NumberU64())) + if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) { if block := bc.GetBlockByHash(head); block != nil { bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(block.NumberU64())) } } - // Issue a status log for the user currentFastBlock := bc.CurrentFastBlock() @@ -388,6 +395,7 @@ func (bc *BlockChain) SetHead(head uint64) error { } rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) bc.currentBlock.Store(newHeadBlock) + headBlockGauge.Update(int64(newHeadBlock.NumberU64())) } // Rewind the fast block in a simpleton way to the target head @@ -399,6 +407,7 @@ func (bc *BlockChain) SetHead(head uint64) error { } rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) bc.currentFastBlock.Store(newHeadFastBlock) + headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) } } @@ -450,6 +459,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { // If all checks out, manually set the head block bc.chainmu.Lock() bc.currentBlock.Store(block) + headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() log.Info("Committed new head block", "number", block.Number(), "hash", hash) @@ -522,9 +532,12 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { bc.genesisBlock = genesis bc.insert(bc.genesisBlock) bc.currentBlock.Store(bc.genesisBlock) + headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) + bc.hc.SetGenesis(bc.genesisBlock.Header()) bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) bc.currentFastBlock.Store(bc.genesisBlock) + headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) return nil } @@ -598,6 +611,7 @@ func (bc *BlockChain) insert(block *types.Block) { rawdb.WriteHeadBlockHash(bc.db, block.Hash()) bc.currentBlock.Store(block) + headBlockGauge.Update(int64(block.NumberU64())) // If the block is better than our head or is on a different chain, force update heads if updateHeads { @@ -605,6 +619,7 @@ func (bc *BlockChain) insert(block *types.Block) { rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(block.NumberU64())) } } @@ -862,11 +877,13 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) bc.currentFastBlock.Store(newFastBlock) + headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) } if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) bc.currentBlock.Store(newBlock) + headBlockGauge.Update(int64(newBlock.NumberU64())) } } // Truncate ancient data which exceeds the current header. @@ -952,6 +969,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 { rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) bc.currentFastBlock.Store(head) + headFastBlockGauge.Update(int64(head.NumberU64())) isCanonical = true } } diff --git a/core/headerchain.go b/core/headerchain.go index cdd64bb50..034858f65 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -104,6 +104,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c } } hc.currentHeaderHash = hc.CurrentHeader().Hash() + headHeaderGauge.Update(hc.CurrentHeader().Number.Int64()) return hc, nil } @@ -185,12 +186,12 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er hc.currentHeaderHash = hash hc.currentHeader.Store(types.CopyHeader(header)) + headHeaderGauge.Update(header.Number.Int64()) status = CanonStatTy } else { status = SideStatTy } - hc.headerCache.Add(hash, header) hc.numberCache.Add(hash, number) @@ -456,6 +457,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { hc.currentHeader.Store(head) hc.currentHeaderHash = head.Hash() + headHeaderGauge.Update(head.Number.Int64()) } type ( @@ -508,6 +510,7 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d hc.currentHeader.Store(parent) hc.currentHeaderHash = parentHash + headHeaderGauge.Update(parent.Number.Int64()) } batch.Write() diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 741ff9adb..3f377447c 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -80,8 +80,9 @@ type freezer struct { func newFreezer(datadir string, namespace string) (*freezer, error) { // Create the initial freezer object var ( - readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) - writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) + writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil) + sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil) ) // Ensure the datadir is not a symbolic link if it exists. if info, err := os.Lstat(datadir); !os.IsNotExist(err) { @@ -102,7 +103,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) { instanceLock: lock, } for name, disableSnappy := range freezerNoSnappy { - table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy) + table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy) if err != nil { for _, table := range freezer.tables { table.Close() diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 1e5c7cd0b..2fe354a06 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -94,17 +94,18 @@ type freezerTable struct { // to count how many historic items have gone missing. itemOffset uint32 // Offset (number of discarded items) - headBytes uint32 // Number of bytes written to the head file - readMeter metrics.Meter // Meter for measuring the effective amount of data read - writeMeter metrics.Meter // Meter for measuring the effective amount of data written + headBytes uint32 // Number of bytes written to the head file + readMeter metrics.Meter // Meter for measuring the effective amount of data read + writeMeter metrics.Meter // Meter for measuring the effective amount of data written + sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables logger log.Logger // Logger with database path and table name ambedded lock sync.RWMutex // Mutex protecting the data file descriptors } // newTable opens a freezer table with default settings - 2G files -func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) { - return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy) +func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) { + return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy) } // openFreezerFileForAppend opens a freezer table file and seeks to the end @@ -148,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error { // newCustomTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. -func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) { +func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { return nil, err @@ -171,6 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, + sizeCounter: sizeCounter, name: name, path: path, logger: log.New("database", path, "table", name), @@ -181,6 +183,14 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete tab.Close() return nil, err } + // Initialize the starting size counter + size, err := tab.sizeNolock() + if err != nil { + tab.Close() + return nil, err + } + tab.sizeCounter.Inc(int64(size)) + return tab, nil } @@ -321,6 +331,11 @@ func (t *freezerTable) truncate(items uint64) error { if atomic.LoadUint64(&t.items) <= items { return nil } + // We need to truncate, save the old size for metrics tracking + oldSize, err := t.sizeNolock() + if err != nil { + return err + } // Something's out of sync, truncate the table's offset index t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { @@ -355,6 +370,14 @@ func (t *freezerTable) truncate(items uint64) error { // All data files truncated, set internal counters and return atomic.StoreUint64(&t.items, items) atomic.StoreUint32(&t.headBytes, expected.offset) + + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeCounter.Dec(int64(oldSize - newSize)) + return nil } @@ -483,7 +506,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { } // Write indexEntry t.index.Write(idx.marshallBinary()) + t.writeMeter.Mark(int64(bLen + indexEntrySize)) + t.sizeCounter.Inc(int64(bLen + indexEntrySize)) + atomic.AddUint64(&t.items, 1) return nil } @@ -562,6 +588,12 @@ func (t *freezerTable) size() (uint64, error) { t.lock.RLock() defer t.lock.RUnlock() + return t.sizeNolock() +} + +// sizeNolock returns the total data size in the freezer table without obtaining +// the mutex first. +func (t *freezerTable) sizeNolock() (uint64, error) { stat, err := t.index.Stat() if err != nil { return 0, err diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index e63fb63a3..116e26a7f 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -56,7 +56,7 @@ func TestFreezerBasics(t *testing.T) { // set cutoff at 50 bytes f, err := newCustomTable(os.TempDir(), fmt.Sprintf("unittest-%d", rand.Uint64()), - metrics.NewMeter(), metrics.NewMeter(), 50, true) + metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter(), 50, true) if err != nil { t.Fatal(err) } @@ -98,12 +98,12 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Parallel() // set cutoff at 50 bytes var ( - fname = fmt.Sprintf("basics-close-%d", rand.Uint64()) - m1, m2 = metrics.NewMeter(), metrics.NewMeter() - f *freezerTable - err error + fname = fmt.Sprintf("basics-close-%d", rand.Uint64()) + rm, wm, sc = metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() + f *freezerTable + err error ) - f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) + f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -112,7 +112,7 @@ func TestFreezerBasicsClosing(t *testing.T) { data := getChunk(15, x) f.Append(uint64(x), data) f.Close() - f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) + f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) } defer f.Close() @@ -126,7 +126,7 @@ func TestFreezerBasicsClosing(t *testing.T) { t.Fatalf("test %d, got \n%x != \n%x", y, got, exp) } f.Close() - f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true) + f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -136,11 +136,11 @@ func TestFreezerBasicsClosing(t *testing.T) { // TestFreezerRepairDanglingHead tests that we can recover if index entries are removed func TestFreezerRepairDanglingHead(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -169,7 +169,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) { idxFile.Close() // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) // The last item should be missing if _, err = f.Retrieve(0xff); err == nil { t.Errorf("Expected error for missing index entry") @@ -184,11 +184,11 @@ func TestFreezerRepairDanglingHead(t *testing.T) { // TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64()) { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -216,7 +216,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { idxFile.Close() // Now open it again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) // The first item should be there if _, err = f.Retrieve(0); err != nil { t.Fatal(err) @@ -234,7 +234,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { } // And if we open it, we should now be able to read all of them (new values) { - f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) for y := 1; y < 255; y++ { exp := getChunk(15, ^y) got, err := f.Retrieve(uint64(y)) @@ -251,11 +251,11 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { // TestSnappyDetection tests that we fail to open a snappy database and vice versa func TestSnappyDetection(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("snappytest-%d", rand.Uint64()) // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -268,7 +268,7 @@ func TestSnappyDetection(t *testing.T) { } // Open without snappy { - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, false) if _, err = f.Retrieve(0); err == nil { f.Close() t.Fatalf("expected empty table") @@ -277,7 +277,7 @@ func TestSnappyDetection(t *testing.T) { // Open with snappy { - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) // There should be 255 items if _, err = f.Retrieve(0xfe); err != nil { f.Close() @@ -302,11 +302,11 @@ func assertFileSize(f string, size int64) error { // the index is repaired func TestFreezerRepairDanglingIndex(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64()) { // Fill a table and close it - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -342,7 +342,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { // 45, 45, 15 // with 3+3+1 items { - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -359,11 +359,11 @@ func TestFreezerRepairDanglingIndex(t *testing.T) { func TestFreezerTruncate(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("truncation-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -380,7 +380,7 @@ func TestFreezerTruncate(t *testing.T) { } // Reopen, truncate { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -402,10 +402,10 @@ func TestFreezerTruncate(t *testing.T) { // That will rewind the index, and _should_ truncate the head file func TestFreezerRepairFirstFile(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -433,7 +433,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { } // Reopen { - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -458,10 +458,10 @@ func TestFreezerRepairFirstFile(t *testing.T) { // - check that we did not keep the rdonly file descriptors func TestFreezerReadAndTruncate(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("read_truncate-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -478,7 +478,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { } // Reopen and read all files { - f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true) if err != nil { t.Fatal(err) } @@ -504,10 +504,10 @@ func TestFreezerReadAndTruncate(t *testing.T) { func TestOffset(t *testing.T) { t.Parallel() - wm, rm := metrics.NewMeter(), metrics.NewMeter() + rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter() fname := fmt.Sprintf("offset-%d", rand.Uint64()) { // Fill table - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true) if err != nil { t.Fatal(err) } @@ -563,7 +563,7 @@ func TestOffset(t *testing.T) { } // Now open again { - f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true) + f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true) if err != nil { t.Fatal(err) } diff --git a/core/tx_list.go b/core/tx_list.go index 57abc5148..75bfdaeda 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -418,9 +418,9 @@ func (l *txPricedList) Put(tx *types.Transaction) { // Removed notifies the prices transaction list that an old transaction dropped // from the pool. The list will just keep a counter of stale objects and update // the heap if a large enough ratio of transactions go stale. -func (l *txPricedList) Removed() { +func (l *txPricedList) Removed(count int) { // Bump the stale counter, but exit if still too low (< 25%) - l.stales++ + l.stales += count if l.stales <= len(*l.items)/4 { return } diff --git a/core/tx_pool.go b/core/tx_pool.go index 411143aea..b16825332 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -85,20 +85,25 @@ var ( var ( // Metrics for the pending pool - pendingDiscardCounter = metrics.NewRegisteredCounter("txpool/pending/discard", nil) - pendingReplaceCounter = metrics.NewRegisteredCounter("txpool/pending/replace", nil) - pendingRateLimitCounter = metrics.NewRegisteredCounter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting - pendingNofundsCounter = metrics.NewRegisteredCounter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds + pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil) + pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil) + pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting + pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds // Metrics for the queued pool - queuedDiscardCounter = metrics.NewRegisteredCounter("txpool/queued/discard", nil) - queuedReplaceCounter = metrics.NewRegisteredCounter("txpool/queued/replace", nil) - queuedRateLimitCounter = metrics.NewRegisteredCounter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting - queuedNofundsCounter = metrics.NewRegisteredCounter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds + queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil) + queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil) + queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting + queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds // General tx metrics - invalidTxCounter = metrics.NewRegisteredCounter("txpool/invalid", nil) - underpricedTxCounter = metrics.NewRegisteredCounter("txpool/underpriced", nil) + validMeter = metrics.NewRegisteredMeter("txpool/valid", nil) + invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil) + underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil) + + pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil) + queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil) + localCounter = metrics.NewRegisteredCounter("txpool/local", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -661,7 +666,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction fails basic validation, discard it if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) - invalidTxCounter.Inc(1) + invalidTxMeter.Mark(1) return false, err } // If the transaction pool is full, discard underpriced transactions @@ -669,14 +674,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the new transaction is underpriced, don't accept it if !local && pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) - underpricedTxCounter.Inc(1) + underpricedTxMeter.Mark(1) return false, ErrUnderpriced } // New transaction is better than our worse ones, make room for it drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) - underpricedTxCounter.Inc(1) + underpricedTxMeter.Mark(1) pool.removeTx(tx.Hash(), false) } } @@ -686,14 +691,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { - pendingDiscardCounter.Inc(1) + pendingDiscardMeter.Mark(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { pool.all.Remove(old.Hash()) - pool.priced.Removed() - pendingReplaceCounter.Inc(1) + pool.priced.Removed(1) + pendingReplaceMeter.Mark(1) } pool.all.Add(tx) pool.priced.Put(tx) @@ -718,6 +723,9 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.locals.add(from) } } + if local || pool.locals.contains(from) { + localCounter.Inc(1) + } pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) @@ -736,14 +744,17 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this - queuedDiscardCounter.Inc(1) + queuedDiscardMeter.Mark(1) return false, ErrReplaceUnderpriced } // Discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) - pool.priced.Removed() - queuedReplaceCounter.Inc(1) + pool.priced.Removed(1) + queuedReplaceMeter.Mark(1) + } else { + // Nothing was replaced, bump the queued counter + queuedCounter.Inc(1) } if pool.all.Get(hash) == nil { pool.all.Add(tx) @@ -779,17 +790,20 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T if !inserted { // An older transaction was better, discard this pool.all.Remove(hash) - pool.priced.Removed() + pool.priced.Removed(1) - pendingDiscardCounter.Inc(1) + pendingDiscardMeter.Mark(1) return false } // Otherwise discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) - pool.priced.Removed() + pool.priced.Removed(1) - pendingReplaceCounter.Inc(1) + pendingReplaceMeter.Mark(1) + } else { + // Nothing was replaced, bump the pending counter + pendingCounter.Inc(1) } // Failsafe to work around direct pending inserts (tests) if pool.all.Get(hash) == nil { @@ -844,6 +858,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { if err != nil { return err } + validMeter.Mark(1) + // If we added a new transaction, run promotion checks and return if !replace { from, _ := types.Sender(pool.signer, tx) // already validated @@ -878,6 +894,8 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { dirty[from] = struct{}{} } } + validMeter.Mark(int64(len(dirty))) + // Only reprocess the internal state if something was actually added if len(dirty) > 0 { addrs := make([]common.Address, 0, len(dirty)) @@ -928,7 +946,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // Remove it from the list of known transactions pool.all.Remove(hash) if outofbound { - pool.priced.Removed() + pool.priced.Removed(1) + } + if pool.locals.contains(addr) { + localCounter.Dec(1) } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { @@ -946,12 +967,17 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { pool.pendingState.SetNonce(addr, nonce) } + // Reduce the pending counter + pendingCounter.Dec(int64(1 + len(invalids))) return } } // Transaction is in the future queue if future := pool.queue[addr]; future != nil { - future.Remove(tx) + if removed, _ := future.Remove(tx); removed { + // Reduce the queued counter + queuedCounter.Dec(1) + } if future.Empty() { delete(pool.queue, addr) } @@ -979,38 +1005,48 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { continue // Just in case someone calls with a non existing account } // Drop all transactions that are deemed too old (low nonce) - for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { + forwards := list.Forward(pool.currentState.GetNonce(addr)) + for _, tx := range forwards { hash := tx.Hash() - log.Trace("Removed old queued transaction", "hash", hash) pool.all.Remove(hash) - pool.priced.Removed() + log.Trace("Removed old queued transaction", "hash", hash) } // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() - log.Trace("Removed unpayable queued transaction", "hash", hash) pool.all.Remove(hash) - pool.priced.Removed() - queuedNofundsCounter.Inc(1) + log.Trace("Removed unpayable queued transaction", "hash", hash) } + queuedNofundsMeter.Mark(int64(len(drops))) + // Gather all executable transactions and promote them - for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { + readies := list.Ready(pool.pendingState.GetNonce(addr)) + for _, tx := range readies { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { log.Trace("Promoting queued transaction", "hash", hash) promoted = append(promoted, tx) } } + queuedCounter.Dec(int64(len(readies))) + // Drop all transactions over the allowed limit + var caps types.Transactions if !pool.locals.contains(addr) { - for _, tx := range list.Cap(int(pool.config.AccountQueue)) { + caps = list.Cap(int(pool.config.AccountQueue)) + for _, tx := range caps { hash := tx.Hash() pool.all.Remove(hash) - pool.priced.Removed() - queuedRateLimitCounter.Inc(1) log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } + queuedRateLimitMeter.Mark(int64(len(caps))) + } + // Mark all the items dropped as removed + pool.priced.Removed(len(forwards) + len(drops) + len(caps)) + queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + if pool.locals.contains(addr) { + localCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) } // Delete the entire queue entry if it became empty. if list.Empty() { @@ -1052,11 +1088,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { for i := 0; i < len(offenders)-1; i++ { list := pool.pending[offenders[i]] - for _, tx := range list.Cap(list.Len() - 1) { + + caps := list.Cap(list.Len() - 1) + for _, tx := range caps { // Drop the transaction from the global pools too hash := tx.Hash() pool.all.Remove(hash) - pool.priced.Removed() // Update the account nonce to the dropped transaction if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { @@ -1064,6 +1101,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { } log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } + pool.priced.Removed(len(caps)) + pendingCounter.Dec(int64(len(caps))) + if pool.locals.contains(offenders[i]) { + localCounter.Dec(int64(len(caps))) + } pending-- } } @@ -1074,11 +1116,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for _, addr := range offenders { list := pool.pending[addr] - for _, tx := range list.Cap(list.Len() - 1) { + + caps := list.Cap(list.Len() - 1) + for _, tx := range caps { // Drop the transaction from the global pools too hash := tx.Hash() pool.all.Remove(hash) - pool.priced.Removed() // Update the account nonce to the dropped transaction if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { @@ -1086,11 +1129,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { } log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } + pool.priced.Removed(len(caps)) + pendingCounter.Dec(int64(len(caps))) + if pool.locals.contains(addr) { + localCounter.Dec(int64(len(caps))) + } pending-- } } } - pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending)) + pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones queued := uint64(0) @@ -1120,7 +1168,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { pool.removeTx(tx.Hash(), true) } drop -= size - queuedRateLimitCounter.Inc(int64(size)) + queuedRateLimitMeter.Mark(int64(size)) continue } // Otherwise drop only last few transactions @@ -1128,7 +1176,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for i := len(txs) - 1; i >= 0 && drop > 0; i-- { pool.removeTx(txs[i].Hash(), true) drop-- - queuedRateLimitCounter.Inc(1) + queuedRateLimitMeter.Mark(1) } } } @@ -1143,11 +1191,11 @@ func (pool *TxPool) demoteUnexecutables() { nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) - for _, tx := range list.Forward(nonce) { + olds := list.Forward(nonce) + for _, tx := range olds { hash := tx.Hash() - log.Trace("Removed old pending transaction", "hash", hash) pool.all.Remove(hash) - pool.priced.Removed() + log.Trace("Removed old pending transaction", "hash", hash) } // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) @@ -1155,21 +1203,28 @@ func (pool *TxPool) demoteUnexecutables() { hash := tx.Hash() log.Trace("Removed unpayable pending transaction", "hash", hash) pool.all.Remove(hash) - pool.priced.Removed() - pendingNofundsCounter.Inc(1) } + pool.priced.Removed(len(olds) + len(drops)) + pendingNofundsMeter.Mark(int64(len(drops))) + for _, tx := range invalids { hash := tx.Hash() log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } + pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + if pool.locals.contains(addr) { + localCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + } // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { - for _, tx := range list.Cap(0) { + gapped := list.Cap(0) + for _, tx := range gapped { hash := tx.Hash() log.Error("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } + pendingCounter.Inc(int64(len(gapped))) } // Delete the entire queue entry if it became empty. if list.Empty() { diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 1ccdf4def..b422557d5 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -442,9 +442,7 @@ func (s *stateSync) process(req *stateReq) (int, error) { default: return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) } - if _, ok := req.tasks[hash]; ok { - delete(req.tasks, hash) - } + delete(req.tasks, hash) } // Put unfulfilled tasks back into the retry queue npeers := s.d.peers.Len() diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index f74e94d92..3781a6da1 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -67,6 +67,7 @@ type Database struct { compWriteMeter metrics.Meter // Meter for measuring the data written during compaction writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction + diskSizeGauge metrics.Gauge // Gauge for tracking the size of all the levels in the database diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -112,6 +113,7 @@ func New(file string, cache int, handles int, namespace string) (*Database, erro ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil) ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil) ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil) + ldb.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil) ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil) ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil) ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil) @@ -233,7 +235,7 @@ func (db *Database) meter(refresh time.Duration) { // Create the counters to store current and previous compaction values compactions := make([][]float64, 2) for i := 0; i < 2; i++ { - compactions[i] = make([]float64, 3) + compactions[i] = make([]float64, 4) } // Create storage for iostats. var iostats [2]float64 @@ -279,7 +281,7 @@ func (db *Database) meter(refresh time.Duration) { if len(parts) != 6 { break } - for idx, counter := range parts[3:] { + for idx, counter := range parts[2:] { value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) if err != nil { db.log.Error("Compaction entry parsing failed", "err", err) @@ -290,16 +292,18 @@ func (db *Database) meter(refresh time.Duration) { } } // Update all the requested meters + if db.diskSizeGauge != nil { + db.diskSizeGauge.Update(int64(compactions[i%2][0] * 1024 * 1024)) + } if db.compTimeMeter != nil { - db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) + db.compTimeMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1000 * 1000 * 1000)) } if db.compReadMeter != nil { - db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) + db.compReadMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) } if db.compWriteMeter != nil { - db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) + db.compWriteMeter.Mark(int64((compactions[i%2][3] - compactions[(i-1)%2][3]) * 1024 * 1024)) } - // Retrieve the write delay statistic writedelay, err := db.db.GetProperty("leveldb.writedelay") if err != nil { diff --git a/metrics/cpu.go b/metrics/cpu.go new file mode 100644 index 000000000..3278d8161 --- /dev/null +++ b/metrics/cpu.go @@ -0,0 +1,36 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package metrics + +import "github.com/elastic/gosigar" + +// CPUStats is the system and process CPU stats. +type CPUStats struct { + GlobalTime int64 // Time spent by the CPU working on all processes + GlobalWait int64 // Time spent by waiting on disk for all processes + LocalTime int64 // Time spent by the CPU working on this process +} + +// ReadCPUStats retrieves the current CPU stats. +func ReadCPUStats(stats *CPUStats) { + global := gosigar.Cpu{} + global.Get() + + stats.GlobalTime = int64(global.User + global.Nice + global.Sys) + stats.GlobalWait = int64(global.Wait) + stats.LocalTime = getProcessCPUTime() +} diff --git a/metrics/cpu_syscall.go b/metrics/cpu_syscall.go new file mode 100644 index 000000000..e245453e8 --- /dev/null +++ b/metrics/cpu_syscall.go @@ -0,0 +1,35 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build !windows + +package metrics + +import ( + "syscall" + + "github.com/ethereum/go-ethereum/log" +) + +// getProcessCPUTime retrieves the process' CPU time since program startup. +func getProcessCPUTime() int64 { + var usage syscall.Rusage + if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil { + log.Warn("Failed to retrieve CPU time", "err", err) + return 0 + } + return int64(usage.Utime.Sec+usage.Stime.Sec)*100 + int64(usage.Utime.Usec+usage.Stime.Usec)/10000 //nolint:unconvert +} diff --git a/metrics/cpu_windows.go b/metrics/cpu_windows.go new file mode 100644 index 000000000..fb29a52a8 --- /dev/null +++ b/metrics/cpu_windows.go @@ -0,0 +1,23 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package metrics + +// getProcessCPUTime returns 0 on Windows as there is no system call to resolve +// the actual process' CPU time. +func getProcessCPUTime() int64 { + return 0 +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 8ae7aec43..98e8ced25 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -61,18 +61,27 @@ func CollectProcessMetrics(refresh time.Duration) { if !Enabled { return } + refreshFreq := int64(refresh / time.Second) + // Create the various data collectors + cpuStats := make([]*CPUStats, 2) memstats := make([]*runtime.MemStats, 2) diskstats := make([]*DiskStats, 2) for i := 0; i < len(memstats); i++ { + cpuStats[i] = new(CPUStats) memstats[i] = new(runtime.MemStats) diskstats[i] = new(DiskStats) } // Define the various metrics to collect + cpuSysLoad := GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry) + cpuSysWait := GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry) + cpuProcLoad := GetOrRegisterGauge("system/cpu/procload", DefaultRegistry) + + memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry) memAllocs := GetOrRegisterMeter("system/memory/allocs", DefaultRegistry) memFrees := GetOrRegisterMeter("system/memory/frees", DefaultRegistry) - memInuse := GetOrRegisterMeter("system/memory/inuse", DefaultRegistry) - memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry) + memHeld := GetOrRegisterGauge("system/memory/held", DefaultRegistry) + memUsed := GetOrRegisterGauge("system/memory/used", DefaultRegistry) var diskReads, diskReadBytes, diskWrites, diskWriteBytes Meter var diskReadBytesCounter, diskWriteBytesCounter Counter @@ -91,11 +100,17 @@ func CollectProcessMetrics(refresh time.Duration) { location1 := i % 2 location2 := (i - 1) % 2 + ReadCPUStats(cpuStats[location1]) + cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq) + cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq) + cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq) + runtime.ReadMemStats(memstats[location1]) + memPauses.Mark(int64(memstats[location1].PauseTotalNs - memstats[location2].PauseTotalNs)) memAllocs.Mark(int64(memstats[location1].Mallocs - memstats[location2].Mallocs)) memFrees.Mark(int64(memstats[location1].Frees - memstats[location2].Frees)) - memInuse.Mark(int64(memstats[location1].Alloc - memstats[location2].Alloc)) - memPauses.Mark(int64(memstats[location1].PauseTotalNs - memstats[location2].PauseTotalNs)) + memHeld.Update(int64(memstats[location1].HeapSys - memstats[location1].HeapReleased)) + memUsed.Update(int64(memstats[location1].Alloc)) if ReadDiskStats(diskstats[location1]) == nil { diskReads.Mark(diskstats[location1].ReadCount - diskstats[location2].ReadCount) diff --git a/p2p/metrics.go b/p2p/metrics.go index 2b1ad6df4..c04e5ab4c 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -25,18 +25,17 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p/enode" ) const ( - MetricsInboundConnects = "p2p/InboundConnects" // Name for the registered inbound connects meter - MetricsInboundTraffic = "p2p/InboundTraffic" // Name for the registered inbound traffic meter - MetricsOutboundConnects = "p2p/OutboundConnects" // Name for the registered outbound connects meter - MetricsOutboundTraffic = "p2p/OutboundTraffic" // Name for the registered outbound traffic meter + MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter + MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter + MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter + MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter MeteredPeerLimit = 1024 // This amount of peers are individually metered ) @@ -46,6 +45,7 @@ var ( ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic + activePeerCounter = metrics.NewRegisteredCounter("p2p/peers", nil) // Gauge tracking the current peer count PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress @@ -124,6 +124,8 @@ func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { } else { egressConnectMeter.Mark(1) } + activePeerCounter.Inc(1) + return &meteredConn{ Conn: conn, ip: ip, @@ -198,6 +200,7 @@ func (c *meteredConn) Close() error { IP: c.ip, Elapsed: time.Since(c.connected), }) + activePeerCounter.Dec(1) return err } id := c.id @@ -209,6 +212,7 @@ func (c *meteredConn) Close() error { IP: c.ip, ID: id, }) + activePeerCounter.Dec(1) return err } ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()) @@ -229,5 +233,6 @@ func (c *meteredConn) Close() error { Ingress: ingress, Egress: egress, }) + activePeerCounter.Dec(1) return err } diff --git a/p2p/server.go b/p2p/server.go index f17ef2c2b..b3494cc88 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -685,9 +685,8 @@ running: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - if _, ok := trusted[n.ID()]; ok { - delete(trusted, n.ID()) - } + delete(trusted, n.ID()) + // Unmark any already-connected peer as trusted if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, false) -- cgit v1.2.3