aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/geth/accountcmd.go2
-rw-r--r--cmd/geth/consolecmd_test.go62
-rw-r--r--console/console.go68
-rw-r--r--console/console_test.go46
-rw-r--r--core/tx_pool.go11
-rw-r--r--eth/downloader/downloader.go209
-rw-r--r--eth/downloader/downloader_test.go124
-rw-r--r--eth/downloader/peer.go81
-rw-r--r--eth/handler.go14
-rw-r--r--eth/protocol_test.go1
-rw-r--r--eth/sync.go2
-rw-r--r--internal/jsre/jsre.go16
12 files changed, 516 insertions, 120 deletions
diff --git a/cmd/geth/accountcmd.go b/cmd/geth/accountcmd.go
index 0f9d95c2c..2c2308514 100644
--- a/cmd/geth/accountcmd.go
+++ b/cmd/geth/accountcmd.go
@@ -70,7 +70,7 @@ either new or import). Without it you are not able to unlock your account.
Note that exporting your key in unencrypted format is NOT supported.
-Keys are stored under <DATADIR>/keys.
+Keys are stored under <DATADIR>/keystore.
It is safe to transfer the entire directory or the individual keys therein
between ethereum nodes by simply copying.
Make sure you backup your keys regularly.
diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go
index 9cfb3e4e3..e0e549e12 100644
--- a/cmd/geth/consolecmd_test.go
+++ b/cmd/geth/consolecmd_test.go
@@ -17,7 +17,8 @@
package main
import (
- "math/rand"
+ "crypto/rand"
+ "math/big"
"os"
"path/filepath"
"runtime"
@@ -27,7 +28,6 @@ import (
"testing"
"time"
- "github.com/ethereum/go-ethereum/console"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -37,9 +37,10 @@ func TestConsoleWelcome(t *testing.T) {
coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182"
// Start a geth console, make sure it's cleaned up and terminate the console
- geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "-shh", "console")
- defer geth.expectExit()
- geth.stdin.Close()
+ geth := runGeth(t,
+ "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none",
+ "--etherbase", coinbase, "--shh",
+ "console")
// Gather all the infos the welcome message needs to contain
geth.setTemplateFunc("goos", func() string { return runtime.GOOS })
@@ -51,7 +52,6 @@ func TestConsoleWelcome(t *testing.T) {
sort.Strings(apis)
return apis
})
- geth.setTemplateFunc("prompt", func() string { return console.DefaultPrompt })
// Verify the actual welcome message to the required template
geth.expect(`
@@ -63,52 +63,63 @@ at block: 0 ({{niltime}})
datadir: {{.Datadir}}
modules:{{range apis}} {{.}}:1.0{{end}}
-{{prompt}}
+> {{.InputLine "exit"}}
`)
+ geth.expectExit()
}
// Tests that a console can be attached to a running node via various means.
func TestIPCAttachWelcome(t *testing.T) {
// Configure the instance for IPC attachement
coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182"
-
var ipc string
if runtime.GOOS == "windows" {
- ipc = `\\.\pipe\geth` + strconv.Itoa(rand.Int())
+ ipc = `\\.\pipe\geth` + strconv.Itoa(trulyRandInt(100000, 999999))
} else {
ws := tmpdir(t)
defer os.RemoveAll(ws)
-
ipc = filepath.Join(ws, "geth.ipc")
}
- // Run the parent geth and attach with a child console
- geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "-shh", "--ipcpath", ipc)
- defer geth.interrupt()
+ // Note: we need --shh because testAttachWelcome checks for default
+ // list of ipc modules and shh is included there.
+ geth := runGeth(t,
+ "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none",
+ "--etherbase", coinbase, "--shh", "--ipcpath", ipc)
time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open
testAttachWelcome(t, geth, "ipc:"+ipc)
+
+ geth.interrupt()
+ geth.expectExit()
}
func TestHTTPAttachWelcome(t *testing.T) {
coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182"
- port := strconv.Itoa(rand.Intn(65535-1024) + 1024) // Yeah, sometimes this will fail, sorry :P
-
- geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "--rpc", "--rpcport", port)
- defer geth.interrupt()
+ port := strconv.Itoa(trulyRandInt(1024, 65536)) // Yeah, sometimes this will fail, sorry :P
+ geth := runGeth(t,
+ "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none",
+ "--etherbase", coinbase, "--rpc", "--rpcport", port)
time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open
testAttachWelcome(t, geth, "http://localhost:"+port)
+
+ geth.interrupt()
+ geth.expectExit()
}
func TestWSAttachWelcome(t *testing.T) {
coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182"
- port := strconv.Itoa(rand.Intn(65535-1024) + 1024) // Yeah, sometimes this will fail, sorry :P
+ port := strconv.Itoa(trulyRandInt(1024, 65536)) // Yeah, sometimes this will fail, sorry :P
- geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "--ws", "--wsport", port)
- defer geth.interrupt()
+ geth := runGeth(t,
+ "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none",
+ "--etherbase", coinbase, "--ws", "--wsport", port)
time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open
testAttachWelcome(t, geth, "ws://localhost:"+port)
+
+ geth.interrupt()
+ geth.expectExit()
}
func testAttachWelcome(t *testing.T, geth *testgeth, endpoint string) {
@@ -135,7 +146,6 @@ func testAttachWelcome(t *testing.T, geth *testgeth, endpoint string) {
sort.Strings(apis)
return apis
})
- attach.setTemplateFunc("prompt", func() string { return console.DefaultPrompt })
// Verify the actual welcome message to the required template
attach.expect(`
@@ -147,6 +157,14 @@ at block: 0 ({{niltime}}){{if ipc}}
datadir: {{datadir}}{{end}}
modules:{{range apis}} {{.}}:1.0{{end}}
-{{prompt}}
+> {{.InputLine "exit" }}
`)
+ attach.expectExit()
+}
+
+// trulyRandInt generates a crypto random integer used by the console tests to
+// not clash network ports with other tests running cocurrently.
+func trulyRandInt(lo, hi int) int {
+ num, _ := rand.Int(rand.Reader, big.NewInt(int64(hi-lo)))
+ return int(num.Int64()) + lo
}
diff --git a/console/console.go b/console/console.go
index a19b267bc..00d1fea1d 100644
--- a/console/console.go
+++ b/console/console.go
@@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/internal/jsre"
"github.com/ethereum/go-ethereum/internal/web3ext"
"github.com/ethereum/go-ethereum/rpc"
+ "github.com/mattn/go-colorable"
"github.com/peterh/liner"
"github.com/robertkrimen/otto"
)
@@ -80,7 +81,7 @@ func New(config Config) (*Console, error) {
config.Prompt = DefaultPrompt
}
if config.Printer == nil {
- config.Printer = os.Stdout
+ config.Printer = colorable.NewColorableStdout()
}
// Initialize the console and return
console := &Console{
@@ -244,15 +245,13 @@ func (c *Console) AutoCompleteInput(line string, pos int) (string, []string, str
// console's available modules.
func (c *Console) Welcome() {
// Print some generic Geth metadata
+ fmt.Fprintf(c.printer, "Welcome to the Geth JavaScript console!\n\n")
c.jsre.Run(`
- (function () {
- console.log("Welcome to the Geth JavaScript console!\n");
- console.log("instance: " + web3.version.node);
- console.log("coinbase: " + eth.coinbase);
- console.log("at block: " + eth.blockNumber + " (" + new Date(1000 * eth.getBlock(eth.blockNumber).timestamp) + ")");
- console.log(" datadir: " + admin.datadir);
- })();
- `)
+ console.log("instance: " + web3.version.node);
+ console.log("coinbase: " + eth.coinbase);
+ console.log("at block: " + eth.blockNumber + " (" + new Date(1000 * eth.getBlock(eth.blockNumber).timestamp) + ")");
+ console.log(" datadir: " + admin.datadir);
+ `)
// List all the supported modules for the user to call
if apis, err := c.client.SupportedModules(); err == nil {
modules := make([]string, 0, len(apis))
@@ -260,9 +259,9 @@ func (c *Console) Welcome() {
modules = append(modules, fmt.Sprintf("%s:%s", api, version))
}
sort.Strings(modules)
- c.jsre.Run("(function () { console.log(' modules: " + strings.Join(modules, " ") + "'); })();")
+ fmt.Fprintln(c.printer, " modules:", strings.Join(modules, " "))
}
- c.jsre.Run("(function () { console.log(); })();")
+ fmt.Fprintln(c.printer)
}
// Evaluate executes code and pretty prints the result to the specified output
@@ -332,11 +331,11 @@ func (c *Console) Interactive() {
// Append the line to the input and check for multi-line interpretation
input += line + "\n"
- indents = strings.Count(input, "{") + strings.Count(input, "(") - strings.Count(input, "}") - strings.Count(input, ")")
+ indents = countIndents(input)
if indents <= 0 {
prompt = c.prompt
} else {
- prompt = strings.Repeat("..", indents*2) + " "
+ prompt = strings.Repeat(".", indents*3) + " "
}
// If all the needed lines are present, save the command and run
if indents <= 0 {
@@ -355,6 +354,49 @@ func (c *Console) Interactive() {
}
}
+// countIndents returns the number of identations for the given input.
+// In case of invalid input such as var a = } the result can be negative.
+func countIndents(input string) int {
+ var (
+ indents = 0
+ inString = false
+ strOpenChar = ' ' // keep track of the string open char to allow var str = "I'm ....";
+ charEscaped = false // keep track if the previous char was the '\' char, allow var str = "abc\"def";
+ )
+
+ for _, c := range input {
+ switch c {
+ case '\\':
+ // indicate next char as escaped when in string and previous char isn't escaping this backslash
+ if !charEscaped && inString {
+ charEscaped = true
+ }
+ case '\'', '"':
+ if inString && !charEscaped && strOpenChar == c { // end string
+ inString = false
+ } else if !inString && !charEscaped { // begin string
+ inString = true
+ strOpenChar = c
+ }
+ charEscaped = false
+ case '{', '(':
+ if !inString { // ignore brackets when in string, allow var str = "a{"; without indenting
+ indents++
+ }
+ charEscaped = false
+ case '}', ')':
+ if !inString {
+ indents--
+ }
+ charEscaped = false
+ default:
+ charEscaped = false
+ }
+ }
+
+ return indents
+}
+
// Execute runs the JavaScript file specified as the argument.
func (c *Console) Execute(path string) error {
return c.jsre.Exec(path)
diff --git a/console/console_test.go b/console/console_test.go
index 911087824..7738d0c44 100644
--- a/console/console_test.go
+++ b/console/console_test.go
@@ -294,3 +294,49 @@ func TestPrettyError(t *testing.T) {
t.Fatalf("pretty error mismatch: have %s, want %s", output, want)
}
}
+
+// Tests that tests if the number of indents for JS input is calculated correct.
+func TestIndenting(t *testing.T) {
+ testCases := []struct {
+ input string
+ expectedIndentCount int
+ }{
+ {`var a = 1;`, 0},
+ {`"some string"`, 0},
+ {`"some string with (parentesis`, 0},
+ {`"some string with newline
+ ("`, 0},
+ {`function v(a,b) {}`, 0},
+ {`function f(a,b) { var str = "asd("; };`, 0},
+ {`function f(a) {`, 1},
+ {`function f(a, function(b) {`, 2},
+ {`function f(a, function(b) {
+ var str = "a)}";
+ });`, 0},
+ {`function f(a,b) {
+ var str = "a{b(" + a, ", " + b;
+ }`, 0},
+ {`var str = "\"{"`, 0},
+ {`var str = "'("`, 0},
+ {`var str = "\\{"`, 0},
+ {`var str = "\\\\{"`, 0},
+ {`var str = 'a"{`, 0},
+ {`var obj = {`, 1},
+ {`var obj = { {a:1`, 2},
+ {`var obj = { {a:1}`, 1},
+ {`var obj = { {a:1}, b:2}`, 0},
+ {`var obj = {}`, 0},
+ {`var obj = {
+ a: 1, b: 2
+ }`, 0},
+ {`var test = }`, -1},
+ {`var str = "a\""; var obj = {`, 1},
+ }
+
+ for i, tt := range testCases {
+ counted := countIndents(tt.input)
+ if counted != tt.expectedIndentCount {
+ t.Errorf("test %d: invalid indenting: have %d, want %d", i, counted, tt.expectedIndentCount)
+ }
+ }
+}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index f2eb2bbdd..596356377 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -368,6 +368,9 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
// GetTransaction returns a transaction if it is contained in the pool
// and nil otherwise.
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
+ tp.mu.RLock()
+ defer tp.mu.RUnlock()
+
// check the txs first
if tx, ok := tp.pending[hash]; ok {
return tx
@@ -421,12 +424,18 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
for _, tx := range txs {
- self.RemoveTx(tx.Hash())
+ self.removeTx(tx.Hash())
}
}
// RemoveTx removes the transaction with the given hash from the pool.
func (pool *TxPool) RemoveTx(hash common.Hash) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ pool.removeTx(hash)
+}
+
+func (pool *TxPool) removeTx(hash common.Hash) {
// delete from pending pool
delete(pool.pending, hash)
// delete from queue
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f6dbb4610..92124cfeb 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -54,14 +54,15 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
- headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
- headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
- bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
- bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
- receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
- stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
+ rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
+ rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests
+ rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
+ ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
+ ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
+
+ qosTuningPeers = 5 // Number of peers to tune based on (best peers)
+ qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
+ qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -73,6 +74,7 @@ var (
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
+ fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing
)
var (
@@ -103,14 +105,17 @@ var (
)
type Downloader struct {
- mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
- noFast bool // Flag to disable fast syncing in case of a security error
- mux *event.TypeMux // Event multiplexer to announce sync operation events
+ mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
+ mux *event.TypeMux // Event multiplexer to announce sync operation events
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
- interrupt int32 // Atomic boolean to signal termination
+ fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
+ fsPivotFails int // Number of fast sync failures in the critical section
+
+ rttEstimate uint64 // Round trip time to target for download requests
+ rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
@@ -156,6 +161,9 @@ type Downloader struct {
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
+ quitCh chan struct{} // Quit channel to signal termination
+ quitLock sync.RWMutex // Lock to prevent double closes
+
// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@@ -169,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
- return &Downloader{
+ dl := &Downloader{
mode: FullSync,
mux: mux,
queue: newQueue(stateDb),
peers: newPeerSet(),
+ rttEstimate: uint64(rttMaxEstimate),
+ rttConfidence: uint64(1000000),
hasHeader: hasHeader,
hasBlockAndState: hasBlockAndState,
getHeader: getHeader,
@@ -200,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
+ quitCh: make(chan struct{}),
}
+ go dl.qosTuner()
+ return dl
}
// Progress retrieves the synchronisation boundaries, specifically the origin
@@ -247,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
+ d.qosReduceConfidence()
+
return nil
}
@@ -314,6 +329,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
default:
}
}
+ for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
+ for empty := false; !empty; {
+ select {
+ case <-ch:
+ default:
+ empty = true
+ }
+ }
+ }
for empty := false; !empty; {
select {
case <-d.headerProcCh:
@@ -330,7 +354,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// Set the requested sync mode, unless it's forbidden
d.mode = mode
- if d.mode == FastSync && d.noFast {
+ if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
d.mode = FullSync
}
// Retrieve the origin peer and initiate the downloading process
@@ -413,12 +437,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
pivot = height
case FastSync:
// Calculate the new fast/slow sync pivot point
- pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
- if err != nil {
- panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
- }
- if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
- pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
+ if d.fsPivotLock == nil {
+ pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
+ if err != nil {
+ panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
+ }
+ if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
+ pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
+ }
+ } else {
+ // Pivot point locked in, use this and do not pick a new one!
+ pivot = d.fsPivotLock.Number.Uint64()
}
// If the point is below the origin, move origin back to ensure state download
if pivot < origin {
@@ -498,7 +527,16 @@ func (d *Downloader) cancel() {
// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
- atomic.StoreInt32(&d.interrupt, 1)
+ // Close the termination channel (make sure double close is allowed)
+ d.quitLock.Lock()
+ select {
+ case <-d.quitCh:
+ default:
+ close(d.quitCh)
+ }
+ d.quitLock.Unlock()
+
+ // Cancel any pending download requests
d.cancel()
}
@@ -915,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
+ request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
if request == nil {
continue
}
@@ -956,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false)
- timeout := time.After(headerTTL)
+ timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
@@ -1024,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
- timeout := time.After(hashTTL)
+ timeout := time.After(d.requestTTL())
for finished := false; !finished; {
select {
@@ -1101,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
- timeout := time.After(hashTTL)
+ timeout := time.After(d.requestTTL())
go p.getAbsHeaders(uint64(check), 1, 0, false)
// Wait until a reply arrives to this request
@@ -1182,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
getHeaders := func(from uint64) {
request = time.Now()
- timeout.Reset(headerTTL)
+ timeout.Reset(d.requestTTL())
if skeleton {
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
@@ -1218,8 +1256,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
- d.headerProcCh <- nil
- return nil
+ select {
+ case d.headerProcCh <- nil:
+ return nil
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
}
headers := packet.(*headerPack).headers
@@ -1290,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
}
- expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
+ expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
- capacity = func(p *peer) int { return p.HeaderCapacity() }
+ capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
@@ -1320,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
- capacity = func(p *peer) int { return p.BlockCapacity() }
+ capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
@@ -1344,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
- capacity = func(p *peer) int { return p.ReceiptCapacity() }
+ capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
@@ -1396,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error {
}
})
}
- expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity() }
+ capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
@@ -1611,9 +1653,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
- // If we're already past the pivot point, this could be an attack, disable fast sync
+ // If we're already past the pivot point, this could be an attack, thread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
- d.noFast = true
+ // If we didn't ever fail, lock in te pivot header (must! not! change!)
+ if d.fsPivotFails == 0 {
+ for _, header := range rollback {
+ if header.Number.Uint64() == pivot {
+ glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
+ d.fsPivotLock = header
+ }
+ }
+ }
+ d.fsPivotFails++
}
}
}()
@@ -1712,6 +1763,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
}
}
+ // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
+ if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
+ if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
+ glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
+ return errInvalidChain
+ }
+ }
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
@@ -1762,8 +1820,10 @@ func (d *Downloader) processContent() error {
}
for len(results) != 0 {
// Check for any termination requests
- if atomic.LoadInt32(&d.interrupt) == 1 {
+ select {
+ case <-d.quitCh:
return errCancelContentProcessing
+ default:
}
// Retrieve the a batch of results to import
var (
@@ -1864,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
return errNoSyncActive
}
}
+
+// qosTuner is the quality of service tuning loop that occasionally gathers the
+// peer latency statistics and updates the estimated request round trip time.
+func (d *Downloader) qosTuner() {
+ for {
+ // Retrieve the current median RTT and integrate into the previoust target RTT
+ rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+ atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
+
+ // A new RTT cycle passed, increase our confidence in the estimated RTT
+ conf := atomic.LoadUint64(&d.rttConfidence)
+ conf = conf + (1000000-conf)/2
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ // Log the new QoS values and sleep until the next RTT
+ glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
+ select {
+ case <-d.quitCh:
+ return
+ case <-time.After(rtt):
+ }
+ }
+}
+
+// qosReduceConfidence is meant to be called when a new peer joins the downloader's
+// peer set, needing to reduce the confidence we have in out QoS estimates.
+func (d *Downloader) qosReduceConfidence() {
+ // If we have a single peer, confidence is always 1
+ peers := uint64(d.peers.Len())
+ if peers == 1 {
+ atomic.StoreUint64(&d.rttConfidence, 1000000)
+ return
+ }
+ // If we have a ton of peers, don't drop confidence)
+ if peers >= uint64(qosConfidenceCap) {
+ return
+ }
+ // Otherwise drop the confidence factor
+ conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
+ if float64(conf)/1000000 < rttMinConfidence {
+ conf = uint64(rttMinConfidence * 1000000)
+ }
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
+}
+
+// requestRTT returns the current target round trip time for a download request
+// to complete in.
+//
+// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
+// the downloader tries to adapt queries to the RTT, so multiple RTT values can
+// be adapted to, but smaller ones are preffered (stabler download stream).
+func (d *Downloader) requestRTT() time.Duration {
+ return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
+}
+
+// requestTTL returns the current timeout allowance for a single download request
+// to finish under.
+func (d *Downloader) requestTTL() time.Duration {
+ var (
+ rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
+ )
+ ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
+ if ttl > ttlLimit {
+ ttl = ttlLimit
+ }
+ return ttl
+}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 1cf0e7cd3..a9c069a92 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -149,22 +149,25 @@ type downloadTester struct {
peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
+ peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return
+
lock sync.RWMutex
}
// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
tester := &downloadTester{
- ownHashes: []common.Hash{genesis.Hash()},
- ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
- ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
- ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
- ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
- peerHashes: make(map[string][]common.Hash),
- peerHeaders: make(map[string]map[common.Hash]*types.Header),
- peerBlocks: make(map[string]map[common.Hash]*types.Block),
- peerReceipts: make(map[string]map[common.Hash]types.Receipts),
- peerChainTds: make(map[string]map[common.Hash]*big.Int),
+ ownHashes: []common.Hash{genesis.Hash()},
+ ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
+ ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
+ ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
+ ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
+ peerHashes: make(map[string][]common.Hash),
+ peerHeaders: make(map[string]map[common.Hash]*types.Header),
+ peerBlocks: make(map[string]map[common.Hash]*types.Block),
+ peerReceipts: make(map[string]map[common.Hash]types.Receipts),
+ peerChainTds: make(map[string]map[common.Hash]*big.Int),
+ peerMissingStates: make(map[string]map[common.Hash]bool),
}
tester.stateDb, _ = ethdb.NewMemDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
@@ -176,6 +179,12 @@ func newTester() *downloadTester {
return tester
}
+// terminate aborts any operations on the embedded downloader and releases all
+// held resources.
+func (dl *downloadTester) terminate() {
+ dl.downloader.Terminate()
+}
+
// sync starts synchronizing with a remote peer, blocking until it completes.
func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
dl.lock.RLock()
@@ -408,6 +417,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
dl.peerReceipts[id] = make(map[common.Hash]types.Receipts)
dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
+ dl.peerMissingStates[id] = make(map[common.Hash]bool)
genesis := hashes[len(hashes)-1]
if header := headers[genesis]; header != nil {
@@ -648,7 +658,9 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func
results := make([][]byte, 0, len(hashes))
for _, hash := range hashes {
if data, err := testdb.Get(hash.Bytes()); err == nil {
- results = append(results, data)
+ if !dl.peerMissingStates[id][hash] {
+ results = append(results, data)
+ }
}
}
go dl.downloader.DeliverNodeData(id, results)
@@ -734,6 +746,8 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Synchronise with the peer and make sure all relevant data was retrieved
@@ -758,6 +772,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Wrap the importer to allow stepping
@@ -845,6 +861,8 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)
@@ -879,6 +897,8 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB)
@@ -928,6 +948,8 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB)
@@ -962,6 +984,8 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA)
tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit
@@ -981,7 +1005,9 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
// bodies.
func TestInactiveDownloader62(t *testing.T) {
t.Parallel()
+
tester := newTester()
+ defer tester.terminate()
// Check that neither block headers nor bodies are accepted
if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
@@ -996,7 +1022,9 @@ func TestInactiveDownloader62(t *testing.T) {
// bodies and receipts.
func TestInactiveDownloader63(t *testing.T) {
t.Parallel()
+
tester := newTester()
+ defer tester.terminate()
// Check that neither block headers nor bodies are accepted
if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
@@ -1033,6 +1061,8 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Make sure canceling works with a pristine downloader
@@ -1068,6 +1098,8 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i)
tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts)
@@ -1097,6 +1129,8 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
// Create peers of every type
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("peer 61", 61, hashes, nil, blocks, nil)
tester.newPeer("peer 62", 62, hashes, headers, blocks, nil)
tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts)
@@ -1134,6 +1168,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
+
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Instrument the downloader to signal body requests
@@ -1187,6 +1223,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
// Attempt a full sync with an attacker feeding gapped headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
@@ -1219,6 +1256,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
// Attempt a full sync with an attacker feeding shifted headers
tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
@@ -1250,6 +1288,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
tester := newTester()
+ defer tester.terminate()
// Attempt to sync with an attacker that feeds junk during the fast sync phase.
// This should result in the last fsHeaderSafetyNet headers being rolled back.
@@ -1288,7 +1327,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts)
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
- tester.downloader.noFast = false
+ tester.downloader.fsPivotFails = 0
tester.downloader.syncInitHook = func(uint64, uint64) {
for i := missing; i <= len(hashes); i++ {
delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i])
@@ -1307,6 +1346,8 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
+ tester.downloader.fsPivotFails = fsCriticalTrials
+
// Synchronise with the valid peer and make sure sync succeeds. Since the last
// rollback should also disable fast syncing for this process, verify that we
// did a fresh full sync. Note, we can't assert anything about the receipts
@@ -1339,9 +1380,11 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
- hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false)
+ defer tester.terminate()
+ hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false)
tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts)
+
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
@@ -1384,6 +1427,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
}
// Run the tests and check disconnection status
tester := newTester()
+ defer tester.terminate()
+
for i, tt := range tests {
// Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i)
@@ -1425,6 +1470,8 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{})
tester := newTester()
+ defer tester.terminate()
+
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1497,6 +1544,8 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{})
tester := newTester()
+ defer tester.terminate()
+
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1572,6 +1621,8 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{})
tester := newTester()
+ defer tester.terminate()
+
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1648,6 +1699,8 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
progress := make(chan struct{})
tester := newTester()
+ defer tester.terminate()
+
tester.downloader.syncInitHook = func(origin, latest uint64) {
starting <- struct{}{}
<-progress
@@ -1734,7 +1787,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
impl := tester.peerGetAbsHeadersFn("peer", 0)
go impl(from, count, skip, reverse)
// None of the extra deliveries should block.
- timeout := time.After(5 * time.Second)
+ timeout := time.After(15 * time.Second)
for i := 0; i < cap(deliveriesDone); i++ {
select {
case <-deliveriesDone:
@@ -1747,5 +1800,48 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("peer", nil, mode); err != nil {
t.Errorf("sync failed: %v", err)
}
+ tester.terminate()
}
}
+
+// Tests that if fast sync aborts in the critical section, it can restart a few
+// times before giving up.
+func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) }
+func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) }
+
+func testFastCriticalRestarts(t *testing.T, protocol int) {
+ t.Parallel()
+
+ // Create a large enough blockchin to actually fast sync on
+ targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false)
+
+ // Create a tester peer with the critical section state roots missing (force failures)
+ tester := newTester()
+ defer tester.terminate()
+
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+ for i := 0; i < fsPivotInterval; i++ {
+ tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
+ }
+ // Synchronise with the peer a few times and make sure they fail until the retry limit
+ for i := 0; i < fsCriticalTrials; i++ {
+ // Attempt a sync and ensure it fails properly
+ if err := tester.sync("peer", nil, FastSync); err == nil {
+ t.Fatalf("failing fast sync succeeded: %v", err)
+ }
+ time.Sleep(500 * time.Millisecond) // Make sure no in-flight requests remain
+
+ // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes
+ if i == 0 {
+ tester.lock.Lock()
+ tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
+ tester.lock.Unlock()
+ }
+ }
+ // Retry limit exhausted, downloader will switch to full sync, should succeed
+ if err := tester.sync("peer", nil, FastSync); err != nil {
+ t.Fatalf("failed to synchronise blocks in slow sync: %v", err)
+ }
+ assertOwnChain(t, tester, targetBlocks+1)
+}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 6aab907d7..94d44fca4 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -23,6 +23,8 @@ import (
"errors"
"fmt"
"math"
+ "sort"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -31,8 +33,8 @@ import (
)
const (
- maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
- throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
+ measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
)
// Hash and block fetchers belonging to eth/61 and below
@@ -68,6 +70,8 @@ type peer struct {
receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second
+ rtt time.Duration // Request round trip time to track responsiveness (QoS)
+
headerStarted time.Time // Time instance when the last header fetch was started
blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
@@ -290,44 +294,47 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
return
}
// Otherwise update the throughput with a new measurement
- measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
- *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
+ elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
+ measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
+
+ *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
+ p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
}
// HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput.
-func (p *peer) HeaderCapacity() int {
+func (p *peer) HeaderCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch))))
+ return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
}
// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput.
-func (p *peer) BlockCapacity() int {
+func (p *peer) BlockCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
+ return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch)))
}
// ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered throughput.
-func (p *peer) ReceiptCapacity() int {
+func (p *peer) ReceiptCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
+ return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch)))
}
// NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered throughput.
-func (p *peer) NodeDataCapacity() int {
+func (p *peer) NodeDataCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
+ return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch)))
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
@@ -361,13 +368,14 @@ func (p *peer) String() string {
p.lock.RLock()
defer p.lock.RUnlock()
- return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+
- fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
- fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
- fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
- fmt.Sprintf("lacking %4d", len(p.lacking)),
- )
+ return fmt.Sprintf("Peer %s [%s]", p.id, strings.Join([]string{
+ fmt.Sprintf("hs %3.2f/s", p.headerThroughput),
+ fmt.Sprintf("bs %3.2f/s", p.blockThroughput),
+ fmt.Sprintf("rs %3.2f/s", p.receiptThroughput),
+ fmt.Sprintf("ss %3.2f/s", p.stateThroughput),
+ fmt.Sprintf("miss %4d", len(p.lacking)),
+ fmt.Sprintf("rtt %v", p.rtt),
+ }, ", "))
}
// peerSet represents the collection of active peer participating in the chain
@@ -402,6 +410,10 @@ func (ps *peerSet) Reset() {
// average of all existing peers, to give it a realistic chance of being used
// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
+ // Retrieve the current median RTT as a sane default
+ p.rtt = ps.medianRTT()
+
+ // Register the new peer with some meaningful defaults
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -564,3 +576,34 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
}
return idle, total
}
+
+// medianRTT returns the median RTT of te peerset, considering only the tuning
+// peers if there are more peers available.
+func (ps *peerSet) medianRTT() time.Duration {
+ // Gather all the currnetly measured round trip times
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ rtts := make([]float64, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ p.lock.RLock()
+ rtts = append(rtts, float64(p.rtt))
+ p.lock.RUnlock()
+ }
+ sort.Float64s(rtts)
+
+ median := rttMaxEstimate
+ if qosTuningPeers <= len(rtts) {
+ median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers
+ } else if len(rtts) > 0 {
+ median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
+ }
+ // Restrict the RTT into some QoS defaults, irrelevant of true RTT
+ if median < rttMinEstimate {
+ median = rttMinEstimate
+ }
+ if median > rttMaxEstimate {
+ median = rttMaxEstimate
+ }
+ return median
+}
diff --git a/eth/handler.go b/eth/handler.go
index 58869a2ee..1e4dc1289 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -59,7 +59,9 @@ type blockFetcherFn func([]common.Hash) error
type ProtocolManager struct {
networkId int
- fastSync uint32
+ fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
+ synced uint32 // Flag whether we're considered synchronised (enables transaction processing)
+
txpool txPool
blockchain *core.BlockChain
chaindb ethdb.Database
@@ -161,7 +163,11 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
- manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, manager.insertChain, manager.removePeer)
+ inserter := func(blocks types.Blocks) (int, error) {
+ atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import
+ return manager.insertChain(blocks)
+ }
+ manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
if blockchain.Genesis().Hash().Hex() == defaultGenesisHash && networkId == 1 {
glog.V(logger.Debug).Infoln("Bad Block Reporting is enabled")
@@ -698,8 +704,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == TxMsg:
- // Transactions arrived, make sure we have a valid chain to handle them
- if atomic.LoadUint32(&pm.fastSync) == 1 {
+ // Transactions arrived, make sure we have a valid and fresh chain to handle them
+ if atomic.LoadUint32(&pm.synced) == 0 {
break
}
// Transactions can be processed, parse all of them and deliver to the pool
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
index 0a82e2e79..f860d0a35 100644
--- a/eth/protocol_test.go
+++ b/eth/protocol_test.go
@@ -97,6 +97,7 @@ func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) }
func testRecvTransactions(t *testing.T, protocol int) {
txAdded := make(chan []*types.Transaction)
pm := newTestProtocolManagerMust(t, false, 0, nil, txAdded)
+ pm.synced = 1 // mark synced to accept transactions
p, _ := newTestPeer("peer", protocol, pm, true)
defer pm.Stop()
defer p.close()
diff --git a/eth/sync.go b/eth/sync.go
index 4b16c1322..52f7e90e7 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -174,6 +174,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
return
}
+ atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done
+
// If fast sync was enabled, and we synced up, disable it
if atomic.LoadUint32(&pm.fastSync) == 1 {
// Disable fast sync if we indeed have something in our chain
diff --git a/internal/jsre/jsre.go b/internal/jsre/jsre.go
index a95efd379..481389304 100644
--- a/internal/jsre/jsre.go
+++ b/internal/jsre/jsre.go
@@ -24,7 +24,6 @@ import (
"io"
"io/ioutil"
"math/rand"
- "sync"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -44,7 +43,7 @@ type JSRE struct {
output io.Writer
evalQueue chan *evalReq
stopEventLoop chan bool
- loopWg sync.WaitGroup
+ closed chan struct{}
}
// jsTimer is a single timer instance with a callback function
@@ -66,10 +65,10 @@ func New(assetPath string, output io.Writer) *JSRE {
re := &JSRE{
assetPath: assetPath,
output: output,
+ closed: make(chan struct{}),
evalQueue: make(chan *evalReq),
stopEventLoop: make(chan bool),
}
- re.loopWg.Add(1)
go re.runEventLoop()
re.Set("loadScript", re.loadScript)
re.Set("inspect", prettyPrintJS)
@@ -98,6 +97,8 @@ func randomSource() *rand.Rand {
// functions should be used if and only if running a routine that was already
// called from JS through an RPC call.
func (self *JSRE) runEventLoop() {
+ defer close(self.closed)
+
vm := otto.New()
r := randomSource()
vm.SetRandomSource(r.Float64)
@@ -213,8 +214,6 @@ loop:
timer.timer.Stop()
delete(registry, timer)
}
-
- self.loopWg.Done()
}
// Do executes the given function on the JS event loop.
@@ -227,8 +226,11 @@ func (self *JSRE) Do(fn func(*otto.Otto)) {
// stops the event loop before exit, optionally waits for all timers to expire
func (self *JSRE) Stop(waitForCallbacks bool) {
- self.stopEventLoop <- waitForCallbacks
- self.loopWg.Wait()
+ select {
+ case <-self.closed:
+ case self.stopEventLoop <- waitForCallbacks:
+ <-self.closed
+ }
}
// Exec(file) loads and runs the contents of a file