Diffstat (limited to 'core')
-rw-r--r--  core/blockchain.go                 20
-rw-r--r--  core/headerchain.go                 5
-rw-r--r--  core/rawdb/freezer.go               7
-rw-r--r--  core/rawdb/freezer_table.go        44
-rw-r--r--  core/rawdb/freezer_table_test.go   68
-rw-r--r--  core/tx_list.go                     4
-rw-r--r--  core/tx_pool.go                   151
7 files changed, 204 insertions, 95 deletions
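
The thrust of the patch is a move between metric types: chain heads become gauges, drop/discard events become meters, and pool/size totals become counters. As a quick reference, here is a minimal sketch of the three types against the go-ethereum metrics package (names taken from the diff, values made up; note that geth's metrics are no-ops unless metrics collection is enabled at startup):

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/metrics"
    )

    func main() {
        // Gauge: an instantaneous value that can move both ways -- fits
        // head block/header numbers.
        head := metrics.NewRegisteredGauge("chain/head/block", nil)
        head.Update(1234567)

        // Counter: a running total with Inc/Dec -- fits pool sizes and the
        // freezer's combined data size.
        pending := metrics.NewRegisteredCounter("txpool/pending", nil)
        pending.Inc(3)
        pending.Dec(1)

        // Meter: counts events and derives 1/5/15-minute rates -- fits
        // discard/drop events that matter as a rate, not a total.
        discard := metrics.NewRegisteredMeter("txpool/pending/discard", nil)
        discard.Mark(1)

        fmt.Println(head.Value(), pending.Count(), discard.Count())
    }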
diff --git a/core/blockchain.go b/core/blockchain.go
index 2355f0ea3..a1f3b68c5 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -46,6 +46,10 @@ import (
)
var (
+ headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
+ headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
+ headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
+
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
@@ -332,6 +336,7 @@ func (bc *BlockChain) loadLastState() error {
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
+ headBlockGauge.Update(int64(currentBlock.NumberU64()))
// Restore the last known head header
currentHeader := currentBlock.Header()
@@ -344,12 +349,14 @@ func (bc *BlockChain) loadLastState() error {
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
+ headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
+
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
+ headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
-
// Issue a status log for the user
currentFastBlock := bc.CurrentFastBlock()
@@ -388,6 +395,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
bc.currentBlock.Store(newHeadBlock)
+ headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
// Rewind the fast block in a simple way to the target head
@@ -399,6 +407,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
bc.currentFastBlock.Store(newHeadFastBlock)
+ headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
}
@@ -450,6 +459,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// If all checks out, manually set the head block
bc.chainmu.Lock()
bc.currentBlock.Store(block)
+ headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
@@ -522,9 +532,12 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
+ headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
+
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
+ headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
@@ -598,6 +611,7 @@ func (bc *BlockChain) insert(block *types.Block) {
rawdb.WriteHeadBlockHash(bc.db, block.Hash())
bc.currentBlock.Store(block)
+ headBlockGauge.Update(int64(block.NumberU64()))
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
@@ -605,6 +619,7 @@ func (bc *BlockChain) insert(block *types.Block) {
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
bc.currentFastBlock.Store(block)
+ headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
@@ -862,11 +877,13 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
bc.currentFastBlock.Store(newFastBlock)
+ headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
bc.currentBlock.Store(newBlock)
+ headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
// Truncate ancient data which exceeds the current header.
@@ -952,6 +969,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
+ headFastBlockGauge.Update(int64(head.NumberU64()))
isCanonical = true
}
}
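
Every blockchain.go hunk above is the same two-line pairing: a store into currentBlock or currentFastBlock is mirrored at once into the matching gauge, so the chain/head/* metrics never drift from the actual heads. A hypothetical helper makes the pattern explicit (the diff repeats the pair inline at each mutation site instead):

    // Hypothetical; not part of the diff. Shown only to name the pattern.
    func (bc *BlockChain) storeHeadBlock(block *types.Block) {
        bc.currentBlock.Store(block)
        headBlockGauge.Update(int64(block.NumberU64()))
    }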
diff --git a/core/headerchain.go b/core/headerchain.go
index cdd64bb50..034858f65 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -104,6 +104,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
}
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
+ headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
return hc, nil
}
@@ -185,12 +186,12 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
+ headHeaderGauge.Update(header.Number.Int64())
status = CanonStatTy
} else {
status = SideStatTy
}
-
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
@@ -456,6 +457,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
+ headHeaderGauge.Update(head.Number.Int64())
}
type (
@@ -508,6 +510,7 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
+ headHeaderGauge.Update(parent.Number.Int64())
}
batch.Write()
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go
index 741ff9adb..3f377447c 100644
--- a/core/rawdb/freezer.go
+++ b/core/rawdb/freezer.go
@@ -80,8 +80,9 @@ type freezer struct {
func newFreezer(datadir string, namespace string) (*freezer, error) {
// Create the initial freezer object
var (
- readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
- writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
+ readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
+ writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
+ sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil)
)
// Ensure the datadir is not a symbolic link if it exists.
if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
@@ -102,7 +103,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
instanceLock: lock,
}
for name, disableSnappy := range freezerNoSnappy {
- table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy)
+ table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy)
if err != nil {
for _, table := range freezer.tables {
table.Close()
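
A single sizeCounter is created once in newFreezer and passed to every table, so Inc/Dec calls from all tables aggregate into one ancient/size total. A standalone sketch of that aggregation, using an unregistered counter:

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/metrics"
    )

    func main() {
        sc := metrics.NewCounter() // shared by all tables in the real code

        sc.Inc(1000) // table A opens with 1000 bytes on disk
        sc.Inc(120)  // table A appends a 120-byte item
        sc.Inc(500)  // table B opens with 500 bytes
        sc.Dec(200)  // table B truncates 200 bytes away

        fmt.Println(sc.Count()) // 1420: combined size across both tables
    }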
diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go
index 1e5c7cd0b..2fe354a06 100644
--- a/core/rawdb/freezer_table.go
+++ b/core/rawdb/freezer_table.go
@@ -94,17 +94,18 @@ type freezerTable struct {
// to count how many historic items have gone missing.
itemOffset uint32 // Offset (number of discarded items)
- headBytes uint32 // Number of bytes written to the head file
- readMeter metrics.Meter // Meter for measuring the effective amount of data read
- writeMeter metrics.Meter // Meter for measuring the effective amount of data written
+ headBytes uint32 // Number of bytes written to the head file
+ readMeter metrics.Meter // Meter for measuring the effective amount of data read
+ writeMeter metrics.Meter // Meter for measuring the effective amount of data written
+ sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables
logger log.Logger // Logger with database path and table name embedded
lock sync.RWMutex // Mutex protecting the data file descriptors
}
// newTable opens a freezer table with default settings - 2G files
-func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) {
- return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
+func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) {
+ return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy)
}
// openFreezerFileForAppend opens a freezer table file and seeks to the end
@@ -148,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error {
// newCustomTable opens a freezer table, creating the data and index files if they are
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
-func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
+func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
@@ -171,6 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
+ sizeCounter: sizeCounter,
name: name,
path: path,
logger: log.New("database", path, "table", name),
@@ -181,6 +183,14 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
tab.Close()
return nil, err
}
+ // Initialize the starting size counter
+ size, err := tab.sizeNolock()
+ if err != nil {
+ tab.Close()
+ return nil, err
+ }
+ tab.sizeCounter.Inc(int64(size))
+
return tab, nil
}
@@ -321,6 +331,11 @@ func (t *freezerTable) truncate(items uint64) error {
if atomic.LoadUint64(&t.items) <= items {
return nil
}
+ // We need to truncate, save the old size for metrics tracking
+ oldSize, err := t.sizeNolock()
+ if err != nil {
+ return err
+ }
// Something's out of sync, truncate the table's offset index
t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
@@ -355,6 +370,14 @@ func (t *freezerTable) truncate(items uint64) error {
// All data files truncated, set internal counters and return
atomic.StoreUint64(&t.items, items)
atomic.StoreUint32(&t.headBytes, expected.offset)
+
+ // Retrieve the new size and update the total size counter
+ newSize, err := t.sizeNolock()
+ if err != nil {
+ return err
+ }
+ t.sizeCounter.Dec(int64(oldSize - newSize))
+
return nil
}
@@ -483,7 +506,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
}
// Write indexEntry
t.index.Write(idx.marshallBinary())
+
t.writeMeter.Mark(int64(bLen + indexEntrySize))
+ t.sizeCounter.Inc(int64(bLen + indexEntrySize))
+
atomic.AddUint64(&t.items, 1)
return nil
}
@@ -562,6 +588,12 @@ func (t *freezerTable) size() (uint64, error) {
t.lock.RLock()
defer t.lock.RUnlock()
+ return t.sizeNolock()
+}
+
+// sizeNolock returns the total data size in the freezer table without obtaining
+// the mutex first.
+func (t *freezerTable) sizeNolock() (uint64, error) {
stat, err := t.index.Stat()
if err != nil {
return 0, err
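
The size/sizeNolock split is the usual Go convention for sharing logic with callers that already hold the mutex: truncate takes the write lock, so it must call the lock-free variant or it would deadlock against its own RWMutex. A generic sketch of the convention (not the geth code):

    package main

    import (
        "fmt"
        "sync"
    )

    type table struct {
        lock sync.RWMutex
        size uint64
    }

    // Size is the exported entry point; it acquires the lock itself.
    func (t *table) Size() uint64 {
        t.lock.RLock()
        defer t.lock.RUnlock()
        return t.sizeNolock()
    }

    // sizeNolock assumes the caller already holds the lock.
    func (t *table) sizeNolock() uint64 {
        return t.size
    }

    func (t *table) truncate(n uint64) {
        t.lock.Lock()
        defer t.lock.Unlock()
        old := t.sizeNolock() // calling Size() here would self-deadlock
        t.size = n
        fmt.Println("freed", old-n, "bytes")
    }

    func main() {
        t := &table{size: 100}
        t.truncate(40)
        fmt.Println(t.Size())
    }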
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
index e63fb63a3..116e26a7f 100644
--- a/core/rawdb/freezer_table_test.go
+++ b/core/rawdb/freezer_table_test.go
@@ -56,7 +56,7 @@ func TestFreezerBasics(t *testing.T) {
// set cutoff at 50 bytes
f, err := newCustomTable(os.TempDir(),
fmt.Sprintf("unittest-%d", rand.Uint64()),
- metrics.NewMeter(), metrics.NewMeter(), 50, true)
+ metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter(), 50, true)
if err != nil {
t.Fatal(err)
}
@@ -98,12 +98,12 @@ func TestFreezerBasicsClosing(t *testing.T) {
t.Parallel()
// set cutoff at 50 bytes
var (
- fname = fmt.Sprintf("basics-close-%d", rand.Uint64())
- m1, m2 = metrics.NewMeter(), metrics.NewMeter()
- f *freezerTable
- err error
+ fname = fmt.Sprintf("basics-close-%d", rand.Uint64())
+ rm, wm, sc = metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
+ f *freezerTable
+ err error
)
- f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+ f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -112,7 +112,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
data := getChunk(15, x)
f.Append(uint64(x), data)
f.Close()
- f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+ f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
}
defer f.Close()
@@ -126,7 +126,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
}
f.Close()
- f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
+ f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -136,11 +136,11 @@ func TestFreezerBasicsClosing(t *testing.T) {
// TestFreezerRepairDanglingHead tests that we can recover if index entries are removed
func TestFreezerRepairDanglingHead(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -169,7 +169,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
idxFile.Close()
// Now open it again
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
// The last item should be missing
if _, err = f.Retrieve(0xff); err == nil {
t.Errorf("Expected error for missing index entry")
@@ -184,11 +184,11 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed
func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
{ // Fill a table and close it
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -216,7 +216,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
idxFile.Close()
// Now open it again
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
// The first item should be there
if _, err = f.Retrieve(0); err != nil {
t.Fatal(err)
@@ -234,7 +234,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
}
// And if we open it, we should now be able to read all of them (new values)
{
- f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
for y := 1; y < 255; y++ {
exp := getChunk(15, ^y)
got, err := f.Retrieve(uint64(y))
@@ -251,11 +251,11 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
// TestSnappyDetection tests that we fail to open a snappy database and vice versa
func TestSnappyDetection(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
// Open with snappy
{
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -268,7 +268,7 @@ func TestSnappyDetection(t *testing.T) {
}
// Open without snappy
{
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, false)
if _, err = f.Retrieve(0); err == nil {
f.Close()
t.Fatalf("expected empty table")
@@ -277,7 +277,7 @@ func TestSnappyDetection(t *testing.T) {
// Open with snappy
{
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
// There should be 255 items
if _, err = f.Retrieve(0xfe); err != nil {
f.Close()
@@ -302,11 +302,11 @@ func assertFileSize(f string, size int64) error {
// the index is repaired
func TestFreezerRepairDanglingIndex(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())
{ // Fill a table and close it
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -342,7 +342,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
// 45, 45, 15
// with 3+3+1 items
{
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -359,11 +359,11 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
func TestFreezerTruncate(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("truncation-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -380,7 +380,7 @@ func TestFreezerTruncate(t *testing.T) {
}
// Reopen, truncate
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -402,10 +402,10 @@ func TestFreezerTruncate(t *testing.T) {
// That will rewind the index, and _should_ truncate the head file
func TestFreezerRepairFirstFile(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -433,7 +433,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
}
// Reopen
{
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -458,10 +458,10 @@ func TestFreezerRepairFirstFile(t *testing.T) {
// - check that we did not keep the rdonly file descriptors
func TestFreezerReadAndTruncate(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -478,7 +478,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
}
// Reopen and read all files
{
- f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
@@ -504,10 +504,10 @@ func TestFreezerReadAndTruncate(t *testing.T) {
func TestOffset(t *testing.T) {
t.Parallel()
- wm, rm := metrics.NewMeter(), metrics.NewMeter()
+ rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("offset-%d", rand.Uint64())
{ // Fill table
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true)
if err != nil {
t.Fatal(err)
}
@@ -563,7 +563,7 @@ func TestOffset(t *testing.T) {
}
// Now open again
{
- f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
+ f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true)
if err != nil {
t.Fatal(err)
}
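
Note that the tests build unregistered metrics (metrics.NewMeter, metrics.NewCounter) instead of the NewRegistered* constructors used by the production code, which keeps repeated and parallel test runs from colliding on names in the global registry. Sketch of the distinction (the metric name below is illustrative, not from the diff):

    // Unregistered: a private instance, fine for unit tests.
    rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()

    // Registered: additionally entered into metrics.DefaultRegistry under a
    // name, which is what the non-test code in this diff uses.
    readMeter := metrics.NewRegisteredMeter("example/ancient/read", nil)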
diff --git a/core/tx_list.go b/core/tx_list.go
index 57abc5148..75bfdaeda 100644
--- a/core/tx_list.go
+++ b/core/tx_list.go
@@ -418,9 +418,9 @@ func (l *txPricedList) Put(tx *types.Transaction) {
// Removed notifies the priced transaction list that an old transaction was
// dropped from the pool. The list will just keep a counter of stale objects and update
// the heap if a large enough ratio of transactions go stale.
-func (l *txPricedList) Removed() {
+func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
- l.stales++
+ l.stales += count
if l.stales <= len(*l.items)/4 {
return
}
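
Removed(count) lets callers report a whole batch of drops with a single stale-ratio check instead of one check per transaction. A toy reduction of the 25% threshold shows the effect:

    package main

    import "fmt"

    // pricedList reproduces just the stale-ratio bookkeeping from the diff.
    type pricedList struct {
        items  int // live entries in the price heap
        stales int
    }

    func (l *pricedList) Removed(count int) {
        l.stales += count
        if l.stales <= l.items/4 { // below 25% stale: defer the reheap
            return
        }
        fmt.Printf("reheap at %d stale of %d\n", l.stales, l.items)
        l.items -= l.stales
        l.stales = 0
    }

    func main() {
        l := &pricedList{items: 100}
        l.Removed(10) // no reheap yet: 10 <= 25
        l.Removed(20) // triggers: 30 > 25
    }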
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 411143aea..b16825332 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -85,20 +85,25 @@ var (
var (
// Metrics for the pending pool
- pendingDiscardCounter = metrics.NewRegisteredCounter("txpool/pending/discard", nil)
- pendingReplaceCounter = metrics.NewRegisteredCounter("txpool/pending/replace", nil)
- pendingRateLimitCounter = metrics.NewRegisteredCounter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
- pendingNofundsCounter = metrics.NewRegisteredCounter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
+ pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
+ pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
+ pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
+ pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
// Metrics for the queued pool
- queuedDiscardCounter = metrics.NewRegisteredCounter("txpool/queued/discard", nil)
- queuedReplaceCounter = metrics.NewRegisteredCounter("txpool/queued/replace", nil)
- queuedRateLimitCounter = metrics.NewRegisteredCounter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
- queuedNofundsCounter = metrics.NewRegisteredCounter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
+ queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
+ queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
+ queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
+ queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
// General tx metrics
- invalidTxCounter = metrics.NewRegisteredCounter("txpool/invalid", nil)
- underpricedTxCounter = metrics.NewRegisteredCounter("txpool/underpriced", nil)
+ validMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
+ invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
+ underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
+
+ pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
+ queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
+ localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
)
// TxStatus is the current status of a transaction as seen by the pool.
@@ -661,7 +666,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
- invalidTxCounter.Inc(1)
+ invalidTxMeter.Mark(1)
return false, err
}
// If the transaction pool is full, discard underpriced transactions
@@ -669,14 +674,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the new transaction is underpriced, don't accept it
if !local && pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
- underpricedTxCounter.Inc(1)
+ underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}
// New transaction is better than our worst ones, make room for it
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
- underpricedTxCounter.Inc(1)
+ underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
}
}
@@ -686,14 +691,14 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
- pendingDiscardCounter.Inc(1)
+ pendingDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
- pool.priced.Removed()
- pendingReplaceCounter.Inc(1)
+ pool.priced.Removed(1)
+ pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx)
pool.priced.Put(tx)
@@ -718,6 +723,9 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.locals.add(from)
}
}
+ if local || pool.locals.contains(from) {
+ localCounter.Inc(1)
+ }
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
@@ -736,14 +744,17 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
- queuedDiscardCounter.Inc(1)
+ queuedDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
- pool.priced.Removed()
- queuedReplaceCounter.Inc(1)
+ pool.priced.Removed(1)
+ queuedReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the queued counter
+ queuedCounter.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
@@ -779,17 +790,20 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
- pool.priced.Removed()
- pendingDiscardCounter.Inc(1)
+ pool.priced.Removed(1)
+ pendingDiscardMeter.Mark(1)
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
- pool.priced.Removed()
- pendingReplaceCounter.Inc(1)
+ pool.priced.Removed(1)
+ pendingReplaceMeter.Mark(1)
+ } else {
+ // Nothing was replaced, bump the pending counter
+ pendingCounter.Inc(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all.Get(hash) == nil {
@@ -844,6 +858,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
if err != nil {
return err
}
+ validMeter.Mark(1)
+
// If we added a new transaction, run promotion checks and return
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
@@ -878,6 +894,8 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
dirty[from] = struct{}{}
}
}
+ validMeter.Mark(int64(len(dirty)))
+
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
addrs := make([]common.Address, 0, len(dirty))
@@ -928,7 +946,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Remove it from the list of known transactions
pool.all.Remove(hash)
if outofbound {
- pool.priced.Removed()
+ pool.priced.Removed(1)
+ }
+ if pool.locals.contains(addr) {
+ localCounter.Dec(1)
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
@@ -946,12 +967,17 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
+ // Reduce the pending counter
+ pendingCounter.Dec(int64(1 + len(invalids)))
return
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
- future.Remove(tx)
+ if removed, _ := future.Remove(tx); removed {
+ // Reduce the queued counter
+ queuedCounter.Dec(1)
+ }
if future.Empty() {
delete(pool.queue, addr)
}
@@ -979,38 +1005,48 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
- for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
+ forwards := list.Forward(pool.currentState.GetNonce(addr))
+ for _, tx := range forwards {
hash := tx.Hash()
- log.Trace("Removed old queued transaction", "hash", hash)
pool.all.Remove(hash)
- pool.priced.Removed()
+ log.Trace("Removed old queued transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
- log.Trace("Removed unpayable queued transaction", "hash", hash)
pool.all.Remove(hash)
- pool.priced.Removed()
- queuedNofundsCounter.Inc(1)
+ log.Trace("Removed unpayable queued transaction", "hash", hash)
}
+ queuedNofundsMeter.Mark(int64(len(drops)))
+
// Gather all executable transactions and promote them
- for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
+ readies := list.Ready(pool.pendingState.GetNonce(addr))
+ for _, tx := range readies {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
+ queuedCounter.Dec(int64(len(readies)))
+
// Drop all transactions over the allowed limit
+ var caps types.Transactions
if !pool.locals.contains(addr) {
- for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
+ caps = list.Cap(int(pool.config.AccountQueue))
+ for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
- pool.priced.Removed()
- queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
+ queuedRateLimitMeter.Mark(int64(len(caps)))
+ }
+ // Mark all the items dropped as removed
+ pool.priced.Removed(len(forwards) + len(drops) + len(caps))
+ queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
@@ -1052,11 +1088,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
- for _, tx := range list.Cap(list.Len() - 1) {
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
- pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
@@ -1064,6 +1101,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(offenders[i]) {
+ localCounter.Dec(int64(len(caps)))
+ }
pending--
}
}
@@ -1074,11 +1116,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
- for _, tx := range list.Cap(list.Len() - 1) {
+
+ caps := list.Cap(list.Len() - 1)
+ for _, tx := range caps {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
- pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
@@ -1086,11 +1129,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
+ pool.priced.Removed(len(caps))
+ pendingCounter.Dec(int64(len(caps)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(caps)))
+ }
pending--
}
}
}
- pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
+ pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
@@ -1120,7 +1168,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
pool.removeTx(tx.Hash(), true)
}
drop -= size
- queuedRateLimitCounter.Inc(int64(size))
+ queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
@@ -1128,7 +1176,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
- queuedRateLimitCounter.Inc(1)
+ queuedRateLimitMeter.Mark(1)
}
}
}
@@ -1143,11 +1191,11 @@ func (pool *TxPool) demoteUnexecutables() {
nonce := pool.currentState.GetNonce(addr)
// Drop all transactions that are deemed too old (low nonce)
- for _, tx := range list.Forward(nonce) {
+ olds := list.Forward(nonce)
+ for _, tx := range olds {
hash := tx.Hash()
- log.Trace("Removed old pending transaction", "hash", hash)
pool.all.Remove(hash)
- pool.priced.Removed()
+ log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
@@ -1155,21 +1203,28 @@ func (pool *TxPool) demoteUnexecutables() {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
- pool.priced.Removed()
- pendingNofundsCounter.Inc(1)
}
+ pool.priced.Removed(len(olds) + len(drops))
+ pendingNofundsMeter.Mark(int64(len(drops)))
+
for _, tx := range invalids {
hash := tx.Hash()
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
+ pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+ if pool.locals.contains(addr) {
+ localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
+ }
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
- for _, tx := range list.Cap(0) {
+ gapped := list.Cap(0)
+ for _, tx := range gapped {
hash := tx.Hash()
log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
+ pendingCounter.Inc(int64(len(gapped)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
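
With pendingCounter and queuedCounter now adjusted at every add, promote, demote, and drop site above, the intended invariant is that these counters always match the live pool contents. A hypothetical test helper (not part of the diff) states it directly:

    // Hypothetical helper, not in the diff.
    func checkPendingCounter(pool *TxPool) bool {
        total := 0
        for _, list := range pool.pending {
            total += list.Len()
        }
        return pendingCounter.Count() == int64(total)
    }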