diff options
author | Sonic <sonic@dexon.org> | 2019-03-27 20:02:55 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-13 18:11:44 +0800 |
commit | c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8 (patch) | |
tree | aa9e20f32fa084fd9c5e2fbfcee295d5d63b1d48 /dex | |
parent | 7b8b4fcb0e8fd411bf523d06492e966e20e1b613 (diff) | |
download | go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.tar go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.tar.gz go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.tar.bz2 go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.tar.lz go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.tar.xz go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.tar.zst go-tangerine-c52c9e04a916fac3550b0a3c3d8cdf979ab70bb8.zip |
backport from v1.8.23 (#304)
* dex: backport f6193ad
* dex/downloader: backport accc0fa accc0fab 174083c3
* dex: backport 434dd5b
* dex: backport 42a914a 0983d02
* dex: backport 48b70ec 31b3334 and some modification
* dex/downloader: backport 5f251a6
* dex/downloader: backport 81c3dc7
* dex, dex/downloader: fix typos
Diffstat (limited to 'dex')
-rw-r--r-- | dex/api_backend.go | 12 | ||||
-rw-r--r-- | dex/api_tracer.go | 173 | ||||
-rw-r--r-- | dex/backend.go | 2 | ||||
-rw-r--r-- | dex/config.go | 6 | ||||
-rw-r--r-- | dex/downloader/downloader.go | 178 | ||||
-rw-r--r-- | dex/downloader/downloader_test.go | 103 | ||||
-rw-r--r-- | dex/downloader/queue.go | 2 | ||||
-rw-r--r-- | dex/downloader/statesync.go | 11 | ||||
-rw-r--r-- | dex/handler.go | 23 | ||||
-rw-r--r-- | dex/helper_test.go | 2 | ||||
-rw-r--r-- | dex/peer.go | 5 | ||||
-rw-r--r-- | dex/protocol.go | 3 |
12 files changed, 433 insertions, 87 deletions
diff --git a/dex/api_backend.go b/dex/api_backend.go index 60a874701..7333a9bb1 100644 --- a/dex/api_backend.go +++ b/dex/api_backend.go @@ -25,7 +25,6 @@ import ( "github.com/dexon-foundation/dexon/common/math" "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/bloombits" - "github.com/dexon-foundation/dexon/core/rawdb" "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/core/vm" @@ -92,18 +91,11 @@ func (b *DexAPIBackend) GetBlock(ctx context.Context, hash common.Hash) (*types. } func (b *DexAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { - if number := rawdb.ReadHeaderNumber(b.dex.chainDb, hash); number != nil { - return rawdb.ReadReceipts(b.dex.chainDb, hash, *number), nil - } - return nil, nil + return b.dex.blockchain.GetReceiptsByHash(hash), nil } func (b *DexAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - number := rawdb.ReadHeaderNumber(b.dex.chainDb, hash) - if number == nil { - return nil, nil - } - receipts := rawdb.ReadReceipts(b.dex.chainDb, hash, *number) + receipts := b.dex.blockchain.GetReceiptsByHash(hash) if receipts == nil { return nil, nil } diff --git a/dex/api_tracer.go b/dex/api_tracer.go index bb6acd764..d451c376d 100644 --- a/dex/api_tracer.go +++ b/dex/api_tracer.go @@ -17,11 +17,13 @@ package dex import ( + "bufio" "bytes" "context" "errors" "fmt" "io/ioutil" + "os" "runtime" "sync" "time" @@ -60,6 +62,13 @@ type TraceConfig struct { Reexec *uint64 } +// StdTraceConfig holds extra parameters to standard-json trace functions. +type StdTraceConfig struct { + *vm.LogConfig + Reexec *uint64 + TxHash common.Hash +} + // txTraceResult is the result of a single transaction trace. type txTraceResult struct { Result interface{} `json:"result,omitempty"` // Trace results produced by the tracer @@ -134,7 +143,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl // Ensure we have a valid starting state before doing any work origin := start.NumberU64() - database := state.NewDatabase(api.dex.ChainDb()) + database := state.NewDatabaseWithCache(api.dex.ChainDb(), 16) // Chain tracing will probably start at genesis if number := start.NumberU64(); number > 0 { start = api.dex.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1) @@ -360,7 +369,7 @@ func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.B func (api *PrivateDebugAPI) TraceBlockByHash(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) { block := api.dex.blockchain.GetBlockByHash(hash) if block == nil { - return nil, fmt.Errorf("block #%x not found", hash) + return nil, fmt.Errorf("block %#x not found", hash) } return api.traceBlock(ctx, block, config) } @@ -385,6 +394,43 @@ func (api *PrivateDebugAPI) TraceBlockFromFile(ctx context.Context, file string, return api.TraceBlock(ctx, blob, config) } +// TraceBadBlockByHash returns the structured logs created during the execution of +// EVM against a block pulled from the pool of bad ones and returns them as a JSON +// object. +func (api *PrivateDebugAPI) TraceBadBlock(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) { + blocks := api.dex.blockchain.BadBlocks() + for _, block := range blocks { + if block.Hash() == hash { + return api.traceBlock(ctx, block, config) + } + } + return nil, fmt.Errorf("bad block %#x not found", hash) +} + +// StandardTraceBlockToFile dumps the structured logs created during the +// execution of EVM to the local file system and returns a list of files +// to the caller. +func (api *PrivateDebugAPI) StandardTraceBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) { + block := api.dex.blockchain.GetBlockByHash(hash) + if block == nil { + return nil, fmt.Errorf("block %#x not found", hash) + } + return api.standardTraceBlockToFile(ctx, block, config) +} + +// StandardTraceBadBlockToFile dumps the structured logs created during the +// execution of EVM against a block pulled from the pool of bad ones to the +// local file system and returns a list of files to the caller. +func (api *PrivateDebugAPI) StandardTraceBadBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) { + blocks := api.dex.blockchain.BadBlocks() + for _, block := range blocks { + if block.Hash() == hash { + return api.standardTraceBlockToFile(ctx, block, config) + } + } + return nil, fmt.Errorf("bad block %#x not found", hash) +} + // traceBlock configures a new tracer according to the provided configuration, and // executes all the transactions contained within. The return value will be one item // per transaction, dependent on the requestd tracer. @@ -395,7 +441,7 @@ func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block, } parent := api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, fmt.Errorf("parent %x not found", block.ParentHash()) + return nil, fmt.Errorf("parent %#x not found", block.ParentHash()) } reexec := defaultTraceReexec if config != nil && config.Reexec != nil { @@ -466,6 +512,107 @@ func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block, return results, nil } +// standardTraceBlockToFile configures a new tracer which uses standard JSON output, +// and traces either a full block or an individual transaction. The return value will +// be one filename per transaction traced. +func (api *PrivateDebugAPI) standardTraceBlockToFile(ctx context.Context, block *types.Block, config *StdTraceConfig) ([]string, error) { + // If we're tracing a single transaction, make sure it's present + if config != nil && config.TxHash != (common.Hash{}) { + var exists bool + for _, tx := range block.Transactions() { + if exists = (tx.Hash() == config.TxHash); exists { + break + } + } + if !exists { + return nil, fmt.Errorf("transaction %#x not found in block", config.TxHash) + } + } + // Create the parent state database + if err := api.dex.engine.VerifyHeader(api.dex.blockchain, block.Header(), true); err != nil { + return nil, err + } + parent := api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + if parent == nil { + return nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + } + reexec := defaultTraceReexec + if config != nil && config.Reexec != nil { + reexec = *config.Reexec + } + statedb, err := api.computeStateDB(parent, reexec) + if err != nil { + return nil, err + } + // Retrieve the tracing configurations, or use default values + var ( + logConfig vm.LogConfig + txHash common.Hash + ) + if config != nil { + if config.LogConfig != nil { + logConfig = *config.LogConfig + } + txHash = config.TxHash + } + logConfig.Debug = true + + // Execute transaction, either tracing all or just the requested one + var ( + signer = types.MakeSigner(api.config, block.Number()) + dumps []string + ) + for i, tx := range block.Transactions() { + // Prepare the trasaction for un-traced execution + var ( + msg, _ = tx.AsMessage(signer) + vmctx = core.NewEVMContext(msg, block.Header(), api.dex.blockchain, nil) + + vmConf vm.Config + dump *os.File + err error + ) + // If the transaction needs tracing, swap out the configs + if tx.Hash() == txHash || txHash == (common.Hash{}) { + // Generate a unique temporary file to dump it into + prefix := fmt.Sprintf("block_%#x-%d-%#x-", block.Hash().Bytes()[:4], i, tx.Hash().Bytes()[:4]) + + dump, err = ioutil.TempFile(os.TempDir(), prefix) + if err != nil { + return nil, err + } + dumps = append(dumps, dump.Name()) + + // Swap out the noop logger to the standard tracer + vmConf = vm.Config{ + Debug: true, + Tracer: vm.NewJSONLogger(&logConfig, bufio.NewWriter(dump)), + EnablePreimageRecording: true, + } + } + // Execute the transaction and flush any traces to disk + vmenv := vm.NewEVM(vmctx, statedb, api.config, vmConf) + _, _, _, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())) + + if dump != nil { + dump.Close() + log.Info("Wrote standard trace", "file", dump.Name()) + } + if err != nil { + return dumps, err + } + // Finalize the state so any modifications are written to the trie + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) + + // If we've traced the transaction we were looking for, abort + if tx.Hash() == txHash { + break + } + } + return dumps, nil +} + // computeStateDB retrieves the state database associated with a certain block. // If no state is locally available for the given block, a number of blocks are // attempted to be reexecuted to generate the desired state. @@ -477,7 +624,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* } // Otherwise try to reexec blocks until we find a state or reach our limit origin := block.NumberU64() - database := state.NewDatabase(api.dex.ChainDb()) + database := state.NewDatabaseWithCache(api.dex.ChainDb(), 16) for i := uint64(0); i < reexec; i++ { block = api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) @@ -491,7 +638,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* if err != nil { switch err.(type) { case *trie.MissingNodeError: - return nil, errors.New("required historical state unavailable") + return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec) default: return nil, err } @@ -505,7 +652,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* for block.NumberU64() < origin { // Print progress logs if long enough time elapsed if time.Since(logged) > 8*time.Second { - log.Info("Regenerating historical state", "block", block.NumberU64()+1, "target", origin, "elapsed", time.Since(start)) + log.Info("Regenerating historical state", "block", block.NumberU64()+1, "target", origin, "remaining", origin-block.NumberU64()-1, "elapsed", time.Since(start)) logged = time.Now() } // Retrieve the next block to regenerate and process it @@ -514,7 +661,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* } _, _, _, err := api.dex.blockchain.Processor().Process(block, statedb, vm.Config{}) if err != nil { - return nil, err + return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err) } // Finalize the state so any modifications are written to the trie root, err := statedb.Commit(true) @@ -522,7 +669,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* return nil, err } if err := statedb.Reset(root); err != nil { - return nil, err + return nil, fmt.Errorf("state reset after block %d failed: %v", block.NumberU64(), err) } database.TrieDB().Reference(root, common.Hash{}) if proot != (common.Hash{}) { @@ -541,7 +688,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, hash common.Ha // Retrieve the transaction and assemble its EVM context tx, blockHash, _, index := rawdb.ReadTransaction(api.dex.ChainDb(), hash) if tx == nil { - return nil, fmt.Errorf("transaction %x not found", hash) + return nil, fmt.Errorf("transaction %#x not found", hash) } reexec := defaultTraceReexec if config != nil && config.Reexec != nil { @@ -621,11 +768,11 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int, ree // Create the parent state database block := api.dex.blockchain.GetBlockByHash(blockHash) if block == nil { - return nil, vm.Context{}, nil, fmt.Errorf("block %x not found", blockHash) + return nil, vm.Context{}, nil, fmt.Errorf("block %#x not found", blockHash) } parent := api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.Context{}, nil, fmt.Errorf("parent %x not found", block.ParentHash()) + return nil, vm.Context{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) } statedb, err := api.computeStateDB(parent, reexec) if err != nil { @@ -644,10 +791,10 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int, ree // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{}) if _, _, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.Context{}, nil, fmt.Errorf("tx %x failed: %v", tx.Hash(), err) + return nil, vm.Context{}, nil, fmt.Errorf("tx %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state statedb.Finalise(true) } - return nil, vm.Context{}, nil, fmt.Errorf("tx index %d out of range for block %x", txIndex, blockHash) + return nil, vm.Context{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, blockHash) } diff --git a/dex/backend.go b/dex/backend.go index 29ac42906..26a0b49eb 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -173,7 +173,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, - chainDb, config.BlockProposerEnabled, dex.governance, dex.app) + chainDb, config.Whitelist, config.BlockProposerEnabled, dex.governance, dex.app) if err != nil { return nil, err } diff --git a/dex/config.go b/dex/config.go index d218b35e2..ac3e89969 100644 --- a/dex/config.go +++ b/dex/config.go @@ -25,6 +25,7 @@ import ( "runtime" "time" + "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/eth/gasprice" @@ -81,6 +82,9 @@ type Config struct { SyncMode downloader.SyncMode NoPruning bool + // Whitelist of required block number -> hash values to accept + Whitelist map[uint64]common.Hash `toml:"-"` + // Light client options LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests LightPeers int `toml:",omitempty"` // Maximum number of LES client peers @@ -112,7 +116,7 @@ type Config struct { // Miscellaneous options DocRoot string `toml:"-"` - // Type of the EWASM interpreter ("" for detault) + // Type of the EWASM interpreter ("" for default) EWASMInterpreter string // Type of the EVM interpreter ("" for default) diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go index e3960ea30..9d609584b 100644 --- a/dex/downloader/downloader.go +++ b/dex/downloader/downloader.go @@ -103,6 +103,7 @@ type Downloader struct { mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events + genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT) queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database @@ -192,6 +193,9 @@ type BlockChain interface { // HasBlock verifies a block's presence in the local chain. HasBlock(common.Hash, uint64) bool + // HasFastBlock verifies a fast block's presence in the local chain. + HasFastBlock(common.Hash, uint64) bool + // GetBlockByHash retrieves a block from the local chain. GetBlockByHash(common.Hash) *types.Block @@ -442,7 +446,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, number ui } height := latest.Number.Uint64() - origin, err := d.findAncestor(p, height) + origin, err := d.findAncestor(p, latest) if err != nil { return err } @@ -691,41 +695,107 @@ func (d *Downloader) fetchGovState(p *peerConnection, } } -// findAncestor tries to locate the common ancestor link of the local chain and -// a remote peers blockchain. In the general case when our node was in sync and -// on the correct chain, checking the top N links should already get us a match. -// In the rare scenario when we ended up on a long reorganisation (i.e. none of -// the head links match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) { - // Figure out the valid ancestor range to prevent rewrite attacks - floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() - - if d.mode == FullSync { - ceil = d.blockchain.CurrentBlock().NumberU64() - } else if d.mode == FastSync { - ceil = d.blockchain.CurrentFastBlock().NumberU64() +// calculateRequestSpan calculates what headers to request from a peer when trying to determine the +// common ancestor. +// It returns parameters to be used for peer.RequestHeadersByNumber: +// from - starting block number +// count - number of headers to request +// skip - number of headers to skip +// and also returns 'max', the last block which is expected to be returned by the remote peers, +// given the (from,count,skip) +func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) { + var ( + from int + count int + MaxCount = MaxHeaderFetch / 16 + ) + // requestHead is the highest block that we will ask for. If requestHead is not offset, + // the highest block that we will get is 16 blocks back from head, which means we + // will fetch 14 or 15 blocks unnecessarily in the case the height difference + // between us and the peer is 1-2 blocks, which is most common + requestHead := int(remoteHeight) - 1 + if requestHead < 0 { + requestHead = 0 + } + // requestBottom is the lowest block we want included in the query + // Ideally, we want to include just below own head + requestBottom := int(localHeight - 1) + if requestBottom < 0 { + requestBottom = 0 } - if ceil >= MaxForkAncestry { - floor = int64(ceil - MaxForkAncestry) + totalSpan := requestHead - requestBottom + span := 1 + totalSpan/MaxCount + if span < 2 { + span = 2 + } + if span > 16 { + span = 16 } - p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) - // Request the topmost blocks to short circuit binary ancestor lookup - head := ceil - if head > height { - head = height + count = 1 + totalSpan/span + if count > MaxCount { + count = MaxCount + } + if count < 2 { + count = 2 } - from := int64(head) - int64(MaxHeaderFetch) + from = requestHead - (count-1)*span if from < 0 { from = 0 } - // Span out with 15 block gaps into the future to catch bad head reports - limit := 2 * MaxHeaderFetch / 16 - count := 1 + int((int64(ceil)-from)/16) - if count > limit { - count = limit + max := from + (count-1)*span + return int64(from), count, span - 1, uint64(max) +} + +// findAncestor tries to locate the common ancestor link of the local chain and +// a remote peers blockchain. In the general case when our node was in sync and +// on the correct chain, checking the top N links should already get us a match. +// In the rare scenario when we ended up on a long reorganisation (i.e. none of +// the head links match), we do a binary search to find the common ancestor. +func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) { + // Figure out the valid ancestor range to prevent rewrite attacks + var ( + floor = int64(-1) + localHeight uint64 + remoteHeight = remoteHeader.Number.Uint64() + ) + switch d.mode { + case FullSync: + localHeight = d.blockchain.CurrentBlock().NumberU64() + case FastSync: + localHeight = d.blockchain.CurrentFastBlock().NumberU64() + default: + localHeight = d.lightchain.CurrentHeader().Number.Uint64() + } + p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) + if localHeight >= MaxForkAncestry { + // We're above the max reorg threshold, find the earliest fork point + floor = int64(localHeight - MaxForkAncestry) + + // If we're doing a light sync, ensure the floor doesn't go below the CHT, as + // all headers before that point will be missing. + if d.mode == LightSync { + // If we dont know the current CHT position, find it + if d.genesis == 0 { + header := d.lightchain.CurrentHeader() + for header != nil { + d.genesis = header.Number.Uint64() + if floor >= int64(d.genesis)-1 { + break + } + header = d.lightchain.GetHeaderByHash(header.ParentHash) + } + } + // We already know the "genesis" block number, cap floor to that + if floor < int64(d.genesis)-1 { + floor = int64(d.genesis) - 1 + } + } } - go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false) + from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight) + + p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip) + go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -751,9 +821,10 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err return 0, errEmptyHeaderSet } // Make sure the peer's reply conforms to the request - for i := 0; i < len(headers); i++ { - if number := headers[i].Number.Int64(); number != from+int64(i)*16 { - p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) + for i, header := range headers { + expectNumber := from + int64(i)*int64((skip+1)) + if number := header.Number.Int64(); number != expectNumber { + p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) return 0, errInvalidChain } } @@ -761,20 +832,24 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err finished = true for i := len(headers) - 1; i >= 0; i-- { // Skip any headers that underflow/overflow our requested set - if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil { + if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max { continue } // Otherwise check if we already know the header or not h := headers[i].Hash() n := headers[i].Number.Uint64() - if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) { - number, hash = n, h - // If every header is known, even future ones, the peer straight out lied about its head - if number > height && i == limit-1 { - p.log.Warn("Lied about chain head", "reported", height, "found", number) - return 0, errStallingPeer - } + var known bool + switch d.mode { + case FullSync: + known = d.blockchain.HasBlock(h, n) + case FastSync: + known = d.blockchain.HasFastBlock(h, n) + default: + known = d.lightchain.HasHeader(h, n) + } + if known { + number, hash = n, h break } } @@ -798,10 +873,11 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err return number, nil } // Ancestor not found, we need to binary search over our chain - start, end := uint64(0), head + start, end := uint64(0), remoteHeight if floor > 0 { start = uint64(floor) } + p.log.Trace("Binary searching for common ancestor", "start", start, "end", end) for start+1 < end { // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 @@ -834,7 +910,16 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err // Modify the search interval based on the response h := headers[0].Hash() n := headers[0].Number.Uint64() - if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) { + var known bool + switch d.mode { + case FullSync: + known = d.blockchain.HasBlock(h, n) + case FastSync: + known = d.blockchain.HasFastBlock(h, n) + default: + known = d.lightchain.HasHeader(h, n) + } + if !known { end = check break } @@ -1511,8 +1596,15 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } if index, err := d.blockchain.InsertDexonChain(blocks); err != nil { - log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + if index < len(results) { + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) + } else { + // The InsertChain method in blockchain.go will sometimes return an out-of-bounds index, + // when it needs to preprocess blocks to import a sidechain. + // The importer will put together a new list of blocks to import, which is a superset + // of the blocks delivered from the downloader, and the indexing will be off. + log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err) + } } return nil } diff --git a/dex/downloader/downloader_test.go b/dex/downloader/downloader_test.go index e8ec0056b..e01e0d96b 100644 --- a/dex/downloader/downloader_test.go +++ b/dex/downloader/downloader_test.go @@ -19,6 +19,7 @@ package downloader import ( "errors" "fmt" + "strings" "sync" "sync/atomic" "testing" @@ -118,6 +119,15 @@ func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool { return dl.GetBlockByHash(hash) != nil } +// HasFastBlock checks if a block is present in the testers canonical chain. +func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + _, ok := dl.ownReceipts[hash] + return ok +} + // GetHeader retrieves a header from the testers canonical chain. func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header { dl.lock.RLock() @@ -271,6 +281,7 @@ func (dl *downloadTester) InsertDexonChain(blocks types.Blocks) (i int, err erro dl.ownHeaders[block.Hash()] = block.Header() } dl.ownBlocks[block.Hash()] = block + dl.ownReceipts[block.Hash()] = make(types.Receipts, 0) dl.stateDb.Put(block.Root().Bytes(), []byte{0x00}) } return len(blocks), nil @@ -416,14 +427,20 @@ func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error { // assertOwnChain checks if the local chain contains the correct number of items // of the various chain components. func assertOwnChain(t *testing.T, tester *downloadTester, length int) { + // Mark this method as a helper to report errors at callsite, not in here + t.Helper() + assertOwnForkedChain(t, tester, 1, []int{length}) } // assertOwnForkedChain checks if the local forked chain contains the correct // number of items of the various chain components. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { + // Mark this method as a helper to report errors at callsite, not in here + t.Helper() + // Initialize the counters for the first fork - headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks + headers, blocks, receipts := lengths[0], lengths[0], lengths[0] if receipts < 0 { receipts = 1 @@ -432,12 +449,9 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng for _, length := range lengths[1:] { headers += length - common blocks += length - common - receipts += length - common - fsMinFullBlocks + receipts += length - common } - switch tester.downloader.mode { - case FullSync: - receipts = 1 - case LightSync: + if tester.downloader.mode == LightSync { blocks, receipts = 1, 1 } if hs := len(tester.ownHeaders); hs != headers { @@ -1105,7 +1119,9 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { } func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) { + // Mark this method as a helper to report errors at callsite, not in here t.Helper() + p := d.Progress() p.KnownStates, p.PulledStates = 0, 0 want.KnownStates, want.PulledStates = 0, 0 @@ -1363,3 +1379,78 @@ func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int } return nil } + +func TestRemoteHeaderRequestSpan(t *testing.T) { + testCases := []struct { + remoteHeight uint64 + localHeight uint64 + expected []int + }{ + // Remote is way higher. We should ask for the remote head and go backwards + {1500, 1000, + []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499}, + }, + {15000, 13006, + []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999}, + }, + //Remote is pretty close to us. We don't have to fetch as many + {1200, 1150, + []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199}, + }, + // Remote is equal to us (so on a fork with higher td) + // We should get the closest couple of ancestors + {1500, 1500, + []int{1497, 1499}, + }, + // We're higher than the remote! Odd + {1000, 1500, + []int{997, 999}, + }, + // Check some weird edgecases that it behaves somewhat rationally + {0, 1500, + []int{0, 2}, + }, + {6000000, 0, + []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999}, + }, + {0, 0, + []int{0, 2}, + }, + } + reqs := func(from, count, span int) []int { + var r []int + num := from + for len(r) < count { + r = append(r, num) + num += span + 1 + } + return r + } + for i, tt := range testCases { + from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight) + data := reqs(int(from), count, span) + + if max != uint64(data[len(data)-1]) { + t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max) + } + failed := false + if len(data) != len(tt.expected) { + failed = true + t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data)) + } else { + for j, n := range data { + if n != tt.expected[j] { + failed = true + break + } + } + } + if failed { + res := strings.Replace(fmt.Sprint(data), " ", ",", -1) + exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1) + fmt.Printf("got: %v\n", res) + fmt.Printf("exp: %v\n", exp) + t.Errorf("test %d: wrong values", i) + } + } +} diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go index f3a36ec3c..ea847913f 100644 --- a/dex/downloader/queue.go +++ b/dex/downloader/queue.go @@ -325,7 +325,7 @@ func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*ty } // Make sure no duplicate requests are executed if _, ok := q.blockTaskPool[hash]; ok { - log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash) + log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash) continue } if _, ok := q.receiptTaskPool[hash]; ok { diff --git a/dex/downloader/statesync.go b/dex/downloader/statesync.go index 49117abbb..1695ba19c 100644 --- a/dex/downloader/statesync.go +++ b/dex/downloader/statesync.go @@ -152,7 +152,7 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { finished = append(finished, req) delete(active, pack.PeerId()) - // Handle dropped peer connections: + // Handle dropped peer connections: case p := <-peerDrop: // Skip if no request is currently pending req := active[p.id] @@ -398,9 +398,8 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { // process iterates over a batch of delivered state data, injecting each item // into a running state sync, re-queuing any items that were requested but not -// delivered. -// Returns whether the peer actually managed to deliver anything of value, -// and any error that occurred +// delivered. Returns whether the peer actually managed to deliver anything of +// value, and any error that occurred func (s *stateSync) process(req *stateReq) (int, error) { // Collect processing stats and update progress if valid data was received duplicate, unexpected, successful := 0, 0, 0 @@ -412,14 +411,12 @@ func (s *stateSync) process(req *stateReq) (int, error) { }(time.Now()) // Iterate over all the delivered data and inject one-by-one into the trie - progress := false for _, blob := range req.response { - prog, hash, err := s.processNodeData(blob) + _, hash, err := s.processNodeData(blob) switch err { case nil: s.numUncommitted++ s.bytesUncommitted += len(blob) - progress = progress || prog successful++ case trie.ErrNotRequested: unexpected++ diff --git a/dex/handler.go b/dex/handler.go index ec5704103..20df41709 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -124,6 +124,8 @@ type ProtocolManager struct { recordsCh chan newRecordsEvent recordsSub event.Subscription + whitelist map[uint64]common.Hash + // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer txsyncCh chan *txsync @@ -161,7 +163,7 @@ type ProtocolManager struct { func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, - blockchain *core.BlockChain, chaindb ethdb.Database, + blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash, isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields @@ -175,6 +177,7 @@ func NewProtocolManager( cache: newCache(5120, dexDB.NewDatabase(chaindb)), nextPullVote: &sync.Map{}, chainconfig: config, + whitelist: whitelist, newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), @@ -388,7 +391,13 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.syncTransactions(p) pm.syncNodeRecords(p) - // main loop. handle incoming messages. + // If we have any explicit whitelist block hashes, request them + for number := range pm.whitelist { + if err := p.RequestWhitelistHeader(number); err != nil { + return err + } + } + // Handle incoming messages until the connection is torn down for { if err := pm.handleMsg(p); err != nil { p.Log().Debug("Ethereum message handling failed", "err", err) @@ -597,6 +606,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err != nil { log.Debug("Failed to deliver headers", "err", err) } + case whitelistReq: + if want, ok := pm.whitelist[data.Headers[0].Number.Uint64()]; ok { + if hash := data.Headers[0].Hash(); want != hash { + p.Log().Info("Whitelist mismatch, dropping peer", "number", data.Headers[0].Number.Uint64(), "hash", hash, "want", want) + return errors.New("whitelist block mismatch") + } + p.Log().Debug("Whitelist block verified", "number", data.Headers[0].Number.Uint64(), "hash", want) + } default: log.Debug("Got headers with unexpected flag", "flag", data.Flag) } @@ -791,7 +808,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.SetHead(trueHead, trueNumber) // Schedule a sync if above ours. Note, this will not fire a sync for a gap of - // a singe block (as the true number is below the propagated block), however this + // a single block (as the true number is below the propagated block), however this // scenario should easily be covered by the fetcher. currentBlock := pm.blockchain.CurrentBlock() if trueNumber > currentBlock.NumberU64() { diff --git a/dex/helper_test.go b/dex/helper_test.go index 4f5541052..1d903a907 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -133,7 +133,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func notarySetFunc: func(uint64) (map[string]struct{}, error) { return nil, nil }, } - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{}) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, nil, true, tgov, &testApp{}) if err != nil { return nil, nil, err } diff --git a/dex/peer.go b/dex/peer.go index 562cbfaca..0fa1ac61d 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -641,6 +641,11 @@ func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov, Flag: downloaderReq}) } +func (p *peer) RequestWhitelistHeader(origin uint64) error { + p.Log().Debug("Fetching whitelist header", "number", origin, "flag", whitelistReq) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: 1, Skip: 0, Reverse: false, WithGov: false, Flag: whitelistReq}) +} + func (p *peer) RequestGovStateByHash(hash common.Hash) error { p.Log().Debug("Fetching one gov state", "hash", hash) return p2p.Send(p.rw, GetGovStateMsg, hash) diff --git a/dex/protocol.go b/dex/protocol.go index 639925265..287bf0883 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -55,7 +55,7 @@ const ( // ProtocolName is the official short name of the protocol used during capability negotiation. var ProtocolName = "dex" -// ProtocolVersions are the upported versions of the eth protocol (first is primary). +// ProtocolVersions are the supported versions of the eth protocol (first is primary). var ProtocolVersions = []uint{dex64} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. @@ -116,6 +116,7 @@ const ( const ( fetcherReq = uint8(iota) downloaderReq + whitelistReq ) func (e errCode) String() string { |