diff options
author | Péter Szilágyi <peterke@gmail.com> | 2018-11-22 21:01:10 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-22 21:01:10 +0800 |
commit | 3ba0418a9a433da4c8360116866873d8793b03da (patch) | |
tree | 4769e07c50c6606ea20d41a943d020499c49aa5c /core | |
parent | e0d091e0909fc94958a33608ae4dad0825482813 (diff) | |
parent | 333b5fb123a497efa1a9ef54a437b24dfb3936cc (diff) | |
download | go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.tar go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.tar.gz go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.tar.bz2 go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.tar.lz go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.tar.xz go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.tar.zst go-tangerine-3ba0418a9a433da4c8360116866873d8793b03da.zip |
Merge pull request #17973 from holiman/splitter2
core: better side-chain importing
Diffstat (limited to 'core')
-rw-r--r-- | core/blockchain.go | 374 | ||||
-rw-r--r-- | core/blockchain_insert.go | 143 | ||||
-rw-r--r-- | core/blockchain_test.go | 8 |
3 files changed, 368 insertions, 157 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 4de7f007b..3a9caf01e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1036,6 +1036,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. return status, nil } +// addFutureBlock checks if the block is within the max allowed window to get +// accepted for future processing, and returns an error if the block is too far +// ahead and was not added. +func (bc *BlockChain) addFutureBlock(block *types.Block) error { + max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) + if block.Time().Cmp(max) > 0 { + return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max) + } + bc.futureBlocks.Add(block.Hash(), block) + return nil +} + // InsertChain attempts to insert the given batch of blocks in to the canonical // chain or, otherwise, create a fork. If an error is returned it will return // the index number of the failing block as well an error describing what went @@ -1043,18 +1055,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // // After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { - n, events, logs, err := bc.insertChain(chain) - bc.PostChainEvents(events, logs) - return n, err -} - -// insertChain will execute the actual chain insertion and event aggregation. The -// only reason this method exists as a separate one is to make locking cleaner -// with deferred statements. -func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) { // Sanity check that we have something meaningful to import if len(chain) == 0 { - return 0, nil, nil, nil + return 0, nil } // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { @@ -1063,16 +1066,36 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) - return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) } } // Pre-checks passed, start the full block imports bc.wg.Add(1) - defer bc.wg.Done() - bc.chainmu.Lock() - defer bc.chainmu.Unlock() + n, events, logs, err := bc.insertChain(chain, true) + bc.chainmu.Unlock() + bc.wg.Done() + + bc.PostChainEvents(events, logs) + return n, err +} + +// insertChain is the internal implementation of insertChain, which assumes that +// 1) chains are contiguous, and 2) The chain mutex is held. +// +// This method is split out so that import batches that require re-injecting +// historical blocks can do so without releasing the lock, which could lead to +// racey behaviour. If a sidechain import is in progress, and the historic state +// is imported, but then new canon-head is added before the actual sidechain +// completes, then the historic state could be pruned again +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { + // If the chain is terminating, don't even bother starting u + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + return 0, nil, nil, nil + } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) + senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex @@ -1089,16 +1112,56 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty for i, block := range chain { headers[i] = block.Header() - seals[i] = true + seals[i] = verifySeals } abort, results := bc.engine.VerifyHeaders(bc, headers, seals) defer close(abort) - // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) - senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + // Peek the error for the first block to decide the directing import logic + it := newInsertIterator(chain, results, bc.Validator()) - // Iterate over the blocks and insert when the verifier permits - for i, block := range chain { + block, err := it.next() + switch { + // First block is pruned, insert as sidechain and reorg only if TD grows enough + case err == consensus.ErrPrunedAncestor: + return bc.insertSidechain(it) + + // First block is future, shove it (and all children) to the future queue (unknown ancestor) + case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())): + for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err + } + block, err = it.next() + } + stats.queued += it.processed() + stats.ignored += it.remaining() + + // If there are any still remaining, mark as ignored + return it.index, events, coalescedLogs, err + + // First block (and state) is known + // 1. We did a roll-back, and should now do a re-import + // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot + // from the canonical chain, which has not been verified. + case err == ErrKnownBlock: + // Skip all known blocks that behind us + current := bc.CurrentBlock().NumberU64() + + for block != nil && err == ErrKnownBlock && current >= block.NumberU64() { + stats.ignored++ + block, err = it.next() + } + // Falls through to the block import + + // Some other error occurred, abort + case err != nil: + stats.ignored += len(it.chain) + bc.reportBlock(block, nil, err) + return it.index, events, coalescedLogs, err + } + // No validation errors for the first block (or chain prefix skipped) + for ; block != nil && err == nil; block, err = it.next() { // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") @@ -1107,115 +1170,45 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return i, events, coalescedLogs, ErrBlacklistedHash + return it.index, events, coalescedLogs, ErrBlacklistedHash } - // Wait for the block's verification to complete - bstart := time.Now() - - err := <-results - if err == nil { - err = bc.Validator().ValidateBody(block) - } - switch { - case err == ErrKnownBlock: - // Block and state both already known. However if the current block is below - // this number we did a rollback and we should reimport it nonetheless. - if bc.CurrentBlock().NumberU64() >= block.NumberU64() { - stats.ignored++ - continue - } - - case err == consensus.ErrFutureBlock: - // Allow up to MaxFuture second in the future blocks. If this limit is exceeded - // the chain is discarded and processed at a later time if given. - max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time().Cmp(max) > 0 { - return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) - } - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue - - case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue - - case err == consensus.ErrPrunedAncestor: - // Block competing with the canonical chain, store in the db, but don't process - // until the competitor TD goes above the canonical TD - currentBlock := bc.CurrentBlock() - localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) - if localTd.Cmp(externTd) > 0 { - if err = bc.WriteBlockWithoutState(block, externTd); err != nil { - return i, events, coalescedLogs, err - } - continue - } - // Competitor chain beat canonical, gather all blocks from the common ancestor - var winner []*types.Block + // Retrieve the parent block and it's state to execute on top + start := time.Now() - parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - for !bc.HasState(parent.Root()) { - winner = append(winner, parent) - parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) - } - for j := 0; j < len(winner)/2; j++ { - winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] - } - // Import all the pruned blocks to make the state available - bc.chainmu.Unlock() - _, evs, logs, err := bc.insertChain(winner) - bc.chainmu.Lock() - events, coalescedLogs = evs, logs - - if err != nil { - return i, events, coalescedLogs, err - } - - case err != nil: - bc.reportBlock(block, nil, err) - return i, events, coalescedLogs, err - } - // Create a new statedb using the parent block and report an - // error if it fails. - var parent *types.Block - if i == 0 { + parent := it.previous() + if parent == nil { parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - } else { - parent = chain[i-1] } state, err := state.New(parent.Root(), bc.stateCache) if err != nil { - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Process block using the parent state as reference point. receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Validate the state using the default validator - err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) - if err != nil { + if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } - proctime := time.Since(bstart) + proctime := time.Since(start) // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } switch status { case CanonStatTy: - log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), - "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), + "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), + "elapsed", common.PrettyDuration(time.Since(start)), + "root", block.Root()) coalescedLogs = append(coalescedLogs, logs...) - blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block @@ -1223,78 +1216,153 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty bc.gcproc += proctime case SideStatTy: - log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", - common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) - - blockInsertTimer.UpdateSince(bstart) + log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), + "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), + "root", block.Root()) events = append(events, ChainSideEvent{block}) } + blockInsertTimer.UpdateSince(start) stats.processed++ stats.usedGas += usedGas cache, _ := bc.stateCache.TrieDB().Size() - stats.report(chain, i, cache) + stats.report(chain, it.index, cache) + } + // Any blocks remaining here? The only ones we care about are the future ones + if block != nil && err == consensus.ErrFutureBlock { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err + } + block, err = it.next() + + for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err + } + stats.queued++ + } } + stats.ignored += it.remaining() + // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } - return 0, events, coalescedLogs, nil -} - -// insertStats tracks and reports on block insertion. -type insertStats struct { - queued, processed, ignored int - usedGas uint64 - lastIndex int - startTime mclock.AbsTime + return it.index, events, coalescedLogs, err } -// statsReportLimit is the time limit during import and export after which we -// always print out progress. This avoids the user wondering what's going on. -const statsReportLimit = 8 * time.Second - -// report prints statistics if some number of blocks have been processed -// or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { - // Fetch the timings for the batch +// insertSidechain is called when an import batch hits upon a pruned ancestor +// error, which happens when a sidechain with a sufficiently old fork-block is +// found. +// +// The method writes all (header-and-body-valid) blocks to disk, then tries to +// switch over to the new chain if the TD exceeded the current chain. +func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, []*types.Log, error) { var ( - now = mclock.Now() - elapsed = time.Duration(now) - time.Duration(st.startTime) + externTd *big.Int + current = bc.CurrentBlock().NumberU64() ) - // If we're at the last block of the batch or report period reached, log - if index == len(chain)-1 || elapsed >= statsReportLimit { - var ( - end = chain[index] - txs = countTransactions(chain[st.lastIndex : index+1]) - ) - context := []interface{}{ - "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, - "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), - "number", end.Number(), "hash", end.Hash(), + // The first sidechain block error is already verified to be ErrPrunedAncestor. + // Since we don't import them here, we expect ErrUnknownAncestor for the remaining + // ones. Any other errors means that the block is invalid, and should not be written + // to disk. + block, err := it.current(), consensus.ErrPrunedAncestor + for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() { + // Check the canonical state root for that number + if number := block.NumberU64(); current >= number { + if canonical := bc.GetBlockByNumber(number); canonical != nil && canonical.Root() == block.Root() { + // This is most likely a shadow-state attack. When a fork is imported into the + // database, and it eventually reaches a block height which is not pruned, we + // just found that the state already exist! This means that the sidechain block + // refers to a state which already exists in our canon chain. + // + // If left unchecked, we would now proceed importing the blocks, without actually + // having verified the state of the previous blocks. + log.Warn("Sidechain ghost-state attack detected", "number", block.NumberU64(), "sideroot", block.Root(), "canonroot", canonical.Root()) + + // If someone legitimately side-mines blocks, they would still be imported as usual. However, + // we cannot risk writing unverified blocks to disk when they obviously target the pruning + // mechanism. + return it.index, nil, nil, errors.New("sidechain ghost-state attack") + } } - if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute { - context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) + if externTd == nil { + externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) } - context = append(context, []interface{}{"cache", cache}...) + externTd = new(big.Int).Add(externTd, block.Difficulty()) - if st.queued > 0 { - context = append(context, []interface{}{"queued", st.queued}...) - } - if st.ignored > 0 { - context = append(context, []interface{}{"ignored", st.ignored}...) + if !bc.HasBlock(block.Hash(), block.NumberU64()) { + start := time.Now() + if err := bc.WriteBlockWithoutState(block, externTd); err != nil { + return it.index, nil, nil, err + } + log.Debug("Inserted sidechain block", "number", block.Number(), "hash", block.Hash(), + "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), + "root", block.Root()) } - log.Info("Imported new chain segment", context...) + } + // At this point, we've written all sidechain blocks to database. Loop ended + // either on some other error or all were processed. If there was some other + // error, we can ignore the rest of those blocks. + // + // If the externTd was larger than our local TD, we now need to reimport the previous + // blocks to regenerate the required state + localTd := bc.GetTd(bc.CurrentBlock().Hash(), current) + if localTd.Cmp(externTd) > 0 { + log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().NumberU64(), "sidetd", externTd, "localtd", localTd) + return it.index, nil, nil, err + } + // Gather all the sidechain hashes (full blocks may be memory heavy) + var ( + hashes []common.Hash + numbers []uint64 + ) + parent := bc.GetHeader(it.previous().Hash(), it.previous().NumberU64()) + for parent != nil && !bc.HasState(parent.Root) { + hashes = append(hashes, parent.Hash()) + numbers = append(numbers, parent.Number.Uint64()) - *st = insertStats{startTime: now, lastIndex: index + 1} + parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) } -} + if parent == nil { + return it.index, nil, nil, errors.New("missing parent") + } + // Import all the pruned blocks to make the state available + var ( + blocks []*types.Block + memory common.StorageSize + ) + for i := len(hashes) - 1; i >= 0; i-- { + // Append the next block to our batch + block := bc.GetBlock(hashes[i], numbers[i]) + + blocks = append(blocks, block) + memory += block.Size() + + // If memory use grew too large, import and continue. Sadly we need to discard + // all raised events and logs from notifications since we're too heavy on the + // memory here. + if len(blocks) >= 2048 || memory > 64*1024*1024 { + log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) + if _, _, _, err := bc.insertChain(blocks, false); err != nil { + return 0, nil, nil, err + } + blocks, memory = blocks[:0], 0 -func countTransactions(chain []*types.Block) (c int) { - for _, b := range chain { - c += len(b.Transactions()) + // If the chain is terminating, stop processing blocks + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + return 0, nil, nil, nil + } + } + } + if len(blocks) > 0 { + log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) + return bc.insertChain(blocks, false) } - return c + return 0, nil, nil, nil } // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go new file mode 100644 index 000000000..70bea3544 --- /dev/null +++ b/core/blockchain_insert.go @@ -0,0 +1,143 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package core + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// insertStats tracks and reports on block insertion. +type insertStats struct { + queued, processed, ignored int + usedGas uint64 + lastIndex int + startTime mclock.AbsTime +} + +// statsReportLimit is the time limit during import and export after which we +// always print out progress. This avoids the user wondering what's going on. +const statsReportLimit = 8 * time.Second + +// report prints statistics if some number of blocks have been processed +// or more than a few seconds have passed since the last message. +func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { + // Fetch the timings for the batch + var ( + now = mclock.Now() + elapsed = time.Duration(now) - time.Duration(st.startTime) + ) + // If we're at the last block of the batch or report period reached, log + if index == len(chain)-1 || elapsed >= statsReportLimit { + // Count the number of transactions in this segment + var txs int + for _, block := range chain[st.lastIndex : index+1] { + txs += len(block.Transactions()) + } + end := chain[index] + + // Assemble the log context and send it to the logger + context := []interface{}{ + "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, + "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), + "number", end.Number(), "hash", end.Hash(), + } + if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute { + context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) + } + context = append(context, []interface{}{"cache", cache}...) + + if st.queued > 0 { + context = append(context, []interface{}{"queued", st.queued}...) + } + if st.ignored > 0 { + context = append(context, []interface{}{"ignored", st.ignored}...) + } + log.Info("Imported new chain segment", context...) + + // Bump the stats reported to the next section + *st = insertStats{startTime: now, lastIndex: index + 1} + } +} + +// insertIterator is a helper to assist during chain import. +type insertIterator struct { + chain types.Blocks + results <-chan error + index int + validator Validator +} + +// newInsertIterator creates a new iterator based on the given blocks, which are +// assumed to be a contiguous chain. +func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator { + return &insertIterator{ + chain: chain, + results: results, + index: -1, + validator: validator, + } +} + +// next returns the next block in the iterator, along with any potential validation +// error for that block. When the end is reached, it will return (nil, nil). +func (it *insertIterator) next() (*types.Block, error) { + if it.index+1 >= len(it.chain) { + it.index = len(it.chain) + return nil, nil + } + it.index++ + if err := <-it.results; err != nil { + return it.chain[it.index], err + } + return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) +} + +// current returns the current block that's being processed. +func (it *insertIterator) current() *types.Block { + if it.index < 0 || it.index+1 >= len(it.chain) { + return nil + } + return it.chain[it.index] +} + +// previous returns the previous block was being processed, or nil +func (it *insertIterator) previous() *types.Block { + if it.index < 1 { + return nil + } + return it.chain[it.index-1] +} + +// first returns the first block in the it. +func (it *insertIterator) first() *types.Block { + return it.chain[0] +} + +// remaining returns the number of remaining blocks. +func (it *insertIterator) remaining() int { + return len(it.chain) - it.index +} + +// processed returns the number of processed blocks. +func (it *insertIterator) processed() int { + return it.index + 1 +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index aef810050..5ab29e205 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -579,11 +579,11 @@ func testInsertNonceError(t *testing.T, full bool) { blockchain.hc.engine = blockchain.engine failRes, err = blockchain.InsertHeaderChain(headers, 1) } - // Check that the returned error indicates the failure. + // Check that the returned error indicates the failure if failRes != failAt { - t.Errorf("test %d: failure index mismatch: have %d, want %d", i, failRes, failAt) + t.Errorf("test %d: failure (%v) index mismatch: have %d, want %d", i, err, failRes, failAt) } - // Check that all no blocks after the failing block have been inserted. + // Check that all blocks after the failing block have been inserted for j := 0; j < i-failAt; j++ { if full { if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil { @@ -1345,7 +1345,7 @@ func TestLargeReorgTrieGC(t *testing.T) { t.Fatalf("failed to insert shared chain: %v", err) } if _, err := chain.InsertChain(original); err != nil { - t.Fatalf("failed to insert shared chain: %v", err) + t.Fatalf("failed to insert original chain: %v", err) } // Ensure that the state associated with the forking point is pruned away if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil { |