diff options
author | Péter Szilágyi <peterke@gmail.com> | 2018-11-20 20:15:26 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-11-21 19:19:56 +0800 |
commit | 333b5fb123a497efa1a9ef54a437b24dfb3936cc (patch) | |
tree | fd53b4b4cede442cce77f1e24d191c40bdb08fa0 | |
parent | 493903eedecaae3d14966cd99aa84d146ea0ce13 (diff) | |
download | go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.tar go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.tar.gz go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.tar.bz2 go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.tar.lz go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.tar.xz go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.tar.zst go-tangerine-333b5fb123a497efa1a9ef54a437b24dfb3936cc.zip |
core: polish side chain importer a bit
-rw-r--r-- | core/blockchain.go | 471 | ||||
-rw-r--r-- | core/blockchain_insert.go | 143 | ||||
-rw-r--r-- | core/blockchain_test.go | 8 |
3 files changed, 328 insertions, 294 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 74ac30e70..d78946791 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1036,99 +1036,28 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. return status, nil } -// InsertChain attempts to insert the given batch of blocks in to the canonical -// chain or, otherwise, create a fork. If an error is returned it will return -// the index number of the failing block as well an error describing what went -// wrong. -// -// After insertion is done, all accumulated events will be fired. -func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { - n, events, logs, err := bc.insertChain(chain) - bc.PostChainEvents(events, logs) - return n, err -} - -// addFutureBlock checks if the block is within the max allowed window to get accepted for future processing, and -// returns an error if the block is too far ahead and was not added. +// addFutureBlock checks if the block is within the max allowed window to get +// accepted for future processing, and returns an error if the block is too far +// ahead and was not added. func (bc *BlockChain) addFutureBlock(block *types.Block) error { max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) if block.Time().Cmp(max) > 0 { - return fmt.Errorf("future block: %v > %v", block.Time(), max) + return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max) } bc.futureBlocks.Add(block.Hash(), block) return nil } -// importBatch is a helper function to assist during chain import -type importBatch struct { - chain types.Blocks - results <-chan error - index int - validator Validator -} - -// newBatch creates a new batch based on the given blocks, which are assumed to be a contiguous chain -func newBatch(chain types.Blocks, results <-chan error, validator Validator) *importBatch { - return &importBatch{ - chain: chain, - results: results, - index: -1, - validator: validator, - } -} - -// next returns the next block in the batch, along with any potential validation error for that block -// When the end is reached, it will return (nil, nil), but Current() will always return the last element. -func (batch *importBatch) next() (*types.Block, error) { - if batch.index+1 >= len(batch.chain) { - return nil, nil - } - batch.index++ - if err := <-batch.results; err != nil { - return batch.chain[batch.index], err - } - return batch.chain[batch.index], batch.validator.ValidateBody(batch.chain[batch.index]) -} - -// current returns the current block that's being processed. Even after the next() has progressed the entire -// chain, current will always return the last element -func (batch *importBatch) current() *types.Block { - if batch.index < 0 { - return nil - } - return batch.chain[batch.index] -} - -// previous returns the previous block was being processed, or nil -func (batch *importBatch) previous() *types.Block { - if batch.index < 1 { - return nil - } - return batch.chain[batch.index-1] -} - -// first returns the first block in the batch -func (batch *importBatch) first() *types.Block { - return batch.chain[0] -} - -// remaining returns the number of remaining blocks -func (batch *importBatch) remaining() int { - return len(batch.chain) - batch.index -} - -// processed returns the number of processed blocks -func (batch *importBatch) processed() int { - return batch.index + 1 -} - -// insertChain will execute the actual chain insertion and event aggregation. The -// only reason this method exists as a separate one is to make locking cleaner -// with deferred statements. -func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) { +// InsertChain attempts to insert the given batch of blocks in to the canonical +// chain or, otherwise, create a fork. If an error is returned it will return +// the index number of the failing block as well an error describing what went +// wrong. +// +// After insertion is done, all accumulated events will be fired. +func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // Sanity check that we have something meaningful to import if len(chain) == 0 { - return 0, nil, nil, nil + return 0, nil } // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { @@ -1137,29 +1066,34 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) - return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) } } - log.Info("insertChain", "from", chain[0].NumberU64(), "to", chain[len(chain)-1].NumberU64()) - // Pre-checks passed, start the full block imports bc.wg.Add(1) - defer bc.wg.Done() bc.chainmu.Lock() - defer bc.chainmu.Unlock() - return bc.insertChainInternal(chain, true) -} + n, events, logs, err := bc.insertChain(chain, true) + bc.chainmu.Unlock() + bc.wg.Done() -//insertChainInternal is the internal implementation of insertChain, which assumes that -// 1. chains are contiguous, and -// 2. The `chainMu` lock is held -// This method is split out so that import batches that require re-injecting historical blocks can do -// so without releasing the lock, which could lead to racey behaviour. If a sidechain import is in progress, -// and the historic state is imported, but then new canon-head is added before the actual sidechain completes, -// then the historic state could be pruned again -func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { + bc.PostChainEvents(events, logs) + return n, err +} +// insertChain is the internal implementation of insertChain, which assumes that +// 1) chains are contiguous, and 2) The chain mutex is held. +// +// This method is split out so that import batches that require re-injecting +// historical blocks can do so without releasing the lock, which could lead to +// racey behaviour. If a sidechain import is in progress, and the historic state +// is imported, but then new canon-head is added before the actual sidechain +// completes, then the historic state could be pruned again +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { + // If the chain is terminating, don't even bother starting u + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + return 0, nil, nil, nil + } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) @@ -1171,8 +1105,6 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) events = make([]interface{}, 0, len(chain)) lastCanon *types.Block coalescedLogs []*types.Log - block *types.Block - err error ) // Start the parallel header verifier headers := make([]*types.Header, len(chain)) @@ -1185,52 +1117,51 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) abort, results := bc.engine.VerifyHeaders(bc, headers, seals) defer close(abort) - // Peek the error for the first block - batch := newBatch(chain, results, bc.Validator()) - if block, err = batch.next(); err != nil { - if err == consensus.ErrPrunedAncestor { - return bc.insertSidechainInternal(batch, err) - } else if err == consensus.ErrFutureBlock || - (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(batch.first().ParentHash())) { - - // The first block is a future block - // We can shove that one and any child blocks (that fail because of UnknownAncestor) into the future-queue - for block != nil && (batch.index == 0 || err == consensus.ErrUnknownAncestor) { - block := batch.current() - if futureError := bc.addFutureBlock(block); futureError != nil { - return batch.index, events, coalescedLogs, futureError - } - block, err = batch.next() - } - stats.queued += batch.processed() - stats.ignored += batch.remaining() - - // If there are any still remaining, mark as ignored - return batch.index, events, coalescedLogs, err - } else if err == ErrKnownBlock { - - // Block and state both already known -- there can be two explanations. - // 1. We did a roll-back, and should now do a re-import - // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot - // from the canonical chain, which has not been verified. - - // Skip all known blocks that are blocks behind us - currentNum := bc.CurrentBlock().NumberU64() - for block != nil && err == ErrKnownBlock && currentNum >= block.NumberU64() { - // We ignore these - stats.ignored++ - block, err = batch.next() + // Peek the error for the first block to decide the directing import logic + it := newInsertIterator(chain, results, bc.Validator()) + + block, err := it.next() + switch { + // First block is pruned, insert as sidechain and reorg only if TD grows enough + case err == consensus.ErrPrunedAncestor: + return bc.insertSidechain(it) + + // First block is future, shove it (and all children) to the future queue (unknown ancestor) + case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())): + for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err } - // Falls through to the block import - } else { - // Some other error - stats.ignored += len(batch.chain) - bc.reportBlock(block, nil, err) - return batch.index, events, coalescedLogs, err + block, err = it.next() } + stats.queued += it.processed() + stats.ignored += it.remaining() + + // If there are any still remaining, mark as ignored + return it.index, events, coalescedLogs, err + + // First block (and state) is known + // 1. We did a roll-back, and should now do a re-import + // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot + // from the canonical chain, which has not been verified. + case err == ErrKnownBlock: + // Skip all known blocks that behind us + current := bc.CurrentBlock().NumberU64() + + for block != nil && err == ErrKnownBlock && current >= block.NumberU64() { + stats.ignored++ + block, err = it.next() + } + // Falls through to the block import + + // Some other error occurred, abort + case err != nil: + stats.ignored += len(it.chain) + bc.reportBlock(block, nil, err) + return it.index, events, coalescedLogs, err } - // No validation errors - for ; block != nil && err == nil; block, err = batch.next() { + // No validation errors for the first block (or chain prefix skipped) + for ; block != nil && err == nil; block, err = it.next() { // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") @@ -1239,43 +1170,43 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return batch.index, events, coalescedLogs, ErrBlacklistedHash + return it.index, events, coalescedLogs, ErrBlacklistedHash } - bstart := time.Now() - var parent *types.Block - parent = batch.previous() + // Retrieve the parent block and it's state to execute on top + start := time.Now() + + parent := it.previous() if parent == nil { parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) } - state, err := state.New(parent.Root(), bc.stateCache) if err != nil { - return batch.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Process block using the parent state as reference point. receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - return batch.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Validate the state using the default validator if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) - return batch.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } - proctime := time.Since(bstart) + proctime := time.Since(start) // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { - return batch.index, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } switch status { case CanonStatTy: log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), - "elapsed", common.PrettyDuration(time.Since(bstart)), - "root", block.Root().String()) + "elapsed", common.PrettyDuration(time.Since(start)), + "root", block.Root()) coalescedLogs = append(coalescedLogs, logs...) events = append(events, ChainEvent{block, block.Hash(), logs}) @@ -1286,192 +1217,152 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) case SideStatTy: log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), - "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), + "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), - "root", block.Root().String()) + "root", block.Root()) events = append(events, ChainSideEvent{block}) } - blockInsertTimer.UpdateSince(bstart) + blockInsertTimer.UpdateSince(start) stats.processed++ stats.usedGas += usedGas cache, _ := bc.stateCache.TrieDB().Size() - stats.report(chain, batch.index, cache) + stats.report(chain, it.index, cache) } - - // Any blocks remaining here? If so, the only ones we need to care about are - // shoving future blocks into queue + // Any blocks remaining here? The only ones we care about are the future ones if block != nil && err == consensus.ErrFutureBlock { - if futureErr := bc.addFutureBlock(block); futureErr != nil { - return batch.index, events, coalescedLogs, futureErr + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err } - for block, err = batch.next(); block != nil && err == consensus.ErrUnknownAncestor; { - if futureErr := bc.addFutureBlock(block); futureErr != nil { - return batch.index, events, coalescedLogs, futureErr + block, err = it.next() + + for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err } stats.queued++ - block, err = batch.next() } } - stats.ignored += batch.remaining() + stats.ignored += it.remaining() + // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } - return 0, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } -// insertSidechainInternal should be called when an import batch hits upon a pruned ancestor error, which happens when -// an sidechain with a sufficiently old fork-block is found. It writes all (header-and-body-valid) blocks to disk, then -// tries to switch over to the new chain if the TD exceeded the current chain. -// It assumes that relevant locks are held already (hence 'Internal') -func (bc *BlockChain) insertSidechainInternal(batch *importBatch, err error) (int, []interface{}, []*types.Log, error) { - // If we're given a chain of blocks, and the first one is pruned, that means we're getting a - // sidechain imported. On the sidechain, we validate headers, but do not validate body and state - // (and actually import them) until the sidechain reaches a higher TD. - // Until then, we store them in the database (assuming that the header PoW check works out) +// insertSidechain is called when an import batch hits upon a pruned ancestor +// error, which happens when a sidechain with a sufficiently old fork-block is +// found. +// +// The method writes all (header-and-body-valid) blocks to disk, then tries to +// switch over to the new chain if the TD exceeded the current chain. +func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, []*types.Log, error) { var ( - externTd *big.Int - canonHeadNumber = bc.CurrentBlock().NumberU64() - events = make([]interface{}, 0) - coalescedLogs []*types.Log + externTd *big.Int + current = bc.CurrentBlock().NumberU64() ) - // The first sidechain block error is already verified to be ErrPrunedAncestor. Since we don't import - // them here, we expect ErrUnknownAncestor for the remaining ones. Any other errors means that - // the block is invalid, and should not be written to disk. - block := batch.current() - for block != nil && (err == consensus.ErrPrunedAncestor) { - // Check the canonical stateroot for that number - if remoteNum := block.NumberU64(); canonHeadNumber >= remoteNum { - canonBlock := bc.GetBlockByNumber(remoteNum) - if canonBlock != nil && canonBlock.Root() == block.Root() { - // This is most likely a shadow-state attack. - // When a fork is imported into the database, and it eventually reaches a block height which is - // not pruned, we just found that the state already exist! This means that the sidechain block + // The first sidechain block error is already verified to be ErrPrunedAncestor. + // Since we don't import them here, we expect ErrUnknownAncestor for the remaining + // ones. Any other errors means that the block is invalid, and should not be written + // to disk. + block, err := it.current(), consensus.ErrPrunedAncestor + for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() { + // Check the canonical state root for that number + if number := block.NumberU64(); current >= number { + if canonical := bc.GetBlockByNumber(number); canonical != nil && canonical.Root() == block.Root() { + // This is most likely a shadow-state attack. When a fork is imported into the + // database, and it eventually reaches a block height which is not pruned, we + // just found that the state already exist! This means that the sidechain block // refers to a state which already exists in our canon chain. - // If left unchecked, we would now proceed importing the blocks, without actually having verified - // the state of the previous blocks. - log.Warn("Sidechain ghost-state attack detected", "blocknum", block.NumberU64(), - "sidechain root", block.Root(), "canon root", canonBlock.Root()) + // + // If left unchecked, we would now proceed importing the blocks, without actually + // having verified the state of the previous blocks. + log.Warn("Sidechain ghost-state attack detected", "number", block.NumberU64(), "sideroot", block.Root(), "canonroot", canonical.Root()) + // If someone legitimately side-mines blocks, they would still be imported as usual. However, // we cannot risk writing unverified blocks to disk when they obviously target the pruning // mechanism. - return batch.index, events, coalescedLogs, fmt.Errorf("sidechain ghost-state attack detected") + return it.index, nil, nil, errors.New("sidechain ghost-state attack") } } if externTd == nil { externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) } externTd = new(big.Int).Add(externTd, block.Difficulty()) + if !bc.HasBlock(block.Hash(), block.NumberU64()) { + start := time.Now() if err := bc.WriteBlockWithoutState(block, externTd); err != nil { - return batch.index, events, coalescedLogs, err + return it.index, nil, nil, err } + log.Debug("Inserted sidechain block", "number", block.Number(), "hash", block.Hash(), + "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), + "root", block.Root()) } - block, err = batch.next() } - // At this point, we've written all sidechain blocks to database. Loop ended either on some other error, - // or all were processed. If there was some other error, we can ignore the rest of those blocks. + // At this point, we've written all sidechain blocks to database. Loop ended + // either on some other error or all were processed. If there was some other + // error, we can ignore the rest of those blocks. // // If the externTd was larger than our local TD, we now need to reimport the previous // blocks to regenerate the required state - currentBlock := bc.CurrentBlock() - localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - // don't process until the competitor TD goes above the canonical TD + localTd := bc.GetTd(bc.CurrentBlock().Hash(), current) if localTd.Cmp(externTd) > 0 { - // If we have hit a sidechain, we may have to reimport pruned blocks - log.Info("Sidechain stored", "start", batch.first().NumberU64(), "end", batch.current().NumberU64(), "sidechain TD", externTd, "local TD", localTd) - return batch.index, events, coalescedLogs, err - } - // Competitor chain beat canonical. Before we reprocess to get the common ancestor, investigate if - // any blocks in the chain are 'known bad' blocks. - for index, b := range batch.chain { - if bc.badBlocks.Contains(b.Hash()) { - log.Info("Sidechain import aborted, bad block found", "index", index, "hash", b.Hash()) - return index, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", b.NumberU64(), b.Hash()) - } + log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().NumberU64(), "sidetd", externTd, "localtd", localTd) + return it.index, nil, nil, err } - // gather all blocks from the common ancestor - var parents []*types.Block - // Import all the pruned blocks to make the state available - parent := bc.GetBlock(batch.first().ParentHash(), batch.first().NumberU64()-1) - for !bc.HasState(parent.Root()) { - if bc.badBlocks.Contains(parent.Hash()) { - log.Info("Sidechain parent processing aborted, bad block found", "number", parent.NumberU64(), "hash", parent.Hash()) - return 0, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", parent.NumberU64(), parent.Hash()) - } - parents = append(parents, parent) - parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + // Gather all the sidechain hashes (full blocks may be memory heavy) + var ( + hashes []common.Hash + numbers []uint64 + ) + parent := bc.GetHeader(it.previous().Hash(), it.previous().NumberU64()) + for parent != nil && !bc.HasState(parent.Root) { + hashes = append(hashes, parent.Hash()) + numbers = append(numbers, parent.Number.Uint64()) + + parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) } - for j := 0; j < len(parents)/2; j++ { - parents[j], parents[len(parents)-1-j] = parents[len(parents)-1-j], parents[j] + if parent == nil { + return it.index, nil, nil, errors.New("missing parent") } // Import all the pruned blocks to make the state available - // During re-import, we can disable PoW-verification, since these are already verified - log.Info("Inserting parent blocks for reprocessing", "first", parents[0].NumberU64(), "count", len(parents), "last", parents[len(parents)-1].NumberU64) - _, evs, logs, err := bc.insertChainInternal(parents, false) - events, coalescedLogs = evs, logs - if err != nil { - return 0, events, coalescedLogs, err - } - log.Info("Inserting sidechain blocks for processing") - errindex, events, coalescedLogs, err := bc.insertChainInternal(batch.chain[0:batch.index], false) - return errindex, events, coalescedLogs, err -} - -// insertStats tracks and reports on block insertion. -type insertStats struct { - queued, processed, ignored int - usedGas uint64 - lastIndex int - startTime mclock.AbsTime -} - -// statsReportLimit is the time limit during import and export after which we -// always print out progress. This avoids the user wondering what's going on. -const statsReportLimit = 8 * time.Second - -// report prints statistics if some number of blocks have been processed -// or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { - // Fetch the timings for the batch var ( - now = mclock.Now() - elapsed = time.Duration(now) - time.Duration(st.startTime) + blocks []*types.Block + memory common.StorageSize ) - // If we're at the last block of the batch or report period reached, log - if index == len(chain)-1 || elapsed >= statsReportLimit { - var ( - end = chain[index] - txs = countTransactions(chain[st.lastIndex : index+1]) - ) - context := []interface{}{ - "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, - "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), - "number", end.Number(), "hash", end.Hash(), - } - if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute { - context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) - } - context = append(context, []interface{}{"cache", cache}...) + for i := len(hashes) - 1; i >= 0; i-- { + // Append the next block to our batch + block := bc.GetBlock(hashes[i], numbers[i]) - if st.queued > 0 { - context = append(context, []interface{}{"queued", st.queued}...) - } - if st.ignored > 0 { - context = append(context, []interface{}{"ignored", st.ignored}...) - } - log.Info("Imported new chain segment", context...) + blocks = append(blocks, block) + memory += block.Size() + + // If memory use grew too large, import and continue. Sadly we need to discard + // all raised events and logs from notifications since we're too heavy on the + // memory here. + if len(blocks) >= 2048 || memory > 64*1024*1024 { + log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) + if _, _, _, err := bc.insertChain(blocks, false); err != nil { + return 0, nil, nil, err + } + blocks, memory = blocks[:0], 0 - *st = insertStats{startTime: now, lastIndex: index + 1} + // If the chain is terminating, stop processing blocks + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + return 0, nil, nil, nil + } + } } -} - -func countTransactions(chain []*types.Block) (c int) { - for _, b := range chain { - c += len(b.Transactions()) + if len(blocks) > 0 { + log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) + return bc.insertChain(blocks, false) } - return c + return 0, nil, nil, nil } // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go new file mode 100644 index 000000000..70bea3544 --- /dev/null +++ b/core/blockchain_insert.go @@ -0,0 +1,143 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package core + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// insertStats tracks and reports on block insertion. +type insertStats struct { + queued, processed, ignored int + usedGas uint64 + lastIndex int + startTime mclock.AbsTime +} + +// statsReportLimit is the time limit during import and export after which we +// always print out progress. This avoids the user wondering what's going on. +const statsReportLimit = 8 * time.Second + +// report prints statistics if some number of blocks have been processed +// or more than a few seconds have passed since the last message. +func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { + // Fetch the timings for the batch + var ( + now = mclock.Now() + elapsed = time.Duration(now) - time.Duration(st.startTime) + ) + // If we're at the last block of the batch or report period reached, log + if index == len(chain)-1 || elapsed >= statsReportLimit { + // Count the number of transactions in this segment + var txs int + for _, block := range chain[st.lastIndex : index+1] { + txs += len(block.Transactions()) + } + end := chain[index] + + // Assemble the log context and send it to the logger + context := []interface{}{ + "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, + "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), + "number", end.Number(), "hash", end.Hash(), + } + if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute { + context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) + } + context = append(context, []interface{}{"cache", cache}...) + + if st.queued > 0 { + context = append(context, []interface{}{"queued", st.queued}...) + } + if st.ignored > 0 { + context = append(context, []interface{}{"ignored", st.ignored}...) + } + log.Info("Imported new chain segment", context...) + + // Bump the stats reported to the next section + *st = insertStats{startTime: now, lastIndex: index + 1} + } +} + +// insertIterator is a helper to assist during chain import. +type insertIterator struct { + chain types.Blocks + results <-chan error + index int + validator Validator +} + +// newInsertIterator creates a new iterator based on the given blocks, which are +// assumed to be a contiguous chain. +func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator { + return &insertIterator{ + chain: chain, + results: results, + index: -1, + validator: validator, + } +} + +// next returns the next block in the iterator, along with any potential validation +// error for that block. When the end is reached, it will return (nil, nil). +func (it *insertIterator) next() (*types.Block, error) { + if it.index+1 >= len(it.chain) { + it.index = len(it.chain) + return nil, nil + } + it.index++ + if err := <-it.results; err != nil { + return it.chain[it.index], err + } + return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) +} + +// current returns the current block that's being processed. +func (it *insertIterator) current() *types.Block { + if it.index < 0 || it.index+1 >= len(it.chain) { + return nil + } + return it.chain[it.index] +} + +// previous returns the previous block was being processed, or nil +func (it *insertIterator) previous() *types.Block { + if it.index < 1 { + return nil + } + return it.chain[it.index-1] +} + +// first returns the first block in the it. +func (it *insertIterator) first() *types.Block { + return it.chain[0] +} + +// remaining returns the number of remaining blocks. +func (it *insertIterator) remaining() int { + return len(it.chain) - it.index +} + +// processed returns the number of processed blocks. +func (it *insertIterator) processed() int { + return it.index + 1 +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index aef810050..5ab29e205 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -579,11 +579,11 @@ func testInsertNonceError(t *testing.T, full bool) { blockchain.hc.engine = blockchain.engine failRes, err = blockchain.InsertHeaderChain(headers, 1) } - // Check that the returned error indicates the failure. + // Check that the returned error indicates the failure if failRes != failAt { - t.Errorf("test %d: failure index mismatch: have %d, want %d", i, failRes, failAt) + t.Errorf("test %d: failure (%v) index mismatch: have %d, want %d", i, err, failRes, failAt) } - // Check that all no blocks after the failing block have been inserted. + // Check that all blocks after the failing block have been inserted for j := 0; j < i-failAt; j++ { if full { if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil { @@ -1345,7 +1345,7 @@ func TestLargeReorgTrieGC(t *testing.T) { t.Fatalf("failed to insert shared chain: %v", err) } if _, err := chain.InsertChain(original); err != nil { - t.Fatalf("failed to insert shared chain: %v", err) + t.Fatalf("failed to insert original chain: %v", err) } // Ensure that the state associated with the forking point is pruned away if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil { |