diff options
-rw-r--r-- | cmd/geth/accountcmd.go | 2 | ||||
-rw-r--r-- | cmd/geth/consolecmd_test.go | 62 | ||||
-rw-r--r-- | console/console.go | 68 | ||||
-rw-r--r-- | console/console_test.go | 46 | ||||
-rw-r--r-- | core/tx_pool.go | 11 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 209 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 124 | ||||
-rw-r--r-- | eth/downloader/peer.go | 81 | ||||
-rw-r--r-- | eth/handler.go | 14 | ||||
-rw-r--r-- | eth/protocol_test.go | 1 | ||||
-rw-r--r-- | eth/sync.go | 2 | ||||
-rw-r--r-- | internal/jsre/jsre.go | 16 |
12 files changed, 516 insertions, 120 deletions
diff --git a/cmd/geth/accountcmd.go b/cmd/geth/accountcmd.go index 0f9d95c2c..2c2308514 100644 --- a/cmd/geth/accountcmd.go +++ b/cmd/geth/accountcmd.go @@ -70,7 +70,7 @@ either new or import). Without it you are not able to unlock your account. Note that exporting your key in unencrypted format is NOT supported. -Keys are stored under <DATADIR>/keys. +Keys are stored under <DATADIR>/keystore. It is safe to transfer the entire directory or the individual keys therein between ethereum nodes by simply copying. Make sure you backup your keys regularly. diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index 9cfb3e4e3..e0e549e12 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -17,7 +17,8 @@ package main import ( - "math/rand" + "crypto/rand" + "math/big" "os" "path/filepath" "runtime" @@ -27,7 +28,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/console" "github.com/ethereum/go-ethereum/rpc" ) @@ -37,9 +37,10 @@ func TestConsoleWelcome(t *testing.T) { coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182" // Start a geth console, make sure it's cleaned up and terminate the console - geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "-shh", "console") - defer geth.expectExit() - geth.stdin.Close() + geth := runGeth(t, + "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", + "--etherbase", coinbase, "--shh", + "console") // Gather all the infos the welcome message needs to contain geth.setTemplateFunc("goos", func() string { return runtime.GOOS }) @@ -51,7 +52,6 @@ func TestConsoleWelcome(t *testing.T) { sort.Strings(apis) return apis }) - geth.setTemplateFunc("prompt", func() string { return console.DefaultPrompt }) // Verify the actual welcome message to the required template geth.expect(` @@ -63,52 +63,63 @@ at block: 0 ({{niltime}}) datadir: {{.Datadir}} modules:{{range apis}} {{.}}:1.0{{end}} -{{prompt}} +> {{.InputLine "exit"}} `) + geth.expectExit() } // Tests that a console can be attached to a running node via various means. func TestIPCAttachWelcome(t *testing.T) { // Configure the instance for IPC attachement coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182" - var ipc string if runtime.GOOS == "windows" { - ipc = `\\.\pipe\geth` + strconv.Itoa(rand.Int()) + ipc = `\\.\pipe\geth` + strconv.Itoa(trulyRandInt(100000, 999999)) } else { ws := tmpdir(t) defer os.RemoveAll(ws) - ipc = filepath.Join(ws, "geth.ipc") } - // Run the parent geth and attach with a child console - geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "-shh", "--ipcpath", ipc) - defer geth.interrupt() + // Note: we need --shh because testAttachWelcome checks for default + // list of ipc modules and shh is included there. + geth := runGeth(t, + "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", + "--etherbase", coinbase, "--shh", "--ipcpath", ipc) time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "ipc:"+ipc) + + geth.interrupt() + geth.expectExit() } func TestHTTPAttachWelcome(t *testing.T) { coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182" - port := strconv.Itoa(rand.Intn(65535-1024) + 1024) // Yeah, sometimes this will fail, sorry :P - - geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "--rpc", "--rpcport", port) - defer geth.interrupt() + port := strconv.Itoa(trulyRandInt(1024, 65536)) // Yeah, sometimes this will fail, sorry :P + geth := runGeth(t, + "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", + "--etherbase", coinbase, "--rpc", "--rpcport", port) time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "http://localhost:"+port) + + geth.interrupt() + geth.expectExit() } func TestWSAttachWelcome(t *testing.T) { coinbase := "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182" - port := strconv.Itoa(rand.Intn(65535-1024) + 1024) // Yeah, sometimes this will fail, sorry :P + port := strconv.Itoa(trulyRandInt(1024, 65536)) // Yeah, sometimes this will fail, sorry :P - geth := runGeth(t, "--nat", "none", "--nodiscover", "--etherbase", coinbase, "--ws", "--wsport", port) - defer geth.interrupt() + geth := runGeth(t, + "--port", "0", "--maxpeers", "0", "--nodiscover", "--nat", "none", + "--etherbase", coinbase, "--ws", "--wsport", port) time.Sleep(2 * time.Second) // Simple way to wait for the RPC endpoint to open testAttachWelcome(t, geth, "ws://localhost:"+port) + + geth.interrupt() + geth.expectExit() } func testAttachWelcome(t *testing.T, geth *testgeth, endpoint string) { @@ -135,7 +146,6 @@ func testAttachWelcome(t *testing.T, geth *testgeth, endpoint string) { sort.Strings(apis) return apis }) - attach.setTemplateFunc("prompt", func() string { return console.DefaultPrompt }) // Verify the actual welcome message to the required template attach.expect(` @@ -147,6 +157,14 @@ at block: 0 ({{niltime}}){{if ipc}} datadir: {{datadir}}{{end}} modules:{{range apis}} {{.}}:1.0{{end}} -{{prompt}} +> {{.InputLine "exit" }} `) + attach.expectExit() +} + +// trulyRandInt generates a crypto random integer used by the console tests to +// not clash network ports with other tests running cocurrently. +func trulyRandInt(lo, hi int) int { + num, _ := rand.Int(rand.Reader, big.NewInt(int64(hi-lo))) + return int(num.Int64()) + lo } diff --git a/console/console.go b/console/console.go index a19b267bc..00d1fea1d 100644 --- a/console/console.go +++ b/console/console.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/internal/jsre" "github.com/ethereum/go-ethereum/internal/web3ext" "github.com/ethereum/go-ethereum/rpc" + "github.com/mattn/go-colorable" "github.com/peterh/liner" "github.com/robertkrimen/otto" ) @@ -80,7 +81,7 @@ func New(config Config) (*Console, error) { config.Prompt = DefaultPrompt } if config.Printer == nil { - config.Printer = os.Stdout + config.Printer = colorable.NewColorableStdout() } // Initialize the console and return console := &Console{ @@ -244,15 +245,13 @@ func (c *Console) AutoCompleteInput(line string, pos int) (string, []string, str // console's available modules. func (c *Console) Welcome() { // Print some generic Geth metadata + fmt.Fprintf(c.printer, "Welcome to the Geth JavaScript console!\n\n") c.jsre.Run(` - (function () { - console.log("Welcome to the Geth JavaScript console!\n"); - console.log("instance: " + web3.version.node); - console.log("coinbase: " + eth.coinbase); - console.log("at block: " + eth.blockNumber + " (" + new Date(1000 * eth.getBlock(eth.blockNumber).timestamp) + ")"); - console.log(" datadir: " + admin.datadir); - })(); - `) + console.log("instance: " + web3.version.node); + console.log("coinbase: " + eth.coinbase); + console.log("at block: " + eth.blockNumber + " (" + new Date(1000 * eth.getBlock(eth.blockNumber).timestamp) + ")"); + console.log(" datadir: " + admin.datadir); + `) // List all the supported modules for the user to call if apis, err := c.client.SupportedModules(); err == nil { modules := make([]string, 0, len(apis)) @@ -260,9 +259,9 @@ func (c *Console) Welcome() { modules = append(modules, fmt.Sprintf("%s:%s", api, version)) } sort.Strings(modules) - c.jsre.Run("(function () { console.log(' modules: " + strings.Join(modules, " ") + "'); })();") + fmt.Fprintln(c.printer, " modules:", strings.Join(modules, " ")) } - c.jsre.Run("(function () { console.log(); })();") + fmt.Fprintln(c.printer) } // Evaluate executes code and pretty prints the result to the specified output @@ -332,11 +331,11 @@ func (c *Console) Interactive() { // Append the line to the input and check for multi-line interpretation input += line + "\n" - indents = strings.Count(input, "{") + strings.Count(input, "(") - strings.Count(input, "}") - strings.Count(input, ")") + indents = countIndents(input) if indents <= 0 { prompt = c.prompt } else { - prompt = strings.Repeat("..", indents*2) + " " + prompt = strings.Repeat(".", indents*3) + " " } // If all the needed lines are present, save the command and run if indents <= 0 { @@ -355,6 +354,49 @@ func (c *Console) Interactive() { } } +// countIndents returns the number of identations for the given input. +// In case of invalid input such as var a = } the result can be negative. +func countIndents(input string) int { + var ( + indents = 0 + inString = false + strOpenChar = ' ' // keep track of the string open char to allow var str = "I'm ...."; + charEscaped = false // keep track if the previous char was the '\' char, allow var str = "abc\"def"; + ) + + for _, c := range input { + switch c { + case '\\': + // indicate next char as escaped when in string and previous char isn't escaping this backslash + if !charEscaped && inString { + charEscaped = true + } + case '\'', '"': + if inString && !charEscaped && strOpenChar == c { // end string + inString = false + } else if !inString && !charEscaped { // begin string + inString = true + strOpenChar = c + } + charEscaped = false + case '{', '(': + if !inString { // ignore brackets when in string, allow var str = "a{"; without indenting + indents++ + } + charEscaped = false + case '}', ')': + if !inString { + indents-- + } + charEscaped = false + default: + charEscaped = false + } + } + + return indents +} + // Execute runs the JavaScript file specified as the argument. func (c *Console) Execute(path string) error { return c.jsre.Exec(path) diff --git a/console/console_test.go b/console/console_test.go index 911087824..7738d0c44 100644 --- a/console/console_test.go +++ b/console/console_test.go @@ -294,3 +294,49 @@ func TestPrettyError(t *testing.T) { t.Fatalf("pretty error mismatch: have %s, want %s", output, want) } } + +// Tests that tests if the number of indents for JS input is calculated correct. +func TestIndenting(t *testing.T) { + testCases := []struct { + input string + expectedIndentCount int + }{ + {`var a = 1;`, 0}, + {`"some string"`, 0}, + {`"some string with (parentesis`, 0}, + {`"some string with newline + ("`, 0}, + {`function v(a,b) {}`, 0}, + {`function f(a,b) { var str = "asd("; };`, 0}, + {`function f(a) {`, 1}, + {`function f(a, function(b) {`, 2}, + {`function f(a, function(b) { + var str = "a)}"; + });`, 0}, + {`function f(a,b) { + var str = "a{b(" + a, ", " + b; + }`, 0}, + {`var str = "\"{"`, 0}, + {`var str = "'("`, 0}, + {`var str = "\\{"`, 0}, + {`var str = "\\\\{"`, 0}, + {`var str = 'a"{`, 0}, + {`var obj = {`, 1}, + {`var obj = { {a:1`, 2}, + {`var obj = { {a:1}`, 1}, + {`var obj = { {a:1}, b:2}`, 0}, + {`var obj = {}`, 0}, + {`var obj = { + a: 1, b: 2 + }`, 0}, + {`var test = }`, -1}, + {`var str = "a\""; var obj = {`, 1}, + } + + for i, tt := range testCases { + counted := countIndents(tt.input) + if counted != tt.expectedIndentCount { + t.Errorf("test %d: invalid indenting: have %d, want %d", i, counted, tt.expectedIndentCount) + } + } +} diff --git a/core/tx_pool.go b/core/tx_pool.go index f2eb2bbdd..596356377 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -368,6 +368,9 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) { // GetTransaction returns a transaction if it is contained in the pool // and nil otherwise. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { + tp.mu.RLock() + defer tp.mu.RUnlock() + // check the txs first if tx, ok := tp.pending[hash]; ok { return tx @@ -421,12 +424,18 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() for _, tx := range txs { - self.RemoveTx(tx.Hash()) + self.removeTx(tx.Hash()) } } // RemoveTx removes the transaction with the given hash from the pool. func (pool *TxPool) RemoveTx(hash common.Hash) { + pool.mu.Lock() + defer pool.mu.Unlock() + pool.removeTx(hash) +} + +func (pool *TxPool) removeTx(hash common.Hash) { // delete from pending pool delete(pool.pending, hash) // delete from queue diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f6dbb4610..92124cfeb 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,14 +54,15 @@ var ( blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired - headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) - headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out - bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request - bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired - receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request - receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired - stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request - stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired + rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests + rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests + rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value + ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion + ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts + + qosTuningPeers = 5 // Number of peers to tune based on (best peers) + qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence + qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) @@ -73,6 +74,7 @@ var ( fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync + fsCriticalTrials = 10 // Number of times to retry in the cricical section before bailing ) var ( @@ -103,14 +105,17 @@ var ( ) type Downloader struct { - mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) - noFast bool // Flag to disable fast syncing in case of a security error - mux *event.TypeMux // Event multiplexer to announce sync operation events + mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) + mux *event.TypeMux // Event multiplexer to announce sync operation events queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed - interrupt int32 // Atomic boolean to signal termination + fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries) + fsPivotFails int // Number of fast sync failures in the critical section + + rttEstimate uint64 // Round trip time to target for download requests + rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) // Statistics syncStatsChainOrigin uint64 // Origin block number where syncing started at @@ -156,6 +161,9 @@ type Downloader struct { cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers + quitCh chan struct{} // Quit channel to signal termination + quitLock sync.RWMutex // Lock to prevent double closes + // Testing hooks syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch @@ -169,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { - return &Downloader{ + dl := &Downloader{ mode: FullSync, mux: mux, queue: newQueue(stateDb), peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), hasHeader: hasHeader, hasBlockAndState: hasBlockAndState, getHeader: getHeader, @@ -200,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha receiptWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1), headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), } + go dl.qosTuner() + return dl } // Progress retrieves the synchronisation boundaries, specifically the origin @@ -247,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, glog.V(logger.Error).Infoln("Register failed:", err) return err } + d.qosReduceConfidence() + return nil } @@ -314,6 +329,15 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode default: } } + for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { + for empty := false; !empty; { + select { + case <-ch: + default: + empty = true + } + } + } for empty := false; !empty; { select { case <-d.headerProcCh: @@ -330,7 +354,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Set the requested sync mode, unless it's forbidden d.mode = mode - if d.mode == FastSync && d.noFast { + if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials { d.mode = FullSync } // Retrieve the origin peer and initiate the downloading process @@ -413,12 +437,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e pivot = height case FastSync: // Calculate the new fast/slow sync pivot point - pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) - if err != nil { - panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) - } - if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { - pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + if d.fsPivotLock == nil { + pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) + if err != nil { + panic(fmt.Sprintf("Failed to access crypto random source: %v", err)) + } + if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() { + pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64() + } + } else { + // Pivot point locked in, use this and do not pick a new one! + pivot = d.fsPivotLock.Number.Uint64() } // If the point is below the origin, move origin back to ensure state download if pivot < origin { @@ -498,7 +527,16 @@ func (d *Downloader) cancel() { // Terminate interrupts the downloader, canceling all pending operations. // The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { - atomic.StoreInt32(&d.interrupt, 1) + // Close the termination channel (make sure double close is allowed) + d.quitLock.Lock() + select { + case <-d.quitCh: + default: + close(d.quitCh) + } + d.quitLock.Unlock() + + // Cancel any pending download requests d.cancel() } @@ -915,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.ReserveBlocks(peer, peer.BlockCapacity()) + request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT)) if request == nil { continue } @@ -956,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // Request the advertised remote head block and wait for the response go p.getRelHeaders(p.head, 1, 0, false) - timeout := time.After(headerTTL) + timeout := time.After(d.requestTTL()) for { select { case <-d.cancelCh: @@ -1024,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} - timeout := time.After(hashTTL) + timeout := time.After(d.requestTTL()) for finished := false; !finished; { select { @@ -1101,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 - timeout := time.After(hashTTL) + timeout := time.After(d.requestTTL()) go p.getAbsHeaders(uint64(check), 1, 0, false) // Wait until a reply arrives to this request @@ -1182,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { getHeaders := func(from uint64) { request = time.Now() - timeout.Reset(headerTTL) + timeout.Reset(d.requestTTL()) if skeleton { glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from) @@ -1218,8 +1256,12 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - d.headerProcCh <- nil - return nil + select { + case d.headerProcCh <- nil: + return nil + case <-d.cancelCh: + return errCancelHeaderFetch + } } headers := packet.(*headerPack).headers @@ -1290,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) } - expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } + expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } - capacity = func(p *peer) int { return p.HeaderCapacity() } + capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } ) err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, @@ -1320,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error { pack := packet.(*bodyPack) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) } + expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } - capacity = func(p *peer) int { return p.BlockCapacity() } + capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, @@ -1344,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } - expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) } + expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } - capacity = func(p *peer) int { return p.ReceiptCapacity() } + capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, @@ -1396,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error { } }) } - expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } + expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveNodeData(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } - capacity = func(p *peer) int { return p.NodeDataCapacity() } + capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) } setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } ) err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, @@ -1611,9 +1653,18 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) - // If we're already past the pivot point, this could be an attack, disable fast sync + // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { - d.noFast = true + // If we didn't ever fail, lock in te pivot header (must! not! change!) + if d.fsPivotFails == 0 { + for _, header := range rollback { + if header.Number.Uint64() == pivot { + glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]) + d.fsPivotLock = header + } + } + } + d.fsPivotFails++ } } }() @@ -1712,6 +1763,13 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } } + // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in + if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot { + if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() { + glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4]) + return errInvalidChain + } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { // If we've reached the allowed number of pending headers, stall a bit @@ -1762,8 +1820,10 @@ func (d *Downloader) processContent() error { } for len(results) != 0 { // Check for any termination requests - if atomic.LoadInt32(&d.interrupt) == 1 { + select { + case <-d.quitCh: return errCancelContentProcessing + default: } // Retrieve the a batch of results to import var ( @@ -1864,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i return errNoSyncActive } } + +// qosTuner is the quality of service tuning loop that occasionally gathers the +// peer latency statistics and updates the estimated request round trip time. +func (d *Downloader) qosTuner() { + for { + // Retrieve the current median RTT and integrate into the previoust target RTT + rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT())) + atomic.StoreUint64(&d.rttEstimate, uint64(rtt)) + + // A new RTT cycle passed, increase our confidence in the estimated RTT + conf := atomic.LoadUint64(&d.rttConfidence) + conf = conf + (1000000-conf)/2 + atomic.StoreUint64(&d.rttConfidence, conf) + + // Log the new QoS values and sleep until the next RTT + glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()) + select { + case <-d.quitCh: + return + case <-time.After(rtt): + } + } +} + +// qosReduceConfidence is meant to be called when a new peer joins the downloader's +// peer set, needing to reduce the confidence we have in out QoS estimates. +func (d *Downloader) qosReduceConfidence() { + // If we have a single peer, confidence is always 1 + peers := uint64(d.peers.Len()) + if peers == 1 { + atomic.StoreUint64(&d.rttConfidence, 1000000) + return + } + // If we have a ton of peers, don't drop confidence) + if peers >= uint64(qosConfidenceCap) { + return + } + // Otherwise drop the confidence factor + conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers + if float64(conf)/1000000 < rttMinConfidence { + conf = uint64(rttMinConfidence * 1000000) + } + atomic.StoreUint64(&d.rttConfidence, conf) + + rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate)) + glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()) +} + +// requestRTT returns the current target round trip time for a download request +// to complete in. +// +// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that +// the downloader tries to adapt queries to the RTT, so multiple RTT values can +// be adapted to, but smaller ones are preffered (stabler download stream). +func (d *Downloader) requestRTT() time.Duration { + return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10 +} + +// requestTTL returns the current timeout allowance for a single download request +// to finish under. +func (d *Downloader) requestTTL() time.Duration { + var ( + rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate)) + conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0 + ) + ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf) + if ttl > ttlLimit { + ttl = ttlLimit + } + return ttl +} diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1cf0e7cd3..a9c069a92 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -149,22 +149,25 @@ type downloadTester struct { peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains + peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return + lock sync.RWMutex } // newTester creates a new downloader test mocker. func newTester() *downloadTester { tester := &downloadTester{ - ownHashes: []common.Hash{genesis.Hash()}, - ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, - ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, - ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil}, - ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, - peerHashes: make(map[string][]common.Hash), - peerHeaders: make(map[string]map[common.Hash]*types.Header), - peerBlocks: make(map[string]map[common.Hash]*types.Block), - peerReceipts: make(map[string]map[common.Hash]types.Receipts), - peerChainTds: make(map[string]map[common.Hash]*big.Int), + ownHashes: []common.Hash{genesis.Hash()}, + ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, + ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, + ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil}, + ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, + peerHashes: make(map[string][]common.Hash), + peerHeaders: make(map[string]map[common.Hash]*types.Header), + peerBlocks: make(map[string]map[common.Hash]*types.Block), + peerReceipts: make(map[string]map[common.Hash]types.Receipts), + peerChainTds: make(map[string]map[common.Hash]*big.Int), + peerMissingStates: make(map[string]map[common.Hash]bool), } tester.stateDb, _ = ethdb.NewMemDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) @@ -176,6 +179,12 @@ func newTester() *downloadTester { return tester } +// terminate aborts any operations on the embedded downloader and releases all +// held resources. +func (dl *downloadTester) terminate() { + dl.downloader.Terminate() +} + // sync starts synchronizing with a remote peer, blocking until it completes. func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { dl.lock.RLock() @@ -408,6 +417,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha dl.peerBlocks[id] = make(map[common.Hash]*types.Block) dl.peerReceipts[id] = make(map[common.Hash]types.Receipts) dl.peerChainTds[id] = make(map[common.Hash]*big.Int) + dl.peerMissingStates[id] = make(map[common.Hash]bool) genesis := hashes[len(hashes)-1] if header := headers[genesis]; header != nil { @@ -648,7 +658,9 @@ func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func results := make([][]byte, 0, len(hashes)) for _, hash := range hashes { if data, err := testdb.Get(hash.Bytes()); err == nil { - results = append(results, data) + if !dl.peerMissingStates[id][hash] { + results = append(results, data) + } } } go dl.downloader.DeliverNodeData(id, results) @@ -734,6 +746,8 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Synchronise with the peer and make sure all relevant data was retrieved @@ -758,6 +772,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Wrap the importer to allow stepping @@ -845,6 +861,8 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) tester := newTester() + defer tester.terminate() + tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) @@ -879,6 +897,8 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB) @@ -928,6 +948,8 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) tester := newTester() + defer tester.terminate() + tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB) @@ -962,6 +984,8 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit @@ -981,7 +1005,9 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { // bodies. func TestInactiveDownloader62(t *testing.T) { t.Parallel() + tester := newTester() + defer tester.terminate() // Check that neither block headers nor bodies are accepted if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { @@ -996,7 +1022,9 @@ func TestInactiveDownloader62(t *testing.T) { // bodies and receipts. func TestInactiveDownloader63(t *testing.T) { t.Parallel() + tester := newTester() + defer tester.terminate() // Check that neither block headers nor bodies are accepted if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { @@ -1033,6 +1061,8 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Make sure canceling works with a pristine downloader @@ -1068,6 +1098,8 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) @@ -1097,6 +1129,8 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { // Create peers of every type tester := newTester() + defer tester.terminate() + tester.newPeer("peer 61", 61, hashes, nil, blocks, nil) tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) @@ -1134,6 +1168,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Instrument the downloader to signal body requests @@ -1187,6 +1223,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() // Attempt a full sync with an attacker feeding gapped headers tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) @@ -1219,6 +1256,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() // Attempt a full sync with an attacker feeding shifted headers tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) @@ -1250,6 +1288,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() // Attempt to sync with an attacker that feeds junk during the fast sync phase. // This should result in the last fsHeaderSafetyNet headers being rolled back. @@ -1288,7 +1327,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts) missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 - tester.downloader.noFast = false + tester.downloader.fsPivotFails = 0 tester.downloader.syncInitHook = func(uint64, uint64) { for i := missing; i <= len(hashes); i++ { delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i]) @@ -1307,6 +1346,8 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Errorf("fast sync pivot block #%d not rolled back", head) } } + tester.downloader.fsPivotFails = fsCriticalTrials + // Synchronise with the valid peer and make sure sync succeeds. Since the last // rollback should also disable fast syncing for this process, verify that we // did a fresh full sync. Note, we can't assert anything about the receipts @@ -1339,9 +1380,11 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { t.Parallel() tester := newTester() - hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) + defer tester.terminate() + hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts) + if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -1384,6 +1427,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { } // Run the tests and check disconnection status tester := newTester() + defer tester.terminate() + for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) @@ -1425,6 +1470,8 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1497,6 +1544,8 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1572,6 +1621,8 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1648,6 +1699,8 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1734,7 +1787,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { impl := tester.peerGetAbsHeadersFn("peer", 0) go impl(from, count, skip, reverse) // None of the extra deliveries should block. - timeout := time.After(5 * time.Second) + timeout := time.After(15 * time.Second) for i := 0; i < cap(deliveriesDone); i++ { select { case <-deliveriesDone: @@ -1747,5 +1800,48 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("peer", nil, mode); err != nil { t.Errorf("sync failed: %v", err) } + tester.terminate() } } + +// Tests that if fast sync aborts in the critical section, it can restart a few +// times before giving up. +func TestFastCriticalRestarts63(t *testing.T) { testFastCriticalRestarts(t, 63) } +func TestFastCriticalRestarts64(t *testing.T) { testFastCriticalRestarts(t, 64) } + +func testFastCriticalRestarts(t *testing.T, protocol int) { + t.Parallel() + + // Create a large enough blockchin to actually fast sync on + targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15 + hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) + + // Create a tester peer with the critical section state roots missing (force failures) + tester := newTester() + defer tester.terminate() + + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + for i := 0; i < fsPivotInterval; i++ { + tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true + } + // Synchronise with the peer a few times and make sure they fail until the retry limit + for i := 0; i < fsCriticalTrials; i++ { + // Attempt a sync and ensure it fails properly + if err := tester.sync("peer", nil, FastSync); err == nil { + t.Fatalf("failing fast sync succeeded: %v", err) + } + time.Sleep(500 * time.Millisecond) // Make sure no in-flight requests remain + + // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes + if i == 0 { + tester.lock.Lock() + tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} + tester.lock.Unlock() + } + } + // Retry limit exhausted, downloader will switch to full sync, should succeed + if err := tester.sync("peer", nil, FastSync); err != nil { + t.Fatalf("failed to synchronise blocks in slow sync: %v", err) + } + assertOwnChain(t, tester, targetBlocks+1) +} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 6aab907d7..94d44fca4 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -23,6 +23,8 @@ import ( "errors" "fmt" "math" + "sort" + "strings" "sync" "sync/atomic" "time" @@ -31,8 +33,8 @@ import ( ) const ( - maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items - throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. + maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items + measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. ) // Hash and block fetchers belonging to eth/61 and below @@ -68,6 +70,8 @@ type peer struct { receiptThroughput float64 // Number of receipts measured to be retrievable per second stateThroughput float64 // Number of node data pieces measured to be retrievable per second + rtt time.Duration // Request round trip time to track responsiveness (QoS) + headerStarted time.Time // Time instance when the last header fetch was started blockStarted time.Time // Time instance when the last block (body) fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started @@ -290,44 +294,47 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id return } // Otherwise update the throughput with a new measurement - measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor - *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured + elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor + measured := float64(delivered) / (float64(elapsed) / float64(time.Second)) + + *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured + p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) } // HeaderCapacity retrieves the peers header download allowance based on its // previously discovered throughput. -func (p *peer) HeaderCapacity() int { +func (p *peer) HeaderCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch)))) + return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch))) } // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. -func (p *peer) BlockCapacity() int { +func (p *peer) BlockCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch)))) + return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch))) } // ReceiptCapacity retrieves the peers receipt download allowance based on its // previously discovered throughput. -func (p *peer) ReceiptCapacity() int { +func (p *peer) ReceiptCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch)))) + return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch))) } // NodeDataCapacity retrieves the peers state download allowance based on its // previously discovered throughput. -func (p *peer) NodeDataCapacity() int { +func (p *peer) NodeDataCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch)))) + return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch))) } // MarkLacking appends a new entity to the set of items (blocks, receipts, states) @@ -361,13 +368,14 @@ func (p *peer) String() string { p.lock.RLock() defer p.lock.RUnlock() - return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+ - fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ - fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ - fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ - fmt.Sprintf("lacking %4d", len(p.lacking)), - ) + return fmt.Sprintf("Peer %s [%s]", p.id, strings.Join([]string{ + fmt.Sprintf("hs %3.2f/s", p.headerThroughput), + fmt.Sprintf("bs %3.2f/s", p.blockThroughput), + fmt.Sprintf("rs %3.2f/s", p.receiptThroughput), + fmt.Sprintf("ss %3.2f/s", p.stateThroughput), + fmt.Sprintf("miss %4d", len(p.lacking)), + fmt.Sprintf("rtt %v", p.rtt), + }, ", ")) } // peerSet represents the collection of active peer participating in the chain @@ -402,6 +410,10 @@ func (ps *peerSet) Reset() { // average of all existing peers, to give it a realistic chance of being used // for data retrievals. func (ps *peerSet) Register(p *peer) error { + // Retrieve the current median RTT as a sane default + p.rtt = ps.medianRTT() + + // Register the new peer with some meaningful defaults ps.lock.Lock() defer ps.lock.Unlock() @@ -564,3 +576,34 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) } return idle, total } + +// medianRTT returns the median RTT of te peerset, considering only the tuning +// peers if there are more peers available. +func (ps *peerSet) medianRTT() time.Duration { + // Gather all the currnetly measured round trip times + ps.lock.RLock() + defer ps.lock.RUnlock() + + rtts := make([]float64, 0, len(ps.peers)) + for _, p := range ps.peers { + p.lock.RLock() + rtts = append(rtts, float64(p.rtt)) + p.lock.RUnlock() + } + sort.Float64s(rtts) + + median := rttMaxEstimate + if qosTuningPeers <= len(rtts) { + median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers + } else if len(rtts) > 0 { + median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos) + } + // Restrict the RTT into some QoS defaults, irrelevant of true RTT + if median < rttMinEstimate { + median = rttMinEstimate + } + if median > rttMaxEstimate { + median = rttMaxEstimate + } + return median +} diff --git a/eth/handler.go b/eth/handler.go index 58869a2ee..1e4dc1289 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -59,7 +59,9 @@ type blockFetcherFn func([]common.Hash) error type ProtocolManager struct { networkId int - fastSync uint32 + fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) + synced uint32 // Flag whether we're considered synchronised (enables transaction processing) + txpool txPool blockchain *core.BlockChain chaindb ethdb.Database @@ -161,7 +163,11 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() } - manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, manager.insertChain, manager.removePeer) + inserter := func(blocks types.Blocks) (int, error) { + atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import + return manager.insertChain(blocks) + } + manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) if blockchain.Genesis().Hash().Hex() == defaultGenesisHash && networkId == 1 { glog.V(logger.Debug).Infoln("Bad Block Reporting is enabled") @@ -698,8 +704,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == TxMsg: - // Transactions arrived, make sure we have a valid chain to handle them - if atomic.LoadUint32(&pm.fastSync) == 1 { + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if atomic.LoadUint32(&pm.synced) == 0 { break } // Transactions can be processed, parse all of them and deliver to the pool diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 0a82e2e79..f860d0a35 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -97,6 +97,7 @@ func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) } func testRecvTransactions(t *testing.T, protocol int) { txAdded := make(chan []*types.Transaction) pm := newTestProtocolManagerMust(t, false, 0, nil, txAdded) + pm.synced = 1 // mark synced to accept transactions p, _ := newTestPeer("peer", protocol, pm, true) defer pm.Stop() defer p.close() diff --git a/eth/sync.go b/eth/sync.go index 4b16c1322..52f7e90e7 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -174,6 +174,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil { return } + atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done + // If fast sync was enabled, and we synced up, disable it if atomic.LoadUint32(&pm.fastSync) == 1 { // Disable fast sync if we indeed have something in our chain diff --git a/internal/jsre/jsre.go b/internal/jsre/jsre.go index a95efd379..481389304 100644 --- a/internal/jsre/jsre.go +++ b/internal/jsre/jsre.go @@ -24,7 +24,6 @@ import ( "io" "io/ioutil" "math/rand" - "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -44,7 +43,7 @@ type JSRE struct { output io.Writer evalQueue chan *evalReq stopEventLoop chan bool - loopWg sync.WaitGroup + closed chan struct{} } // jsTimer is a single timer instance with a callback function @@ -66,10 +65,10 @@ func New(assetPath string, output io.Writer) *JSRE { re := &JSRE{ assetPath: assetPath, output: output, + closed: make(chan struct{}), evalQueue: make(chan *evalReq), stopEventLoop: make(chan bool), } - re.loopWg.Add(1) go re.runEventLoop() re.Set("loadScript", re.loadScript) re.Set("inspect", prettyPrintJS) @@ -98,6 +97,8 @@ func randomSource() *rand.Rand { // functions should be used if and only if running a routine that was already // called from JS through an RPC call. func (self *JSRE) runEventLoop() { + defer close(self.closed) + vm := otto.New() r := randomSource() vm.SetRandomSource(r.Float64) @@ -213,8 +214,6 @@ loop: timer.timer.Stop() delete(registry, timer) } - - self.loopWg.Done() } // Do executes the given function on the JS event loop. @@ -227,8 +226,11 @@ func (self *JSRE) Do(fn func(*otto.Otto)) { // stops the event loop before exit, optionally waits for all timers to expire func (self *JSRE) Stop(waitForCallbacks bool) { - self.stopEventLoop <- waitForCallbacks - self.loopWg.Wait() + select { + case <-self.closed: + case self.stopEventLoop <- waitForCallbacks: + <-self.closed + } } // Exec(file) loads and runs the contents of a file |