aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-03-27 20:02:55 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:58 +0800
commit91981becf98b988470810aa1c26d86de2d294e29 (patch)
treeaeb9b175a5cce93e6bbe8870028e151096bc6759 /dex
parenta29eba604b1a58ea414a6570223dedb0482cefaf (diff)
downloaddexon-91981becf98b988470810aa1c26d86de2d294e29.tar
dexon-91981becf98b988470810aa1c26d86de2d294e29.tar.gz
dexon-91981becf98b988470810aa1c26d86de2d294e29.tar.bz2
dexon-91981becf98b988470810aa1c26d86de2d294e29.tar.lz
dexon-91981becf98b988470810aa1c26d86de2d294e29.tar.xz
dexon-91981becf98b988470810aa1c26d86de2d294e29.tar.zst
dexon-91981becf98b988470810aa1c26d86de2d294e29.zip
backport from v1.8.23 (#304)
* dex: backport f6193ad * dex/downloader: backport accc0fa accc0fab 174083c3 * dex: backport 434dd5b * dex: backport 42a914a 0983d02 * dex: backport 48b70ec 31b3334 and some modification * dex/downloader: backport 5f251a6 * dex/downloader: backport 81c3dc7 * dex, dex/downloader: fix typos
Diffstat (limited to 'dex')
-rw-r--r--dex/api_backend.go12
-rw-r--r--dex/api_tracer.go173
-rw-r--r--dex/backend.go2
-rw-r--r--dex/config.go6
-rw-r--r--dex/downloader/downloader.go178
-rw-r--r--dex/downloader/downloader_test.go103
-rw-r--r--dex/downloader/queue.go2
-rw-r--r--dex/downloader/statesync.go11
-rw-r--r--dex/handler.go23
-rw-r--r--dex/helper_test.go2
-rw-r--r--dex/peer.go5
-rw-r--r--dex/protocol.go3
12 files changed, 433 insertions, 87 deletions
diff --git a/dex/api_backend.go b/dex/api_backend.go
index 60a874701..7333a9bb1 100644
--- a/dex/api_backend.go
+++ b/dex/api_backend.go
@@ -25,7 +25,6 @@ import (
"github.com/dexon-foundation/dexon/common/math"
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/bloombits"
- "github.com/dexon-foundation/dexon/core/rawdb"
"github.com/dexon-foundation/dexon/core/state"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/core/vm"
@@ -92,18 +91,11 @@ func (b *DexAPIBackend) GetBlock(ctx context.Context, hash common.Hash) (*types.
}
func (b *DexAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
- if number := rawdb.ReadHeaderNumber(b.dex.chainDb, hash); number != nil {
- return rawdb.ReadReceipts(b.dex.chainDb, hash, *number), nil
- }
- return nil, nil
+ return b.dex.blockchain.GetReceiptsByHash(hash), nil
}
func (b *DexAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
- number := rawdb.ReadHeaderNumber(b.dex.chainDb, hash)
- if number == nil {
- return nil, nil
- }
- receipts := rawdb.ReadReceipts(b.dex.chainDb, hash, *number)
+ receipts := b.dex.blockchain.GetReceiptsByHash(hash)
if receipts == nil {
return nil, nil
}
diff --git a/dex/api_tracer.go b/dex/api_tracer.go
index bb6acd764..d451c376d 100644
--- a/dex/api_tracer.go
+++ b/dex/api_tracer.go
@@ -17,11 +17,13 @@
package dex
import (
+ "bufio"
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
+ "os"
"runtime"
"sync"
"time"
@@ -60,6 +62,13 @@ type TraceConfig struct {
Reexec *uint64
}
+// StdTraceConfig holds extra parameters to standard-json trace functions.
+type StdTraceConfig struct {
+ *vm.LogConfig
+ Reexec *uint64
+ TxHash common.Hash
+}
+
// txTraceResult is the result of a single transaction trace.
type txTraceResult struct {
Result interface{} `json:"result,omitempty"` // Trace results produced by the tracer
@@ -134,7 +143,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
// Ensure we have a valid starting state before doing any work
origin := start.NumberU64()
- database := state.NewDatabase(api.dex.ChainDb())
+ database := state.NewDatabaseWithCache(api.dex.ChainDb(), 16) // Chain tracing will probably start at genesis
if number := start.NumberU64(); number > 0 {
start = api.dex.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1)
@@ -360,7 +369,7 @@ func (api *PrivateDebugAPI) TraceBlockByNumber(ctx context.Context, number rpc.B
func (api *PrivateDebugAPI) TraceBlockByHash(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) {
block := api.dex.blockchain.GetBlockByHash(hash)
if block == nil {
- return nil, fmt.Errorf("block #%x not found", hash)
+ return nil, fmt.Errorf("block %#x not found", hash)
}
return api.traceBlock(ctx, block, config)
}
@@ -385,6 +394,43 @@ func (api *PrivateDebugAPI) TraceBlockFromFile(ctx context.Context, file string,
return api.TraceBlock(ctx, blob, config)
}
+// TraceBadBlockByHash returns the structured logs created during the execution of
+// EVM against a block pulled from the pool of bad ones and returns them as a JSON
+// object.
+func (api *PrivateDebugAPI) TraceBadBlock(ctx context.Context, hash common.Hash, config *TraceConfig) ([]*txTraceResult, error) {
+ blocks := api.dex.blockchain.BadBlocks()
+ for _, block := range blocks {
+ if block.Hash() == hash {
+ return api.traceBlock(ctx, block, config)
+ }
+ }
+ return nil, fmt.Errorf("bad block %#x not found", hash)
+}
+
+// StandardTraceBlockToFile dumps the structured logs created during the
+// execution of EVM to the local file system and returns a list of files
+// to the caller.
+func (api *PrivateDebugAPI) StandardTraceBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) {
+ block := api.dex.blockchain.GetBlockByHash(hash)
+ if block == nil {
+ return nil, fmt.Errorf("block %#x not found", hash)
+ }
+ return api.standardTraceBlockToFile(ctx, block, config)
+}
+
+// StandardTraceBadBlockToFile dumps the structured logs created during the
+// execution of EVM against a block pulled from the pool of bad ones to the
+// local file system and returns a list of files to the caller.
+func (api *PrivateDebugAPI) StandardTraceBadBlockToFile(ctx context.Context, hash common.Hash, config *StdTraceConfig) ([]string, error) {
+ blocks := api.dex.blockchain.BadBlocks()
+ for _, block := range blocks {
+ if block.Hash() == hash {
+ return api.standardTraceBlockToFile(ctx, block, config)
+ }
+ }
+ return nil, fmt.Errorf("bad block %#x not found", hash)
+}
+
// traceBlock configures a new tracer according to the provided configuration, and
// executes all the transactions contained within. The return value will be one item
// per transaction, dependent on the requestd tracer.
@@ -395,7 +441,7 @@ func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block,
}
parent := api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
if parent == nil {
- return nil, fmt.Errorf("parent %x not found", block.ParentHash())
+ return nil, fmt.Errorf("parent %#x not found", block.ParentHash())
}
reexec := defaultTraceReexec
if config != nil && config.Reexec != nil {
@@ -466,6 +512,107 @@ func (api *PrivateDebugAPI) traceBlock(ctx context.Context, block *types.Block,
return results, nil
}
+// standardTraceBlockToFile configures a new tracer which uses standard JSON output,
+// and traces either a full block or an individual transaction. The return value will
+// be one filename per transaction traced.
+func (api *PrivateDebugAPI) standardTraceBlockToFile(ctx context.Context, block *types.Block, config *StdTraceConfig) ([]string, error) {
+ // If we're tracing a single transaction, make sure it's present
+ if config != nil && config.TxHash != (common.Hash{}) {
+ var exists bool
+ for _, tx := range block.Transactions() {
+ if exists = (tx.Hash() == config.TxHash); exists {
+ break
+ }
+ }
+ if !exists {
+ return nil, fmt.Errorf("transaction %#x not found in block", config.TxHash)
+ }
+ }
+ // Create the parent state database
+ if err := api.dex.engine.VerifyHeader(api.dex.blockchain, block.Header(), true); err != nil {
+ return nil, err
+ }
+ parent := api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ if parent == nil {
+ return nil, fmt.Errorf("parent %#x not found", block.ParentHash())
+ }
+ reexec := defaultTraceReexec
+ if config != nil && config.Reexec != nil {
+ reexec = *config.Reexec
+ }
+ statedb, err := api.computeStateDB(parent, reexec)
+ if err != nil {
+ return nil, err
+ }
+ // Retrieve the tracing configurations, or use default values
+ var (
+ logConfig vm.LogConfig
+ txHash common.Hash
+ )
+ if config != nil {
+ if config.LogConfig != nil {
+ logConfig = *config.LogConfig
+ }
+ txHash = config.TxHash
+ }
+ logConfig.Debug = true
+
+ // Execute transaction, either tracing all or just the requested one
+ var (
+ signer = types.MakeSigner(api.config, block.Number())
+ dumps []string
+ )
+ for i, tx := range block.Transactions() {
+ // Prepare the trasaction for un-traced execution
+ var (
+ msg, _ = tx.AsMessage(signer)
+ vmctx = core.NewEVMContext(msg, block.Header(), api.dex.blockchain, nil)
+
+ vmConf vm.Config
+ dump *os.File
+ err error
+ )
+ // If the transaction needs tracing, swap out the configs
+ if tx.Hash() == txHash || txHash == (common.Hash{}) {
+ // Generate a unique temporary file to dump it into
+ prefix := fmt.Sprintf("block_%#x-%d-%#x-", block.Hash().Bytes()[:4], i, tx.Hash().Bytes()[:4])
+
+ dump, err = ioutil.TempFile(os.TempDir(), prefix)
+ if err != nil {
+ return nil, err
+ }
+ dumps = append(dumps, dump.Name())
+
+ // Swap out the noop logger to the standard tracer
+ vmConf = vm.Config{
+ Debug: true,
+ Tracer: vm.NewJSONLogger(&logConfig, bufio.NewWriter(dump)),
+ EnablePreimageRecording: true,
+ }
+ }
+ // Execute the transaction and flush any traces to disk
+ vmenv := vm.NewEVM(vmctx, statedb, api.config, vmConf)
+ _, _, _, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()))
+
+ if dump != nil {
+ dump.Close()
+ log.Info("Wrote standard trace", "file", dump.Name())
+ }
+ if err != nil {
+ return dumps, err
+ }
+ // Finalize the state so any modifications are written to the trie
+ // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
+ statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
+
+ // If we've traced the transaction we were looking for, abort
+ if tx.Hash() == txHash {
+ break
+ }
+ }
+ return dumps, nil
+}
+
// computeStateDB retrieves the state database associated with a certain block.
// If no state is locally available for the given block, a number of blocks are
// attempted to be reexecuted to generate the desired state.
@@ -477,7 +624,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
}
// Otherwise try to reexec blocks until we find a state or reach our limit
origin := block.NumberU64()
- database := state.NewDatabase(api.dex.ChainDb())
+ database := state.NewDatabaseWithCache(api.dex.ChainDb(), 16)
for i := uint64(0); i < reexec; i++ {
block = api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
@@ -491,7 +638,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
if err != nil {
switch err.(type) {
case *trie.MissingNodeError:
- return nil, errors.New("required historical state unavailable")
+ return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec)
default:
return nil, err
}
@@ -505,7 +652,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
for block.NumberU64() < origin {
// Print progress logs if long enough time elapsed
if time.Since(logged) > 8*time.Second {
- log.Info("Regenerating historical state", "block", block.NumberU64()+1, "target", origin, "elapsed", time.Since(start))
+ log.Info("Regenerating historical state", "block", block.NumberU64()+1, "target", origin, "remaining", origin-block.NumberU64()-1, "elapsed", time.Since(start))
logged = time.Now()
}
// Retrieve the next block to regenerate and process it
@@ -514,7 +661,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
}
_, _, _, err := api.dex.blockchain.Processor().Process(block, statedb, vm.Config{})
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
}
// Finalize the state so any modifications are written to the trie
root, err := statedb.Commit(true)
@@ -522,7 +669,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
return nil, err
}
if err := statedb.Reset(root); err != nil {
- return nil, err
+ return nil, fmt.Errorf("state reset after block %d failed: %v", block.NumberU64(), err)
}
database.TrieDB().Reference(root, common.Hash{})
if proot != (common.Hash{}) {
@@ -541,7 +688,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, hash common.Ha
// Retrieve the transaction and assemble its EVM context
tx, blockHash, _, index := rawdb.ReadTransaction(api.dex.ChainDb(), hash)
if tx == nil {
- return nil, fmt.Errorf("transaction %x not found", hash)
+ return nil, fmt.Errorf("transaction %#x not found", hash)
}
reexec := defaultTraceReexec
if config != nil && config.Reexec != nil {
@@ -621,11 +768,11 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int, ree
// Create the parent state database
block := api.dex.blockchain.GetBlockByHash(blockHash)
if block == nil {
- return nil, vm.Context{}, nil, fmt.Errorf("block %x not found", blockHash)
+ return nil, vm.Context{}, nil, fmt.Errorf("block %#x not found", blockHash)
}
parent := api.dex.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
if parent == nil {
- return nil, vm.Context{}, nil, fmt.Errorf("parent %x not found", block.ParentHash())
+ return nil, vm.Context{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash())
}
statedb, err := api.computeStateDB(parent, reexec)
if err != nil {
@@ -644,10 +791,10 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int, ree
// Not yet the searched for transaction, execute on top of the current state
vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{})
if _, _, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil {
- return nil, vm.Context{}, nil, fmt.Errorf("tx %x failed: %v", tx.Hash(), err)
+ return nil, vm.Context{}, nil, fmt.Errorf("tx %#x failed: %v", tx.Hash(), err)
}
// Ensure any modifications are committed to the state
statedb.Finalise(true)
}
- return nil, vm.Context{}, nil, fmt.Errorf("tx index %d out of range for block %x", txIndex, blockHash)
+ return nil, vm.Context{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, blockHash)
}
diff --git a/dex/backend.go b/dex/backend.go
index 29ac42906..26a0b49eb 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -173,7 +173,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
- chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
+ chainDb, config.Whitelist, config.BlockProposerEnabled, dex.governance, dex.app)
if err != nil {
return nil, err
}
diff --git a/dex/config.go b/dex/config.go
index d218b35e2..ac3e89969 100644
--- a/dex/config.go
+++ b/dex/config.go
@@ -25,6 +25,7 @@ import (
"runtime"
"time"
+ "github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/eth/gasprice"
@@ -81,6 +82,9 @@ type Config struct {
SyncMode downloader.SyncMode
NoPruning bool
+ // Whitelist of required block number -> hash values to accept
+ Whitelist map[uint64]common.Hash `toml:"-"`
+
// Light client options
LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
@@ -112,7 +116,7 @@ type Config struct {
// Miscellaneous options
DocRoot string `toml:"-"`
- // Type of the EWASM interpreter ("" for detault)
+ // Type of the EWASM interpreter ("" for default)
EWASMInterpreter string
// Type of the EVM interpreter ("" for default)
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go
index e3960ea30..9d609584b 100644
--- a/dex/downloader/downloader.go
+++ b/dex/downloader/downloader.go
@@ -103,6 +103,7 @@ type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
+ genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database
@@ -192,6 +193,9 @@ type BlockChain interface {
// HasBlock verifies a block's presence in the local chain.
HasBlock(common.Hash, uint64) bool
+ // HasFastBlock verifies a fast block's presence in the local chain.
+ HasFastBlock(common.Hash, uint64) bool
+
// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash(common.Hash) *types.Block
@@ -442,7 +446,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, number ui
}
height := latest.Number.Uint64()
- origin, err := d.findAncestor(p, height)
+ origin, err := d.findAncestor(p, latest)
if err != nil {
return err
}
@@ -691,41 +695,107 @@ func (d *Downloader) fetchGovState(p *peerConnection,
}
}
-// findAncestor tries to locate the common ancestor link of the local chain and
-// a remote peers blockchain. In the general case when our node was in sync and
-// on the correct chain, checking the top N links should already get us a match.
-// In the rare scenario when we ended up on a long reorganisation (i.e. none of
-// the head links match), we do a binary search to find the common ancestor.
-func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
- // Figure out the valid ancestor range to prevent rewrite attacks
- floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
-
- if d.mode == FullSync {
- ceil = d.blockchain.CurrentBlock().NumberU64()
- } else if d.mode == FastSync {
- ceil = d.blockchain.CurrentFastBlock().NumberU64()
+// calculateRequestSpan calculates what headers to request from a peer when trying to determine the
+// common ancestor.
+// It returns parameters to be used for peer.RequestHeadersByNumber:
+// from - starting block number
+// count - number of headers to request
+// skip - number of headers to skip
+// and also returns 'max', the last block which is expected to be returned by the remote peers,
+// given the (from,count,skip)
+func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) {
+ var (
+ from int
+ count int
+ MaxCount = MaxHeaderFetch / 16
+ )
+ // requestHead is the highest block that we will ask for. If requestHead is not offset,
+ // the highest block that we will get is 16 blocks back from head, which means we
+ // will fetch 14 or 15 blocks unnecessarily in the case the height difference
+ // between us and the peer is 1-2 blocks, which is most common
+ requestHead := int(remoteHeight) - 1
+ if requestHead < 0 {
+ requestHead = 0
+ }
+ // requestBottom is the lowest block we want included in the query
+ // Ideally, we want to include just below own head
+ requestBottom := int(localHeight - 1)
+ if requestBottom < 0 {
+ requestBottom = 0
}
- if ceil >= MaxForkAncestry {
- floor = int64(ceil - MaxForkAncestry)
+ totalSpan := requestHead - requestBottom
+ span := 1 + totalSpan/MaxCount
+ if span < 2 {
+ span = 2
+ }
+ if span > 16 {
+ span = 16
}
- p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
- // Request the topmost blocks to short circuit binary ancestor lookup
- head := ceil
- if head > height {
- head = height
+ count = 1 + totalSpan/span
+ if count > MaxCount {
+ count = MaxCount
+ }
+ if count < 2 {
+ count = 2
}
- from := int64(head) - int64(MaxHeaderFetch)
+ from = requestHead - (count-1)*span
if from < 0 {
from = 0
}
- // Span out with 15 block gaps into the future to catch bad head reports
- limit := 2 * MaxHeaderFetch / 16
- count := 1 + int((int64(ceil)-from)/16)
- if count > limit {
- count = limit
+ max := from + (count-1)*span
+ return int64(from), count, span - 1, uint64(max)
+}
+
+// findAncestor tries to locate the common ancestor link of the local chain and
+// a remote peers blockchain. In the general case when our node was in sync and
+// on the correct chain, checking the top N links should already get us a match.
+// In the rare scenario when we ended up on a long reorganisation (i.e. none of
+// the head links match), we do a binary search to find the common ancestor.
+func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
+ // Figure out the valid ancestor range to prevent rewrite attacks
+ var (
+ floor = int64(-1)
+ localHeight uint64
+ remoteHeight = remoteHeader.Number.Uint64()
+ )
+ switch d.mode {
+ case FullSync:
+ localHeight = d.blockchain.CurrentBlock().NumberU64()
+ case FastSync:
+ localHeight = d.blockchain.CurrentFastBlock().NumberU64()
+ default:
+ localHeight = d.lightchain.CurrentHeader().Number.Uint64()
+ }
+ p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
+ if localHeight >= MaxForkAncestry {
+ // We're above the max reorg threshold, find the earliest fork point
+ floor = int64(localHeight - MaxForkAncestry)
+
+ // If we're doing a light sync, ensure the floor doesn't go below the CHT, as
+ // all headers before that point will be missing.
+ if d.mode == LightSync {
+ // If we dont know the current CHT position, find it
+ if d.genesis == 0 {
+ header := d.lightchain.CurrentHeader()
+ for header != nil {
+ d.genesis = header.Number.Uint64()
+ if floor >= int64(d.genesis)-1 {
+ break
+ }
+ header = d.lightchain.GetHeaderByHash(header.ParentHash)
+ }
+ }
+ // We already know the "genesis" block number, cap floor to that
+ if floor < int64(d.genesis)-1 {
+ floor = int64(d.genesis) - 1
+ }
+ }
}
- go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false)
+ from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
+
+ p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
+ go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@@ -751,9 +821,10 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
return 0, errEmptyHeaderSet
}
// Make sure the peer's reply conforms to the request
- for i := 0; i < len(headers); i++ {
- if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
- p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
+ for i, header := range headers {
+ expectNumber := from + int64(i)*int64((skip+1))
+ if number := header.Number.Int64(); number != expectNumber {
+ p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
return 0, errInvalidChain
}
}
@@ -761,20 +832,24 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
finished = true
for i := len(headers) - 1; i >= 0; i-- {
// Skip any headers that underflow/overflow our requested set
- if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
+ if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {
continue
}
// Otherwise check if we already know the header or not
h := headers[i].Hash()
n := headers[i].Number.Uint64()
- if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) {
- number, hash = n, h
- // If every header is known, even future ones, the peer straight out lied about its head
- if number > height && i == limit-1 {
- p.log.Warn("Lied about chain head", "reported", height, "found", number)
- return 0, errStallingPeer
- }
+ var known bool
+ switch d.mode {
+ case FullSync:
+ known = d.blockchain.HasBlock(h, n)
+ case FastSync:
+ known = d.blockchain.HasFastBlock(h, n)
+ default:
+ known = d.lightchain.HasHeader(h, n)
+ }
+ if known {
+ number, hash = n, h
break
}
}
@@ -798,10 +873,11 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
return number, nil
}
// Ancestor not found, we need to binary search over our chain
- start, end := uint64(0), head
+ start, end := uint64(0), remoteHeight
if floor > 0 {
start = uint64(floor)
}
+ p.log.Trace("Binary searching for common ancestor", "start", start, "end", end)
for start+1 < end {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
@@ -834,7 +910,16 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
// Modify the search interval based on the response
h := headers[0].Hash()
n := headers[0].Number.Uint64()
- if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) {
+ var known bool
+ switch d.mode {
+ case FullSync:
+ known = d.blockchain.HasBlock(h, n)
+ case FastSync:
+ known = d.blockchain.HasFastBlock(h, n)
+ default:
+ known = d.lightchain.HasHeader(h, n)
+ }
+ if !known {
end = check
break
}
@@ -1511,8 +1596,15 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
if index, err := d.blockchain.InsertDexonChain(blocks); err != nil {
- log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
- return errInvalidChain
+ if index < len(results) {
+ log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
+ } else {
+ // The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
+ // when it needs to preprocess blocks to import a sidechain.
+ // The importer will put together a new list of blocks to import, which is a superset
+ // of the blocks delivered from the downloader, and the indexing will be off.
+ log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
+ }
}
return nil
}
diff --git a/dex/downloader/downloader_test.go b/dex/downloader/downloader_test.go
index e8ec0056b..e01e0d96b 100644
--- a/dex/downloader/downloader_test.go
+++ b/dex/downloader/downloader_test.go
@@ -19,6 +19,7 @@ package downloader
import (
"errors"
"fmt"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -118,6 +119,15 @@ func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
return dl.GetBlockByHash(hash) != nil
}
+// HasFastBlock checks if a block is present in the testers canonical chain.
+func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ _, ok := dl.ownReceipts[hash]
+ return ok
+}
+
// GetHeader retrieves a header from the testers canonical chain.
func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
dl.lock.RLock()
@@ -271,6 +281,7 @@ func (dl *downloadTester) InsertDexonChain(blocks types.Blocks) (i int, err erro
dl.ownHeaders[block.Hash()] = block.Header()
}
dl.ownBlocks[block.Hash()] = block
+ dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
}
return len(blocks), nil
@@ -416,14 +427,20 @@ func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
// assertOwnChain checks if the local chain contains the correct number of items
// of the various chain components.
func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
+ // Mark this method as a helper to report errors at callsite, not in here
+ t.Helper()
+
assertOwnForkedChain(t, tester, 1, []int{length})
}
// assertOwnForkedChain checks if the local forked chain contains the correct
// number of items of the various chain components.
func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
+ // Mark this method as a helper to report errors at callsite, not in here
+ t.Helper()
+
// Initialize the counters for the first fork
- headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks
+ headers, blocks, receipts := lengths[0], lengths[0], lengths[0]
if receipts < 0 {
receipts = 1
@@ -432,12 +449,9 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
for _, length := range lengths[1:] {
headers += length - common
blocks += length - common
- receipts += length - common - fsMinFullBlocks
+ receipts += length - common
}
- switch tester.downloader.mode {
- case FullSync:
- receipts = 1
- case LightSync:
+ if tester.downloader.mode == LightSync {
blocks, receipts = 1, 1
}
if hs := len(tester.ownHeaders); hs != headers {
@@ -1105,7 +1119,9 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
}
func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
+ // Mark this method as a helper to report errors at callsite, not in here
t.Helper()
+
p := d.Progress()
p.KnownStates, p.PulledStates = 0, 0
want.KnownStates, want.PulledStates = 0, 0
@@ -1363,3 +1379,78 @@ func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int
}
return nil
}
+
+func TestRemoteHeaderRequestSpan(t *testing.T) {
+ testCases := []struct {
+ remoteHeight uint64
+ localHeight uint64
+ expected []int
+ }{
+ // Remote is way higher. We should ask for the remote head and go backwards
+ {1500, 1000,
+ []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
+ },
+ {15000, 13006,
+ []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
+ },
+ //Remote is pretty close to us. We don't have to fetch as many
+ {1200, 1150,
+ []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
+ },
+ // Remote is equal to us (so on a fork with higher td)
+ // We should get the closest couple of ancestors
+ {1500, 1500,
+ []int{1497, 1499},
+ },
+ // We're higher than the remote! Odd
+ {1000, 1500,
+ []int{997, 999},
+ },
+ // Check some weird edgecases that it behaves somewhat rationally
+ {0, 1500,
+ []int{0, 2},
+ },
+ {6000000, 0,
+ []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
+ },
+ {0, 0,
+ []int{0, 2},
+ },
+ }
+ reqs := func(from, count, span int) []int {
+ var r []int
+ num := from
+ for len(r) < count {
+ r = append(r, num)
+ num += span + 1
+ }
+ return r
+ }
+ for i, tt := range testCases {
+ from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
+ data := reqs(int(from), count, span)
+
+ if max != uint64(data[len(data)-1]) {
+ t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
+ }
+ failed := false
+ if len(data) != len(tt.expected) {
+ failed = true
+ t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
+ } else {
+ for j, n := range data {
+ if n != tt.expected[j] {
+ failed = true
+ break
+ }
+ }
+ }
+ if failed {
+ res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
+ exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
+ fmt.Printf("got: %v\n", res)
+ fmt.Printf("exp: %v\n", exp)
+ t.Errorf("test %d: wrong values", i)
+ }
+ }
+}
diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go
index f3a36ec3c..ea847913f 100644
--- a/dex/downloader/queue.go
+++ b/dex/downloader/queue.go
@@ -325,7 +325,7 @@ func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*ty
}
// Make sure no duplicate requests are executed
if _, ok := q.blockTaskPool[hash]; ok {
- log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
+ log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
continue
}
if _, ok := q.receiptTaskPool[hash]; ok {
diff --git a/dex/downloader/statesync.go b/dex/downloader/statesync.go
index 49117abbb..1695ba19c 100644
--- a/dex/downloader/statesync.go
+++ b/dex/downloader/statesync.go
@@ -152,7 +152,7 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished = append(finished, req)
delete(active, pack.PeerId())
- // Handle dropped peer connections:
+ // Handle dropped peer connections:
case p := <-peerDrop:
// Skip if no request is currently pending
req := active[p.id]
@@ -398,9 +398,8 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
-// delivered.
-// Returns whether the peer actually managed to deliver anything of value,
-// and any error that occurred
+// delivered. Returns whether the peer actually managed to deliver anything of
+// value, and any error that occurred
func (s *stateSync) process(req *stateReq) (int, error) {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected, successful := 0, 0, 0
@@ -412,14 +411,12 @@ func (s *stateSync) process(req *stateReq) (int, error) {
}(time.Now())
// Iterate over all the delivered data and inject one-by-one into the trie
- progress := false
for _, blob := range req.response {
- prog, hash, err := s.processNodeData(blob)
+ _, hash, err := s.processNodeData(blob)
switch err {
case nil:
s.numUncommitted++
s.bytesUncommitted += len(blob)
- progress = progress || prog
successful++
case trie.ErrNotRequested:
unexpected++
diff --git a/dex/handler.go b/dex/handler.go
index ec5704103..20df41709 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -124,6 +124,8 @@ type ProtocolManager struct {
recordsCh chan newRecordsEvent
recordsSub event.Subscription
+ whitelist map[uint64]common.Hash
+
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
@@ -161,7 +163,7 @@ type ProtocolManager struct {
func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
- blockchain *core.BlockChain, chaindb ethdb.Database,
+ blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash,
isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
@@ -175,6 +177,7 @@ func NewProtocolManager(
cache: newCache(5120, dexDB.NewDatabase(chaindb)),
nextPullVote: &sync.Map{},
chainconfig: config,
+ whitelist: whitelist,
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
@@ -388,7 +391,13 @@ func (pm *ProtocolManager) handle(p *peer) error {
pm.syncTransactions(p)
pm.syncNodeRecords(p)
- // main loop. handle incoming messages.
+ // If we have any explicit whitelist block hashes, request them
+ for number := range pm.whitelist {
+ if err := p.RequestWhitelistHeader(number); err != nil {
+ return err
+ }
+ }
+ // Handle incoming messages until the connection is torn down
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
@@ -597,6 +606,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
+ case whitelistReq:
+ if want, ok := pm.whitelist[data.Headers[0].Number.Uint64()]; ok {
+ if hash := data.Headers[0].Hash(); want != hash {
+ p.Log().Info("Whitelist mismatch, dropping peer", "number", data.Headers[0].Number.Uint64(), "hash", hash, "want", want)
+ return errors.New("whitelist block mismatch")
+ }
+ p.Log().Debug("Whitelist block verified", "number", data.Headers[0].Number.Uint64(), "hash", want)
+ }
default:
log.Debug("Got headers with unexpected flag", "flag", data.Flag)
}
@@ -791,7 +808,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.SetHead(trueHead, trueNumber)
// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
- // a singe block (as the true number is below the propagated block), however this
+ // a single block (as the true number is below the propagated block), however this
// scenario should easily be covered by the fetcher.
currentBlock := pm.blockchain.CurrentBlock()
if trueNumber > currentBlock.NumberU64() {
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 4f5541052..1d903a907 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -133,7 +133,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
notarySetFunc: func(uint64) (map[string]struct{}, error) { return nil, nil },
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{})
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, nil, true, tgov, &testApp{})
if err != nil {
return nil, nil, err
}
diff --git a/dex/peer.go b/dex/peer.go
index 562cbfaca..0fa1ac61d 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -641,6 +641,11 @@ func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov, Flag: downloaderReq})
}
+func (p *peer) RequestWhitelistHeader(origin uint64) error {
+ p.Log().Debug("Fetching whitelist header", "number", origin, "flag", whitelistReq)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: 1, Skip: 0, Reverse: false, WithGov: false, Flag: whitelistReq})
+}
+
func (p *peer) RequestGovStateByHash(hash common.Hash) error {
p.Log().Debug("Fetching one gov state", "hash", hash)
return p2p.Send(p.rw, GetGovStateMsg, hash)
diff --git a/dex/protocol.go b/dex/protocol.go
index 639925265..287bf0883 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -55,7 +55,7 @@ const (
// ProtocolName is the official short name of the protocol used during capability negotiation.
var ProtocolName = "dex"
-// ProtocolVersions are the upported versions of the eth protocol (first is primary).
+// ProtocolVersions are the supported versions of the eth protocol (first is primary).
var ProtocolVersions = []uint{dex64}
// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
@@ -116,6 +116,7 @@ const (
const (
fetcherReq = uint8(iota)
downloaderReq
+ whitelistReq
)
func (e errCode) String() string {