-rwxr-xr-xcommon/prque/prque.go25
-rwxr-xr-xcommon/prque/sstack.go18
-rw-r--r--core/blockchain.go43
-rw-r--r--core/rawdb/accessors_indexes.go3
-rw-r--r--core/rawdb/freezer_reinit.go127
5 files changed, 171 insertions, 45 deletions
diff --git a/common/prque/prque.go b/common/prque/prque.go
index 9fd31a2e5..3cc5a1ada 100755
--- a/common/prque/prque.go
+++ b/common/prque/prque.go
@@ -1,5 +1,20 @@
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
// This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque".
+// Package prque implements a priority queue data structure supporting arbitrary
+// value types and int64 priorities.
+//
+// If you would like to use a min-priority queue, simply negate the priorities.
+//
+// Internally the queue is based on the standard heap package working on a
+// sortable version of the block based stack.
package prque
import (
@@ -11,8 +26,8 @@ type Prque struct {
cont *sstack
}
-// Creates a new priority queue.
-func New(setIndex setIndexCallback) *Prque {
+// New creates a new priority queue.
+func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex)}
}
@@ -21,6 +36,12 @@ func (p *Prque) Push(data interface{}, priority int64) {
heap.Push(p.cont, &item{data, priority})
}
+// Peek returns the value with the greatest priority but does not pop it off.
+func (p *Prque) Peek() (interface{}, int64) {
+ item := p.cont.blocks[0][0]
+ return item.value, item.priority
+}
+
// Pops the value with the greatest priority off the stack and returns it.
// Currently no shrinking is done.
func (p *Prque) Pop() (interface{}, int64) {
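For context, the new Peek makes it possible to inspect the highest-priority entry without popping it, which is exactly what the freezer reinitialization further down relies on to detect gaps in the block sequence. A minimal, self-contained sketch of that usage (the item names and values are illustrative only; negating priorities for min-ordering is the queue's documented convention):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	q := prque.New(nil) // nil callback: we never delete non-top elements
	for _, n := range []int64{3, 1, 2} {
		q.Push(fmt.Sprintf("item-%d", n), -n) // negate priorities for min-ordering
	}
	next := int64(1)
	for !q.Empty() {
		// Peek lets us check the head without disturbing the heap.
		if _, prio := q.Peek(); -prio != next {
			break // the next item in sequence has not arrived yet
		}
		value, _ := q.Pop()
		fmt.Println(value) // item-1, item-2, item-3
		next++
	}
}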
diff --git a/common/prque/sstack.go b/common/prque/sstack.go
index 4875dae99..8518af54f 100755
--- a/common/prque/sstack.go
+++ b/common/prque/sstack.go
@@ -1,3 +1,11 @@
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
// This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque".
package prque
@@ -14,16 +22,16 @@ type item struct {
priority int64
}
-// setIndexCallback is called when the element is moved to a new index.
-// Providing setIndexCallback is optional, it is needed only if the application needs
+// SetIndexCallback is called when the element is moved to a new index.
+// Providing SetIndexCallback is optional; it is needed only if the application needs
// to delete elements other than the top one.
-type setIndexCallback func(a interface{}, i int)
+type SetIndexCallback func(data interface{}, index int)
// Internal sortable stack data structure. Implements the Push and Pop ops for
// the stack (heap) functionality and the Len, Less and Swap methods for the
// sortability requirements of the heaps.
type sstack struct {
- setIndex setIndexCallback
+ setIndex SetIndexCallback
size int
capacity int
offset int
@@ -33,7 +41,7 @@ type sstack struct {
}
// Creates a new, empty stack.
-func newSstack(setIndex setIndexCallback) *sstack {
+func newSstack(setIndex SetIndexCallback) *sstack {
result := new(sstack)
result.setIndex = setIndex
result.active = make([]*item, blockSize)
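Exporting SetIndexCallback means packages outside prque can now supply their own index tracker when constructing a queue. A hedged sketch of what such a callback might look like (the trackedItem type is hypothetical, not part of this change); keeping the heap position on the element is what makes it possible to later target entries other than the top one:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

// trackedItem is a hypothetical payload that remembers its current heap slot.
type trackedItem struct {
	name  string
	index int
}

func main() {
	// The callback fires whenever the heap moves an element to a new index.
	setIndex := func(data interface{}, index int) {
		data.(*trackedItem).index = index
	}
	q := prque.New(setIndex)
	a, b := &trackedItem{name: "a"}, &trackedItem{name: "b"}
	q.Push(a, 1)
	q.Push(b, 2)
	fmt.Println(a.name, a.index) // a's slot after b displaced it from the root
	fmt.Println(b.name, b.index)
}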
diff --git a/core/blockchain.go b/core/blockchain.go
index 60707281c..711959692 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -220,47 +220,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
// Initialize the chain with ancient data if it isn't empty.
if bc.empty() {
- if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
- var (
- start = time.Now()
- logged time.Time
- )
- for i := uint64(0); i < frozen; i++ {
- // Inject hash<->number mapping.
- hash := rawdb.ReadCanonicalHash(bc.db, i)
- if hash == (common.Hash{}) {
- return nil, errors.New("broken ancient database")
- }
- rawdb.WriteHeaderNumber(bc.db, hash, i)
-
- // Inject txlookup indexes.
- block := rawdb.ReadBlock(bc.db, hash, i)
- if block == nil {
- return nil, errors.New("broken ancient database")
- }
- rawdb.WriteTxLookupEntries(bc.db, block)
-
- // If we've spent too much time already, notify the user of what we're doing
- if time.Since(logged) > 8*time.Second {
- log.Info("Initializing chain from ancient data", "number", i, "hash", hash, "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
- logged = time.Now()
- }
- }
- hash := rawdb.ReadCanonicalHash(bc.db, frozen-1)
- rawdb.WriteHeadHeaderHash(bc.db, hash)
- rawdb.WriteHeadFastBlockHash(bc.db, hash)
-
- // The first thing the node will do is reconstruct the verification data for
- // the head block (ethash cache or clique voting snapshot). Might as well do
- // it in advance.
- bc.engine.VerifyHeader(bc, rawdb.ReadHeader(bc.db, hash, frozen-1), true)
-
- log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
- }
+ rawdb.InitDatabaseFromFreezer(bc.db)
}
if err := bc.loadLastState(); err != nil {
return nil, err
}
+ // The first thing the node will do is reconstruct the verification data for
+ // the head block (ethash cache or clique voting snapshot). Might as well do
+ // it in advance.
+ bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
+
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
needRewind bool
diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go
index ed1f1bca6..38f8fe10e 100644
--- a/core/rawdb/accessors_indexes.go
+++ b/core/rawdb/accessors_indexes.go
@@ -55,8 +55,9 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
// WriteTxLookupEntries stores positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) {
+ number := block.Number().Bytes()
for _, tx := range block.Transactions() {
- if err := db.Put(txLookupKey(tx.Hash()), block.Number().Bytes()); err != nil {
+ if err := db.Put(txLookupKey(tx.Hash()), number); err != nil {
log.Crit("Failed to store transaction lookup entry", "err", err)
}
}
diff --git a/core/rawdb/freezer_reinit.go b/core/rawdb/freezer_reinit.go
new file mode 100644
index 000000000..ea4dd33d1
--- /dev/null
+++ b/core/rawdb/freezer_reinit.go
@@ -0,0 +1,127 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "errors"
+ "runtime"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
+// of frozen ancient blocks. The method iterates over all the frozen blocks and
+// injects into the database the block hash->number mappings and the transaction
+// lookup entries.
+func InitDatabaseFromFreezer(db ethdb.Database) error {
+ // If we can't access the freezer or it's empty, abort
+ frozen, err := db.Ancients()
+ if err != nil || frozen == 0 {
+ return err
+ }
+ // Blocks previously frozen, iterate over- and hash them concurrently
+ var (
+ number = ^uint64(0) // -1
+ results = make(chan *types.Block, 4*runtime.NumCPU())
+ )
+ abort := make(chan struct{})
+ defer close(abort)
+
+ for i := 0; i < runtime.NumCPU(); i++ {
+ go func() {
+ for {
+ // Fetch the next task number, terminating if everything's done
+ n := atomic.AddUint64(&number, 1)
+ if n >= frozen {
+ return
+ }
+ // Retrieve the block from the freezer (no need for the hash, we pull by
+ // number from the freezer). If successful, pre-cache the block hash and
+ // the individual transaction hashes for storing into the database.
+ block := ReadBlock(db, common.Hash{}, n)
+ if block != nil {
+ block.Hash()
+ for _, tx := range block.Transactions() {
+ tx.Hash()
+ }
+ }
+ // Feed the block to the aggregator, or abort on interrupt
+ select {
+ case results <- block:
+ case <-abort:
+ return
+ }
+ }
+ }()
+ }
+ // Reassemble the blocks into a contiguous stream and push them out to disk
+ var (
+ queue = prque.New(nil)
+ next = int64(0)
+
+ batch = db.NewBatch()
+ start = time.Now()
+ logged time.Time
+ )
+ for i := uint64(0); i < frozen; i++ {
+ // Retrieve the next result and bail if it's nil
+ block := <-results
+ if block == nil {
+ return errors.New("broken ancient database")
+ }
+ // Push the block into the import queue and process contiguous ranges
+ queue.Push(block, -int64(block.NumberU64()))
+ for !queue.Empty() {
+ // If the next available item is gapped, stop draining and wait for the missing block
+ if _, priority := queue.Peek(); -priority != next {
+ break
+ }
+ // Next block available, pop it off and index it
+ block = queue.PopItem().(*types.Block)
+ next++
+
+ // Inject hash<->number mapping and txlookup indexes
+ WriteHeaderNumber(batch, block.Hash(), block.NumberU64())
+ WriteTxLookupEntries(batch, block)
+
+ // If enough data was accumulated in memory or we're at the last block, dump to disk
+ if batch.ValueSize() > ethdb.IdealBatchSize || uint64(next) == frozen {
+ if err := batch.Write(); err != nil {
+ return err
+ }
+ batch.Reset()
+ }
+ // If we've spent too much time already, notify the user of what we're doing
+ if time.Since(logged) > 8*time.Second {
+ log.Info("Initializing chain from ancient data", "number", block.Number(), "hash", block.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
+ logged = time.Now()
+ }
+ }
+ }
+ hash := ReadCanonicalHash(db, frozen-1)
+ WriteHeadHeaderHash(db, hash)
+ WriteHeadFastBlockHash(db, hash)
+
+ log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
+ return nil
+}
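Stripped to its essentials, the initializer combines an atomic task counter for work distribution with the Peek-based reordering to emit results in block order. A standalone sketch of that pattern, with plain numbers standing in for blocks (purely illustrative, not part of the change):

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	const total = uint64(16)
	var (
		number  = ^uint64(0) // -1; each worker atomically claims the next task
		results = make(chan uint64, 4*runtime.NumCPU())
	)
	for i := 0; i < runtime.NumCPU(); i++ {
		go func() {
			for {
				n := atomic.AddUint64(&number, 1)
				if n >= total {
					return
				}
				// Real workers would fetch and hash block n here; completion
				// order is arbitrary, which is why reordering is needed below.
				results <- n
			}
		}()
	}
	// Reassemble the out-of-order results into a contiguous stream.
	queue, next := prque.New(nil), int64(0)
	for i := uint64(0); i < total; i++ {
		n := <-results
		queue.Push(n, -int64(n))
		for !queue.Empty() {
			if _, prio := queue.Peek(); -prio != next {
				break // the next number in sequence has not arrived yet
			}
			fmt.Println(queue.PopItem().(uint64)) // 0, 1, 2, ... in order
			next++
		}
	}
}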