aboutsummaryrefslogtreecommitdiffstats
path: root/core/rawdb/freezer_reinit.go
blob: ea4dd33d1d6bcaf4cd267c6bb69d580c3e380325 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
    "errors"
    "runtime"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/prque"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/log"
)

// InitDatabaseFromFreezer rebuilds the index entries of a database out of the
// blocks already stored in its freezer. Every frozen block is read back by
// number, and for each of them the hash->number mapping and the transaction
// lookup entries are written through a batch. Finally the canonical hash of the
// last frozen block is recorded as both the head header and head fast block.
//
// If the freezer cannot be queried the underlying error is returned; if it is
// empty the call is a no-op returning nil. A block that cannot be read from the
// freezer aborts the run with a "broken ancient database" error, and a failed
// batch write is returned to the caller as is.
func InitDatabaseFromFreezer(db ethdb.Database) error {
    // Nothing to do if the freezer is inaccessible or holds no items
    frozen, err := db.Ancients()
    if err != nil {
        return err
    }
    if frozen == 0 {
        return nil
    }
    // Spin up one reader per CPU; each claims block numbers off a shared atomic
    // cursor, pre-computes the hashes and hands the block to the aggregator.
    threads := runtime.NumCPU()

    var (
        cursor  = ^uint64(0) // wraps around to 0 on the first increment
        results = make(chan *types.Block, 4*threads)
        abort   = make(chan struct{})
    )
    // Closing abort on any return path releases readers blocked on delivery
    defer close(abort)

    reader := func() {
        for n := atomic.AddUint64(&cursor, 1); n < frozen; n = atomic.AddUint64(&cursor, 1) {
            // The freezer is addressed by number, so an empty hash is enough.
            // Calling Hash on the block and its transactions here moves the
            // hashing work off the single aggregator loop.
            blk := ReadBlock(db, common.Hash{}, n)
            if blk != nil {
                blk.Hash()
                for _, tx := range blk.Transactions() {
                    tx.Hash()
                }
            }
            // Deliver the result (nil included, so the aggregator can notice
            // the failure), or quit if the aggregator has already returned.
            select {
            case results <- blk:
            case <-abort:
                return
            }
        }
    }
    for t := 0; t < threads; t++ {
        go reader()
    }
    // Results arrive in arbitrary order; park them in a priority queue and only
    // index the ones that extend the contiguous range processed so far.
    var (
        pending = prque.New(nil)
        next    = int64(0) // number of the block expected to be indexed next

        batch  = db.NewBatch()
        start  = time.Now()
        logged time.Time // zero value, so the first indexed block is reported
    )
    for received := uint64(0); received < frozen; received++ {
        // A nil result means a reader failed to load its block
        incoming := <-results
        if incoming == nil {
            return errors.New("broken ancient database")
        }
        // Blocks are queued under their negated number, hence the block we are
        // waiting for shows up with priority -next.
        pending.Push(incoming, -int64(incoming.NumberU64()))

        for !pending.Empty() {
            // Stop draining as soon as the head of the queue leaves a gap
            if _, prio := pending.Peek(); prio != -next {
                break
            }
            current := pending.PopItem().(*types.Block)
            next++

            // Store the hash<->number mapping and the txlookup indexes
            WriteHeaderNumber(batch, current.Hash(), current.NumberU64())
            WriteTxLookupEntries(batch, current)

            // Flush when the batch has grown large enough, and unconditionally
            // once the very last frozen block has been added.
            oversized := batch.ValueSize() > ethdb.IdealBatchSize
            finished := uint64(next) == frozen
            if oversized || finished {
                if err := batch.Write(); err != nil {
                    return err
                }
                batch.Reset()
            }
            // Emit a progress report at most once every 8 seconds
            if time.Since(logged) > 8*time.Second {
                log.Info("Initializing chain from ancient data", "number", current.Number(), "hash", current.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
                logged = time.Now()
            }
        }
    }
    // Point both head markers at the last frozen block
    last := frozen - 1
    head := ReadCanonicalHash(db, last)
    WriteHeadHeaderHash(db, head)
    WriteHeadFastBlockHash(db, head)

    log.Info("Initialized chain from ancient data", "number", last, "hash", head, "elapsed", common.PrettyDuration(time.Since(start)))
    return nil
}