aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--accounts/abi/bind/backend.go49
-rw-r--r--accounts/abi/bind/backends/nil.go1
-rw-r--r--accounts/abi/bind/backends/remote.go20
-rw-r--r--accounts/abi/bind/backends/simulated.go10
-rw-r--r--accounts/abi/bind/base.go27
-rw-r--r--core/types/bloom9_test.go4
-rw-r--r--eth/api.go15
-rw-r--r--eth/bind.go18
-rw-r--r--eth/downloader/downloader.go517
-rw-r--r--eth/downloader/downloader_test.go48
-rw-r--r--eth/downloader/peer.go66
-rw-r--r--eth/downloader/queue.go223
-rw-r--r--eth/filters/api.go74
-rw-r--r--eth/filters/api_test.go198
14 files changed, 991 insertions, 279 deletions
diff --git a/accounts/abi/bind/backend.go b/accounts/abi/bind/backend.go
index 604e1ef26..65806aef4 100644
--- a/accounts/abi/bind/backend.go
+++ b/accounts/abi/bind/backend.go
@@ -27,15 +27,16 @@ import (
// ErrNoCode is returned by call and transact operations for which the requested
// recipient contract to operate on does not exist in the state db or does not
// have any code associated with it (i.e. suicided).
-//
-// Please note, this error string is part of the RPC API and is expected by the
-// native contract bindings to signal this particular error. Do not change this
-// as it will break all dependent code!
var ErrNoCode = errors.New("no contract code at given address")
// ContractCaller defines the methods needed to allow operating with contract on a read
// only basis.
type ContractCaller interface {
+ // HasCode checks if the contract at the given address has any code associated
+ // with it or not. This is needed to differentiate between contract internal
+ // errors and the local chain being out of sync.
+ HasCode(contract common.Address, pending bool) (bool, error)
+
// ContractCall executes an Ethereum contract call with the specified data as
// the input. The pending flag requests execution against the pending block, not
// the stable head of the chain.
@@ -55,6 +56,11 @@ type ContractTransactor interface {
// execution of a transaction.
SuggestGasPrice() (*big.Int, error)
+ // HasCode checks if the contract at the given address has any code associated
+ // with it or not. This is needed to differentiate between contract internal
+ // errors and the local chain being out of sync.
+ HasCode(contract common.Address, pending bool) (bool, error)
+
// EstimateGasLimit tries to estimate the gas needed to execute a specific
// transaction based on the current pending state of the backend blockchain.
// There is no guarantee that this is the true gas limit requirement as other
@@ -68,7 +74,38 @@ type ContractTransactor interface {
// ContractBackend defines the methods needed to allow operating with contract
// on a read-write basis.
+//
+// This interface is essentially the union of ContractCaller and ContractTransactor
+// but due to a bug in the Go compiler (https://github.com/golang/go/issues/6977),
+// we cannot simply list it as the two interfaces. The other solution is to add a
+// third interface containing the common methods, but that convolutes the user API
+// as it introduces yet another parameter to require for initialization.
type ContractBackend interface {
- ContractCaller
- ContractTransactor
+ // HasCode checks if the contract at the given address has any code associated
+ // with it or not. This is needed to differentiate between contract internal
+ // errors and the local chain being out of sync.
+ HasCode(contract common.Address, pending bool) (bool, error)
+
+ // ContractCall executes an Ethereum contract call with the specified data as
+ // the input. The pending flag requests execution against the pending block, not
+ // the stable head of the chain.
+ ContractCall(contract common.Address, data []byte, pending bool) ([]byte, error)
+
+ // PendingAccountNonce retrieves the current pending nonce associated with an
+ // account.
+ PendingAccountNonce(account common.Address) (uint64, error)
+
+ // SuggestGasPrice retrieves the currently suggested gas price to allow a timely
+ // execution of a transaction.
+ SuggestGasPrice() (*big.Int, error)
+
+ // EstimateGasLimit tries to estimate the gas needed to execute a specific
+ // transaction based on the current pending state of the backend blockchain.
+ // There is no guarantee that this is the true gas limit requirement as other
+ // transactions may be added or removed by miners, but it should provide a basis
+ // for setting a reasonable default.
+ EstimateGasLimit(sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error)
+
+ // SendTransaction injects the transaction into the pending pool for execution.
+ SendTransaction(tx *types.Transaction) error
}
diff --git a/accounts/abi/bind/backends/nil.go b/accounts/abi/bind/backends/nil.go
index 3b1e6dce7..f10bb61ac 100644
--- a/accounts/abi/bind/backends/nil.go
+++ b/accounts/abi/bind/backends/nil.go
@@ -38,6 +38,7 @@ func (*nilBackend) ContractCall(common.Address, []byte, bool) ([]byte, error) {
func (*nilBackend) EstimateGasLimit(common.Address, *common.Address, *big.Int, []byte) (*big.Int, error) {
panic("not implemented")
}
+func (*nilBackend) HasCode(common.Address, bool) (bool, error) { panic("not implemented") }
func (*nilBackend) SuggestGasPrice() (*big.Int, error) { panic("not implemented") }
func (*nilBackend) PendingAccountNonce(common.Address) (uint64, error) { panic("not implemented") }
func (*nilBackend) SendTransaction(*types.Transaction) error { panic("not implemented") }
diff --git a/accounts/abi/bind/backends/remote.go b/accounts/abi/bind/backends/remote.go
index 9b3647192..d903cbc8f 100644
--- a/accounts/abi/bind/backends/remote.go
+++ b/accounts/abi/bind/backends/remote.go
@@ -111,6 +111,26 @@ func (b *rpcBackend) request(method string, params []interface{}) (json.RawMessa
return res.Result, nil
}
+// HasCode implements ContractVerifier.HasCode by retrieving any code associated
+// with the contract from the remote node, and checking its size.
+func (b *rpcBackend) HasCode(contract common.Address, pending bool) (bool, error) {
+ // Execute the RPC code retrieval
+ block := "latest"
+ if pending {
+ block = "pending"
+ }
+ res, err := b.request("eth_getCode", []interface{}{contract.Hex(), block})
+ if err != nil {
+ return false, err
+ }
+ var hex string
+ if err := json.Unmarshal(res, &hex); err != nil {
+ return false, err
+ }
+ // Convert the response back to a Go byte slice and return
+ return len(common.FromHex(hex)) > 0, nil
+}
+
// ContractCall implements ContractCaller.ContractCall, delegating the execution of
// a contract call to the remote node, returning the reply to for local processing.
func (b *rpcBackend) ContractCall(contract common.Address, data []byte, pending bool) ([]byte, error) {
diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go
index 4866c4f58..54b1ce603 100644
--- a/accounts/abi/bind/backends/simulated.go
+++ b/accounts/abi/bind/backends/simulated.go
@@ -78,6 +78,16 @@ func (b *SimulatedBackend) Rollback() {
b.pendingState, _ = state.New(b.pendingBlock.Root(), b.database)
}
+// HasCode implements ContractVerifier.HasCode, checking whether there is any
+// code associated with a certain account in the blockchain.
+func (b *SimulatedBackend) HasCode(contract common.Address, pending bool) (bool, error) {
+ if pending {
+ return len(b.pendingState.GetCode(contract)) > 0, nil
+ }
+ statedb, _ := b.blockchain.State()
+ return len(statedb.GetCode(contract)) > 0, nil
+}
+
// ContractCall implements ContractCaller.ContractCall, executing the specified
// contract with the given input data.
func (b *SimulatedBackend) ContractCall(contract common.Address, data []byte, pending bool) ([]byte, error) {
diff --git a/accounts/abi/bind/base.go b/accounts/abi/bind/base.go
index 06621c5ad..75e8d5bc8 100644
--- a/accounts/abi/bind/base.go
+++ b/accounts/abi/bind/base.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/big"
+ "sync/atomic"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
@@ -56,6 +57,9 @@ type BoundContract struct {
abi abi.ABI // Reflect based ABI to access the correct Ethereum methods
caller ContractCaller // Read interface to interact with the blockchain
transactor ContractTransactor // Write interface to interact with the blockchain
+
+ latestHasCode uint32 // Cached verification that the latest state contains code for this contract
+ pendingHasCode uint32 // Cached verification that the pending state contains code for this contract
}
// NewBoundContract creates a low level contract interface through which calls
@@ -96,6 +100,19 @@ func (c *BoundContract) Call(opts *CallOpts, result interface{}, method string,
if opts == nil {
opts = new(CallOpts)
}
+ // Make sure we have a contract to operate on, and bail out otherwise
+ if (opts.Pending && atomic.LoadUint32(&c.pendingHasCode) == 0) || (!opts.Pending && atomic.LoadUint32(&c.latestHasCode) == 0) {
+ if code, err := c.caller.HasCode(c.address, opts.Pending); err != nil {
+ return err
+ } else if !code {
+ return ErrNoCode
+ }
+ if opts.Pending {
+ atomic.StoreUint32(&c.pendingHasCode, 1)
+ } else {
+ atomic.StoreUint32(&c.latestHasCode, 1)
+ }
+ }
// Pack the input, call and unpack the results
input, err := c.abi.Pack(method, params...)
if err != nil {
@@ -153,6 +170,16 @@ func (c *BoundContract) transact(opts *TransactOpts, contract *common.Address, i
}
gasLimit := opts.GasLimit
if gasLimit == nil {
+ // Gas estimation cannot succeed without code for method invocations
+ if contract != nil && atomic.LoadUint32(&c.pendingHasCode) == 0 {
+ if code, err := c.transactor.HasCode(c.address, true); err != nil {
+ return nil, err
+ } else if !code {
+ return nil, ErrNoCode
+ }
+ atomic.StoreUint32(&c.pendingHasCode, 1)
+ }
+ // If the contract surely has code (or code is not needed), estimate the transaction
gasLimit, err = c.transactor.EstimateGasLimit(opts.From, contract, value, input)
if err != nil {
return nil, fmt.Errorf("failed to exstimate gas needed: %v", err)
diff --git a/core/types/bloom9_test.go b/core/types/bloom9_test.go
index 58e8f7073..a28ac0e7a 100644
--- a/core/types/bloom9_test.go
+++ b/core/types/bloom9_test.go
@@ -39,12 +39,12 @@ func TestBloom(t *testing.T) {
}
for _, data := range positive {
- if !bloom.Test(new(big.Int).SetBytes([]byte(data))) {
+ if !bloom.TestBytes([]byte(data)) {
t.Error("expected", data, "to test true")
}
}
for _, data := range negative {
- if bloom.Test(new(big.Int).SetBytes([]byte(data))) {
+ if bloom.TestBytes([]byte(data)) {
t.Error("did not expect", data, "to test true")
}
}
diff --git a/eth/api.go b/eth/api.go
index 06f4d5de0..d048904f3 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -52,15 +52,6 @@ import (
"golang.org/x/net/context"
)
-// errNoCode is returned by call and transact operations for which the requested
-// recipient contract to operate on does not exist in the state db or does not
-// have any code associated with it (i.e. suicided).
-//
-// Please note, this error string is part of the RPC API and is expected by the
-// native contract bindings to signal this particular error. Do not change this
-// as it will break all dependent code!
-var errNoCode = errors.New("no contract code at given address")
-
const defaultGas = uint64(90000)
// blockByNumber is a commonly used helper function which retrieves and returns
@@ -755,12 +746,6 @@ func (s *PublicBlockChainAPI) doCall(args CallArgs, blockNr rpc.BlockNumber) (st
}
stateDb = stateDb.Copy()
- // If there's no code to interact with, respond with an appropriate error
- if args.To != nil {
- if code := stateDb.GetCode(*args.To); len(code) == 0 {
- return "0x", nil, errNoCode
- }
- }
// Retrieve the account state object to interact with
var from *state.StateObject
if args.From == (common.Address{}) {
diff --git a/eth/bind.go b/eth/bind.go
index 3a3eca062..fb7f29f60 100644
--- a/eth/bind.go
+++ b/eth/bind.go
@@ -19,7 +19,6 @@ package eth
import (
"math/big"
- "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
@@ -49,6 +48,17 @@ func NewContractBackend(eth *Ethereum) *ContractBackend {
}
}
+// HasCode implements bind.ContractVerifier.HasCode by retrieving any code associated
+// with the contract from the local API, and checking its size.
+func (b *ContractBackend) HasCode(contract common.Address, pending bool) (bool, error) {
+ block := rpc.LatestBlockNumber
+ if pending {
+ block = rpc.PendingBlockNumber
+ }
+ out, err := b.bcapi.GetCode(contract, block)
+ return len(common.FromHex(out)) > 0, err
+}
+
// ContractCall implements bind.ContractCaller executing an Ethereum contract
// call with the specified data as the input. The pending flag requests execution
// against the pending block, not the stable head of the chain.
@@ -64,9 +74,6 @@ func (b *ContractBackend) ContractCall(contract common.Address, data []byte, pen
}
// Execute the call and convert the output back to Go types
out, err := b.bcapi.Call(args, block)
- if err == errNoCode {
- err = bind.ErrNoCode
- }
return common.FromHex(out), err
}
@@ -95,9 +102,6 @@ func (b *ContractBackend) EstimateGasLimit(sender common.Address, contract *comm
Value: *rpc.NewHexNumber(value),
Data: common.ToHex(data),
})
- if err == errNoCode {
- err = bind.ErrNoCode
- }
return out.BigInt(), err
}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 0f76357cb..74bff2b66 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -42,6 +42,7 @@ var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
+ MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
@@ -52,6 +53,7 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
+ headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
@@ -60,9 +62,10 @@ var (
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
- maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
- maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
- maxResultsProcess = 256 // Number of download results to import at once into the chain
+ maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
+ maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
+ maxResultsProcess = 2048 // Number of content download results to import at once into the chain
fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
@@ -72,29 +75,30 @@ var (
)
var (
- errBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errStallingPeer = errors.New("peer is stalling")
- errNoPeers = errors.New("no peers to keep download active")
- errTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errEmptyHeaderSet = errors.New("empty header set by peer")
- errPeersUnavailable = errors.New("no peers available or all tried for download")
- errAlreadyInPool = errors.New("hash already in pool")
- errInvalidAncestor = errors.New("retrieved ancestor is invalid")
- errInvalidChain = errors.New("retrieved hash chain is invalid")
- errInvalidBlock = errors.New("retrieved block is invalid")
- errInvalidBody = errors.New("retrieved block body is invalid")
- errInvalidReceipt = errors.New("retrieved receipt is invalid")
- errCancelHashFetch = errors.New("hash download canceled (requested)")
- errCancelBlockFetch = errors.New("block download canceled (requested)")
- errCancelHeaderFetch = errors.New("block header download canceled (requested)")
- errCancelBodyFetch = errors.New("block body download canceled (requested)")
- errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
- errCancelStateFetch = errors.New("state data download canceled (requested)")
- errCancelProcessing = errors.New("processing canceled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errNoPeers = errors.New("no peers to keep download active")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errEmptyHeaderSet = errors.New("empty header set by peer")
+ errPeersUnavailable = errors.New("no peers available or all tried for download")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidAncestor = errors.New("retrieved ancestor is invalid")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errInvalidBlock = errors.New("retrieved block is invalid")
+ errInvalidBody = errors.New("retrieved block body is invalid")
+ errInvalidReceipt = errors.New("retrieved receipt is invalid")
+ errCancelHashFetch = errors.New("hash download canceled (requested)")
+ errCancelBlockFetch = errors.New("block download canceled (requested)")
+ errCancelHeaderFetch = errors.New("block header download canceled (requested)")
+ errCancelBodyFetch = errors.New("block body download canceled (requested)")
+ errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
+ errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
+ errCancelContentProcessing = errors.New("content processing canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
type Downloader struct {
@@ -137,16 +141,17 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
- hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
- blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
- headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
- blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
+ hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
+ blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+ blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
+ headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@@ -194,6 +199,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
+ headerProcCh: make(chan []*types.Header, 1),
}
}
@@ -308,6 +314,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
default:
}
}
+ for empty := false; !empty; {
+ select {
+ case <-d.headerProcCh:
+ default:
+ empty = true
+ }
+ }
// Reset any ephemeral sync statistics
d.syncStatsLock.Lock()
d.syncStatsStateTotal = 0
@@ -373,7 +386,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- return d.spawnSync(
+ return d.spawnSync(origin+1,
func() error { return d.fetchHashes61(p, td, origin+1) },
func() error { return d.fetchBlocks61(origin + 1) },
)
@@ -423,11 +436,12 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- return d.spawnSync(
- func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
+ return d.spawnSync(origin+1,
+ func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+ func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
)
default:
@@ -439,11 +453,11 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(fetchers ...func() error) error {
+func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
var wg sync.WaitGroup
errc := make(chan error, len(fetchers)+1)
wg.Add(len(fetchers) + 1)
- go func() { defer wg.Done(); errc <- d.process() }()
+ go func() { defer wg.Done(); errc <- d.processContent() }()
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
@@ -702,9 +716,9 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
getHashes := func(from uint64) {
glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
- go p.getAbsHashes(from, MaxHashFetch)
request = time.Now()
timeout.Reset(hashTTL)
+ go p.getAbsHashes(from, MaxHashFetch)
}
// Start pulling hashes, until all are exhausted
getHashes(from)
@@ -1050,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
continue
}
// Otherwise check if we already know the header or not
- if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) {
+ if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
break
}
@@ -1149,55 +1163,39 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return start, nil
}
-// fetchHeaders keeps retrieving headers from the requested number, until no more
-// are returned, potentially throttling on the way.
-//
-// The queue parameter can be used to switch between queuing headers for block
-// body download too, or directly import as pure header chains.
-func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
- glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
+// fetchHeaders keeps retrieving headers concurrently from the number
+// requested, until no more are returned, potentially throttling on the way. To
+// facilitate concurrency but still protect against malicious nodes sending bad
+// headers, we construct a header chain skeleton using the "origin" peer we are
+// syncing with, and fill in the missing headers using anyone else. Headers from
+// other peers are only accepted if they map cleanly to the skeleton. If no one
+// can fill in the skeleton - not even the origin peer - it's assumed invalid and
+// the origin is dropped.
+func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
+ glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
- // Calculate the pivoting point for switching from fast to slow sync
- pivot := d.queue.FastSyncPivot()
-
- // Keep a count of uncertain headers to roll back
- rollback := []*types.Header{}
- defer func() {
- if len(rollback) > 0 {
- // Flatten the headers and roll them back
- hashes := make([]common.Hash, len(rollback))
- for i, header := range rollback {
- hashes[i] = header.Hash()
- }
- lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
- d.rollback(hashes)
- glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
- len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
-
- // If we're already past the pivot point, this could be an attack, disable fast sync
- if rollback[len(rollback)-1].Number.Uint64() > pivot {
- d.noFast = true
- }
- }
- }()
-
- // Create a timeout timer, and the associated hash fetcher
- request := time.Now() // time of the last fetch request
+ // Create a timeout timer, and the associated header fetcher
+ skeleton := true // Skeleton assembly phase or finishing up
+ request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
getHeaders := func(from uint64) {
- glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)
-
- go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
request = time.Now()
timeout.Reset(headerTTL)
+
+ if skeleton {
+ glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
+ go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ } else {
+ glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
+ go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+ }
}
- // Start pulling headers, until all are exhausted
+ // Start pulling the header chain skeleton until all is done
getHeaders(from)
- gotHeaders := false
for {
select {
@@ -1205,116 +1203,48 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return errCancelHeaderFetch
case packet := <-d.headerCh:
- // Make sure the active peer is giving us the headers
+ // Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
+ glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
+ // If the skeleton's finished, pull any remaining head headers directly from the origin
+ if packet.Items() == 0 && skeleton {
+ skeleton = false
+ getHeaders(from)
+ continue
+ }
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
-
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
- select {
- case ch <- false:
- case <-d.cancelCh:
- }
- }
- // If no headers were retrieved at all, the peer violated it's TD promise that it had a
- // better chain compared to ours. The only exception is if it's promised blocks were
- // already imported by other means (e.g. fetcher):
- //
- // R <remote peer>, L <local node>: Both at block 10
- // R: Mine block 11, and propagate it to L
- // L: Queue block 11 for import
- // L: Notice that R's head and TD increased compared to ours, start sync
- // L: Import of block 11 finishes
- // L: Sync begins, and finds common ancestor at 11
- // L: Request new headers up from 11 (R's TD was higher, it must have something)
- // R: Nothing to give
- if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
- return errStallingPeer
- }
- // If fast or light syncing, ensure promised headers are indeed delivered. This is
- // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
- // of delivering the post-pivot blocks that would flag the invalid content.
- //
- // This check cannot be executed "as is" for full imports, since blocks may still be
- // queued for processing when the header download completes. However, as long as the
- // peer gave us something useful, we're already happy/progressed (above check).
- if d.mode == FastSync || d.mode == LightSync {
- if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
- return errStallingPeer
- }
- }
- rollback = nil
+ d.headerProcCh <- nil
return nil
}
- gotHeaders = true
headers := packet.(*headerPack).headers
- // Otherwise insert all the new headers, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
-
- if d.mode == FastSync || d.mode == LightSync {
- // Collect the yet unknown headers to mark them as uncertain
- unknown := make([]*types.Header, 0, len(headers))
- for _, header := range headers {
- if !d.hasHeader(header.Hash()) {
- unknown = append(unknown, header)
- }
- }
- // If we're importing pure headers, verify based on their recentness
- frequency := fsHeaderCheckFrequency
- if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
- frequency = 1
- }
- if n, err := d.insertHeaders(headers, frequency); err != nil {
- // If some headers were inserted, add them too to the rollback list
- if n > 0 {
- rollback = append(rollback, headers[:n]...)
- }
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
+ // If we received a skeleton batch, resolve internals concurrently
+ if skeleton {
+ filled, proced, err := d.fillHeaderSkeleton(from, headers)
+ if err != nil {
+ glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
return errInvalidChain
}
- // All verifications passed, store newly found uncertain headers
- rollback = append(rollback, unknown...)
- if len(rollback) > fsHeaderSafetyNet {
- rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
- }
- }
- if d.mode == FullSync || d.mode == FastSync {
- inserts := d.queue.Schedule(headers, from)
- if len(inserts) != len(headers) {
- glog.V(logger.Debug).Infof("%v: stale headers", p)
- return errBadPeer
- }
+ headers = filled[proced:]
+ from += uint64(proced)
}
- // Notify the content fetchers of new headers, but stop if queue is full
- cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
- if cont {
- // We still have headers to fetch, send continuation wake signal (potential)
- select {
- case ch <- true:
- default:
- }
- } else {
- // Header limit reached, send a termination wake signal (enforced)
- select {
- case ch <- false:
- case <-d.cancelCh:
- }
+ // Insert all the new headers and fetch the next batch
+ if len(headers) > 0 {
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
+ select {
+ case d.headerProcCh <- headers:
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
}
+ from += uint64(len(headers))
}
- if !cont {
- return nil
- }
- // Queue not yet full, fetch the next batch
- from += uint64(len(headers))
getHeaders(from)
case <-timeout.C:
@@ -1330,7 +1260,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
}
}
- return nil
+ select {
+ case d.headerProcCh <- nil:
+ case <-d.cancelCh:
+ }
+ return errBadPeer
case <-d.hashCh:
case <-d.blockCh:
@@ -1340,6 +1274,43 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
+// fillHeaderSkeleton concurrently retrieves headers from all our available peers
+// and maps them to the provided skeleton header chain.
+//
+// Any partial results from the beginning of the skeleton is (if possible) forwarded
+// immediately to the header processor to keep the rest of the pipeline full even
+// in the case of header stalls.
+//
+// The method returs the entire filled skeleton and also the number of headers
+// already forwarded for processing.
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
+ glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
+ d.queue.ScheduleSkeleton(from, skeleton)
+
+ var (
+ deliver = func(packet dataPack) (int, error) {
+ pack := packet.(*headerPack)
+ return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
+ }
+ expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
+ throttle = func() bool { return false }
+ reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ return d.queue.ReserveHeaders(p, count), false, nil
+ }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+ capacity = func(p *peer) int { return p.HeaderCapacity() }
+ setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
+ )
+ err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
+ d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
+ nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
+
+ glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
+
+ filled, proced := d.queue.RetrieveHeaders()
+ return filled, proced, err
+}
+
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
@@ -1398,6 +1369,11 @@ func (d *Downloader) fetchNodeData() error {
deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
+ // If the peer gave us nothing, stalling fast sync, drop
+ if delivered == 0 {
+ glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
+ d.dropPeer(packet.PeerId())
+ }
if err != nil {
// If the node data processing failed, the root hash is very wrong, abort
glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
@@ -1438,6 +1414,28 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
+//
+// As the scheduling/timeout logic mostly is the same for all downloaded data
+// types, this method is used by each for data gathering and is instrumented with
+// various callbacks to handle the slight differences between processing them.
+//
+// The instrumentation parameters:
+// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
+// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
+// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
+// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
+// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
+// - pending: task callback for the number of requests still needing download (detect completion/non-completability)
+// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
+// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
+// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
+// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
+// - fetch: network callback to actually send a particular download request to a physical remote peer
+// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
+// - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping)
+// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
+// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
+// - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
@@ -1554,7 +1552,9 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
continue
}
if glog.V(logger.Detail) {
- if len(request.Headers) > 0 {
+ if request.From > 0 {
+ glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
+ } else if len(request.Headers) > 0 {
glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
} else {
glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
@@ -1588,9 +1588,162 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
}
-// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents.
-func (d *Downloader) process() error {
+// processHeaders takes batches of retrieved headers from an input channel and
+// keeps processing and scheduling them into the header chain and downloader's
+// queue until the stream ends or a failure occurs.
+func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
+ // Calculate the pivoting point for switching from fast to slow sync
+ pivot := d.queue.FastSyncPivot()
+
+ // Keep a count of uncertain headers to roll back
+ rollback := []*types.Header{}
+ defer func() {
+ if len(rollback) > 0 {
+ // Flatten the headers and roll them back
+ hashes := make([]common.Hash, len(rollback))
+ for i, header := range rollback {
+ hashes[i] = header.Hash()
+ }
+ lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
+ d.rollback(hashes)
+ glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
+ len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
+
+ // If we're already past the pivot point, this could be an attack, disable fast sync
+ if rollback[len(rollback)-1].Number.Uint64() > pivot {
+ d.noFast = true
+ }
+ }
+ }()
+
+ // Wait for batches of headers to process
+ gotHeaders := false
+
+ for {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+
+ case headers := <-d.headerProcCh:
+ // Terminate header processing if we synced up
+ if len(headers) == 0 {
+ // Notify everyone that headers are fully processed
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
+ }
+ // If no headers were retrieved at all, the peer violated it's TD promise that it had a
+ // better chain compared to ours. The only exception is if it's promised blocks were
+ // already imported by other means (e.g. fecher):
+ //
+ // R <remote peer>, L <local node>: Both at block 10
+ // R: Mine block 11, and propagate it to L
+ // L: Queue block 11 for import
+ // L: Notice that R's head and TD increased compared to ours, start sync
+ // L: Import of block 11 finishes
+ // L: Sync begins, and finds common ancestor at 11
+ // L: Request new headers up from 11 (R's TD was higher, it must have something)
+ // R: Nothing to give
+ if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
+ return errStallingPeer
+ }
+ // If fast or light syncing, ensure promised headers are indeed delivered. This is
+ // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
+ // of delivering the post-pivot blocks that would flag the invalid content.
+ //
+ // This check cannot be executed "as is" for full imports, since blocks may still be
+ // queued for processing when the header download completes. However, as long as the
+ // peer gave us something useful, we're already happy/progressed (above check).
+ if d.mode == FastSync || d.mode == LightSync {
+ if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
+ return errStallingPeer
+ }
+ }
+ // Disable any rollback and return
+ rollback = nil
+ return nil
+ }
+ // Otherwise split the chunk of headers into batches and process them
+ gotHeaders = true
+
+ for len(headers) > 0 {
+ // Terminate if something failed in between processing chunks
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+ default:
+ }
+ // Select the next chunk of headers to import
+ limit := maxHeadersProcess
+ if limit > len(headers) {
+ limit = len(headers)
+ }
+ chunk := headers[:limit]
+
+ // In case of header only syncing, validate the chunk immediately
+ if d.mode == FastSync || d.mode == LightSync {
+ // Collect the yet unknown headers to mark them as uncertain
+ unknown := make([]*types.Header, 0, len(headers))
+ for _, header := range chunk {
+ if !d.hasHeader(header.Hash()) {
+ unknown = append(unknown, header)
+ }
+ }
+ // If we're importing pure headers, verify based on their recentness
+ frequency := fsHeaderCheckFrequency
+ if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
+ frequency = 1
+ }
+ if n, err := d.insertHeaders(chunk, frequency); err != nil {
+ // If some headers were inserted, add them too to the rollback list
+ if n > 0 {
+ rollback = append(rollback, chunk[:n]...)
+ }
+ glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
+ return errInvalidChain
+ }
+ // All verifications passed, store newly found uncertain headers
+ rollback = append(rollback, unknown...)
+ if len(rollback) > fsHeaderSafetyNet {
+ rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
+ }
+ }
+ // Unless we're doing light chains, schedule the headers for associated content retrieval
+ if d.mode == FullSync || d.mode == FastSync {
+ // If we've reached the allowed number of pending headers, stall a bit
+ for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+ case <-time.After(time.Second):
+ }
+ }
+ // Otherwise insert the headers for content retrieval
+ inserts := d.queue.Schedule(chunk, origin)
+ if len(inserts) != len(chunk) {
+ glog.V(logger.Debug).Infof("stale headers")
+ return errBadPeer
+ }
+ }
+ headers = headers[limit:]
+ origin += uint64(limit)
+ }
+ // Signal the content downloaders of the availablility of new tasks
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ select {
+ case ch <- true:
+ default:
+ }
+ }
+ }
+ }
+}
+
+// processContent takes fetch results from the queue and tries to import them
+// into the chain. The type of import operation will depend on the result contents.
+func (d *Downloader) processContent() error {
pivot := d.queue.FastSyncPivot()
for {
results := d.queue.WaitResults()
@@ -1608,7 +1761,7 @@ func (d *Downloader) process() error {
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
- return errCancelProcessing
+ return errCancelContentProcessing
}
// Retrieve the a batch of results to import
var (
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index b0b0c2bd3..4ea8a8abe 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -560,8 +560,8 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu
hashes := dl.peerHashes[id]
headers := dl.peerHeaders[id]
result := make([]*types.Header, 0, amount)
- for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ {
- if header, ok := headers[hashes[len(hashes)-int(origin)-1-i]]; ok {
+ for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ {
+ if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok {
result = append(result, header)
}
}
@@ -1258,6 +1258,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
// rolled back, and also the pivot point being reverted to a non-block status.
tester.newPeer("block-attack", protocol, hashes, headers, blocks, receipts)
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
+ delete(tester.peerHeaders["fast-attack"], hashes[len(hashes)-missing]) // Make sure the fast-attacker doesn't fill in
delete(tester.peerHeaders["block-attack"], hashes[len(hashes)-missing])
if err := tester.sync("block-attack", nil, mode); err == nil {
@@ -1348,27 +1349,28 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
- {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
- {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index c4846194b..6aab907d7 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -58,15 +58,18 @@ type peer struct {
id string // Unique identifier of the peer
head common.Hash // Hash of the peers latest known block
+ headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
+ headerThroughput float64 // Number of headers measured to be retrievable per second
blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second
- blockStarted time.Time // Time instance when the last block (body)fetch was started
+ headerStarted time.Time // Time instance when the last header fetch was started
+ blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
@@ -118,10 +121,12 @@ func (p *peer) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
+ atomic.StoreInt32(&p.headerIdle, 0)
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.stateIdle, 0)
+ p.headerThroughput = 0
p.blockThroughput = 0
p.receiptThroughput = 0
p.stateThroughput = 0
@@ -151,6 +156,24 @@ func (p *peer) Fetch61(request *fetchRequest) error {
return nil
}
+// FetchHeaders sends a header retrieval request to the remote peer.
+func (p *peer) FetchHeaders(from uint64, count int) error {
+ // Sanity check the protocol version
+ if p.version < 62 {
+ panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
+ }
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.headerStarted = time.Now()
+
+ // Issue the header retrieval request (absolut upwards without gaps)
+ go p.getAbsHeaders(from, count, 0, false)
+
+ return nil
+}
+
// FetchBodies sends a block body retrieval request to the remote peer.
func (p *peer) FetchBodies(request *fetchRequest) error {
// Sanity check the protocol version
@@ -217,6 +240,13 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil
}
+// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
+// requests. Its estimated header retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetHeadersIdle(delivered int) {
+ p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
+}
+
// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
// requests. Its estimated block retrieval throughput is updated with that measured
// just now.
@@ -264,6 +294,15 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
}
+// HeaderCapacity retrieves the peers header download allowance based on its
+// previously discovered throughput.
+func (p *peer) HeaderCapacity() int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch))))
+}
+
// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput.
func (p *peer) BlockCapacity() int {
@@ -323,14 +362,15 @@ func (p *peer) String() string {
defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+ fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+
+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
fmt.Sprintf("lacking %4d", len(p.lacking)),
)
}
-// peerSet represents the collection of active peer participating in the block
+// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
peers map[string]*peer
@@ -359,7 +399,7 @@ func (ps *peerSet) Reset() {
// peer is already known.
//
// The method also sets the starting throughput values of the new peer to the
-// average of all existing peers, to give it a realistic change of being used
+// average of all existing peers, to give it a realistic chance of being used
// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
@@ -369,15 +409,17 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
- p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+ p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
for _, peer := range ps.peers {
peer.lock.RLock()
+ p.headerThroughput += peer.headerThroughput
p.blockThroughput += peer.blockThroughput
p.receiptThroughput += peer.receiptThroughput
p.stateThroughput += peer.stateThroughput
peer.lock.RUnlock()
}
+ p.headerThroughput /= float64(len(ps.peers))
p.blockThroughput /= float64(len(ps.peers))
p.receiptThroughput /= float64(len(ps.peers))
p.stateThroughput /= float64(len(ps.peers))
@@ -441,6 +483,20 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
return ps.idlePeers(61, 61, idle, throughput)
}
+// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.headerIdle) == 0
+ }
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.headerThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
+}
+
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// the active peer set, ordered by their reputation.
func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index d8d1bddce..195eae4ff 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -40,7 +40,7 @@ import (
var (
blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
- maxInFlightStates = 4096 // Maximum number of state downloads to allow concurrently
+ maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
)
var (
@@ -52,6 +52,7 @@ var (
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
Peer *peer // Peer to which the request was sent
+ From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made
@@ -79,6 +80,18 @@ type queue struct {
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ // Headers are "special", they download in batches, supported by a skeleton chain
+ headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
+ headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
+ headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
+ headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
+ headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches
+ headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
+ headerProced int // [eth/62] Number of headers already processed from the results
+ headerOffset uint64 // [eth/62] Number of the first header in the result cache
+ headerContCh chan bool // [eth/62] Channel to notify when header download finishes
+
+ // All data retrievals below are based on an already assembles header chain
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
@@ -113,6 +126,8 @@ func newQueue(stateDb ethdb.Database) *queue {
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
+ headerPendPool: make(map[string]*fetchRequest),
+ headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(),
blockPendPool: make(map[string]*fetchRequest),
@@ -149,6 +164,8 @@ func (q *queue) Reset() {
q.headerHead = common.Hash{}
+ q.headerPendPool = make(map[string]*fetchRequest)
+
q.blockTaskPool = make(map[common.Hash]*types.Header)
q.blockTaskQueue.Reset()
q.blockPendPool = make(map[string]*fetchRequest)
@@ -178,6 +195,14 @@ func (q *queue) Close() {
q.active.Broadcast()
}
+// PendingHeaders retrieves the number of header requests pending for retrieval.
+func (q *queue) PendingHeaders() int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.headerTaskQueue.Size()
+}
+
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
q.lock.Lock()
@@ -205,6 +230,15 @@ func (q *queue) PendingNodeData() int {
return 0
}
+// InFlightHeaders retrieves whether there are header fetch requests currently
+// in flight.
+func (q *queue) InFlightHeaders() bool {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return len(q.headerPendPool) > 0
+}
+
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
@@ -317,6 +351,45 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
return inserts
}
+// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
+// up an already retrieved header skeleton.
+func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
+ if q.headerResults != nil {
+ panic("skeleton assembly already in progress")
+ }
+ // Shedule all the header retrieval tasks for the skeleton assembly
+ q.headerTaskPool = make(map[uint64]*types.Header)
+ q.headerTaskQueue = prque.New()
+ q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
+ q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
+ q.headerProced = 0
+ q.headerOffset = from
+ q.headerContCh = make(chan bool, 1)
+
+ for i, header := range skeleton {
+ index := from + uint64(i*MaxHeaderFetch)
+
+ q.headerTaskPool[index] = header
+ q.headerTaskQueue.Push(index, -float32(index))
+ }
+}
+
+// RetrieveHeaders retrieves the header chain assemble based on the scheduled
+// skeleton.
+func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ headers, proced := q.headerResults, q.headerProced
+ q.headerResults, q.headerProced = nil, 0
+
+ return headers, proced
+}
+
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
@@ -437,6 +510,46 @@ func (q *queue) countProcessableItems() int {
return len(q.resultCache)
}
+// ReserveHeaders reserves a set of headers for the given peer, skipping any
+// previously failed batches.
+func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Short circuit if the peer's already downloading something (sanity check to
+ // not corrupt state)
+ if _, ok := q.headerPendPool[p.id]; ok {
+ return nil
+ }
+ // Retrieve a batch of hashes, skipping previously failed ones
+ send, skip := uint64(0), []uint64{}
+ for send == 0 && !q.headerTaskQueue.Empty() {
+ from, _ := q.headerTaskQueue.Pop()
+ if q.headerPeerMiss[p.id] != nil {
+ if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
+ skip = append(skip, from.(uint64))
+ continue
+ }
+ }
+ send = from.(uint64)
+ }
+ // Merge all the skipped batches back
+ for _, from := range skip {
+ q.headerTaskQueue.Push(from, -float32(from))
+ }
+ // Assemble and return the block download request
+ if send == 0 {
+ return nil
+ }
+ request := &fetchRequest{
+ Peer: p,
+ From: send,
+ Time: time.Now(),
+ }
+ q.headerPendPool[p.id] = request
+ return request
+}
+
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
@@ -635,6 +748,11 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
return request, progress, nil
}
+// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
+func (q *queue) CancelHeaders(request *fetchRequest) {
+ q.cancel(request, q.headerTaskQueue, q.headerPendPool)
+}
+
// CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
func (q *queue) CancelBlocks(request *fetchRequest) {
q.cancel(request, q.hashQueue, q.blockPendPool)
@@ -663,6 +781,9 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
q.lock.Lock()
defer q.lock.Unlock()
+ if request.From > 0 {
+ taskQueue.Push(request.From, -float32(request.From))
+ }
for hash, index := range request.Hashes {
taskQueue.Push(hash, float32(index))
}
@@ -702,6 +823,15 @@ func (q *queue) Revoke(peerId string) {
}
}
+// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
+// canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
+}
+
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
@@ -753,6 +883,9 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
timeoutMeter.Mark(1)
// Return any non satisfied requests to the pool
+ if request.From > 0 {
+ taskQueue.Push(request.From, -float32(request.From))
+ }
for hash, index := range request.Hashes {
taskQueue.Push(hash, float32(index))
}
@@ -842,6 +975,94 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
}
}
+// DeliverHeaders injects a header retrieval response into the header results
+// cache. This method either accepts all headers it received, or none of them
+// if they do not map correctly to the skeleton.
+//
+// If the headers are accepted, the method makes an attempt to deliver the set
+// of ready headers to the processor to keep the pipeline full. However it will
+// not block to prevent stalling other pending deliveries.
+func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Short circuit if the data was never requested
+ request := q.headerPendPool[id]
+ if request == nil {
+ return 0, errNoFetchesPending
+ }
+ headerReqTimer.UpdateSince(request.Time)
+ delete(q.headerPendPool, id)
+
+ // Ensure headers can be mapped onto the skeleton chain
+ target := q.headerTaskPool[request.From].Hash()
+
+ accepted := len(headers) == MaxHeaderFetch
+ if accepted {
+ if headers[0].Number.Uint64() != request.From {
+ glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
+ accepted = false
+ } else if headers[len(headers)-1].Hash() != target {
+ glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
+ accepted = false
+ }
+ }
+ if accepted {
+ for i, header := range headers[1:] {
+ hash := header.Hash()
+ if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
+ glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
+ accepted = false
+ break
+ }
+ if headers[i].Hash() != header.ParentHash {
+ glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
+ accepted = false
+ break
+ }
+ }
+ }
+ // If the batch of headers wasn't accepted, mark as unavailable
+ if !accepted {
+ glog.V(logger.Detail).Infof("Peer %s: skeleton filling from header #%d not accepted", id, request.From)
+
+ miss := q.headerPeerMiss[id]
+ if miss == nil {
+ q.headerPeerMiss[id] = make(map[uint64]struct{})
+ miss = q.headerPeerMiss[id]
+ }
+ miss[request.From] = struct{}{}
+
+ q.headerTaskQueue.Push(request.From, -float32(request.From))
+ return 0, errors.New("delivery not accepted")
+ }
+ // Clean up a successful fetch and try to deliver any sub-results
+ copy(q.headerResults[request.From-q.headerOffset:], headers)
+ delete(q.headerTaskPool, request.From)
+
+ ready := 0
+ for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
+ ready += MaxHeaderFetch
+ }
+ if ready > 0 {
+ // Headers are ready for delivery, gather them and push forward (non blocking)
+ process := make([]*types.Header, ready)
+ copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
+
+ select {
+ case headerProcCh <- process:
+ glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number)
+ q.headerProced += len(process)
+ default:
+ }
+ }
+ // Check for termination and return
+ if len(q.headerTaskPool) == 0 {
+ q.headerContCh <- false
+ }
+ return len(headers), nil
+}
+
// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of blocks bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
diff --git a/eth/filters/api.go b/eth/filters/api.go
index d9bd4d4b7..7278e20b9 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -233,6 +233,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
return id, nil
}
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
@@ -291,12 +292,13 @@ type NewFilterArgs struct {
Topics [][]common.Hash
}
+// UnmarshalJSON sets *args fields with given data.
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
- Topics interface{} `json:"topics"`
+ Topics []interface{} `json:"topics"`
}
var raw input
@@ -321,7 +323,6 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
var addresses []common.Address
-
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
for i, addr := range strAddrs {
if strAddr, ok := addr.(string); ok {
@@ -352,56 +353,53 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
args.Addresses = addresses
}
+ // helper function which parses a string to a topic hash
topicConverter := func(raw string) (common.Hash, error) {
if len(raw) == 0 {
return common.Hash{}, nil
}
-
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
raw = raw[2:]
}
-
+ if len(raw) != 2 * common.HashLength {
+ return common.Hash{}, errors.New("invalid topic(s)")
+ }
if decAddr, err := hex.DecodeString(raw); err == nil {
return common.BytesToHash(decAddr), nil
}
-
- return common.Hash{}, errors.New("invalid topic given")
- }
-
- // topics is an array consisting of strings or arrays of strings
- if raw.Topics != nil {
- topics, ok := raw.Topics.([]interface{})
- if ok {
- parsedTopics := make([][]common.Hash, len(topics))
- for i, topic := range topics {
- if topic == nil {
- parsedTopics[i] = []common.Hash{common.StringToHash("")}
- } else if strTopic, ok := topic.(string); ok {
- if t, err := topicConverter(strTopic); err != nil {
- return fmt.Errorf("invalid topic on index %d", i)
- } else {
- parsedTopics[i] = []common.Hash{t}
- }
- } else if arrTopic, ok := topic.([]interface{}); ok {
- parsedTopics[i] = make([]common.Hash, len(arrTopic))
- for j := 0; j < len(parsedTopics[i]); i++ {
- if arrTopic[j] == nil {
- parsedTopics[i][j] = common.StringToHash("")
- } else if str, ok := arrTopic[j].(string); ok {
- if t, err := topicConverter(str); err != nil {
- return fmt.Errorf("invalid topic on index %d", i)
- } else {
- parsedTopics[i] = []common.Hash{t}
- }
- } else {
- return fmt.Errorf("topic[%d][%d] not a string", i, j)
+ return common.Hash{}, errors.New("invalid topic(s)")
+ }
+
+ // topics is an array consisting of strings and/or arrays of strings.
+ // JSON null values are converted to common.Hash{} and ignored by the filter manager.
+ if len(raw.Topics) > 0 {
+ args.Topics = make([][]common.Hash, len(raw.Topics))
+ for i, t := range raw.Topics {
+ if t == nil { // ignore topic when matching logs
+ args.Topics[i] = []common.Hash{common.Hash{}}
+ } else if topic, ok := t.(string); ok { // match specific topic
+ top, err := topicConverter(topic)
+ if err != nil {
+ return err
+ }
+ args.Topics[i] = []common.Hash{top}
+ } else if topics, ok := t.([]interface{}); ok { // or case e.g. [null, "topic0", "topic1"]
+ for _, rawTopic := range topics {
+ if rawTopic == nil {
+ args.Topics[i] = append(args.Topics[i], common.Hash{})
+ } else if topic, ok := rawTopic.(string); ok {
+ parsed, err := topicConverter(topic)
+ if err != nil {
+ return err
}
+ args.Topics[i] = append(args.Topics[i], parsed)
+ } else {
+ return fmt.Errorf("invalid topic(s)")
}
- } else {
- return fmt.Errorf("topic[%d] invalid", i)
}
+ } else {
+ return fmt.Errorf("invalid topic(s)")
}
- args.Topics = parsedTopics
}
}
diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go
new file mode 100644
index 000000000..9e8edc241
--- /dev/null
+++ b/eth/filters/api_test.go
@@ -0,0 +1,198 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters_test
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/eth/filters"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
+ var (
+ fromBlock rpc.BlockNumber = 0x123435
+ toBlock rpc.BlockNumber = 0xabcdef
+ address0 = common.StringToAddress("70c87d191324e6712a591f304b4eedef6ad9bb9d")
+ address1 = common.StringToAddress("9b2055d370f73ec7d8a03e965129118dc8f5bf83")
+ topic0 = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca")
+ topic1 = common.HexToHash("9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3")
+ topic2 = common.HexToHash("6ccae1c4af4152f460ff510e573399795dfab5dcf1fa60d1f33ac8fdc1e480ce")
+ nullTopic = common.Hash{}
+ )
+
+ // default values
+ var test0 filters.NewFilterArgs
+ if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
+ t.Fatal(err)
+ }
+ if test0.FromBlock != rpc.LatestBlockNumber {
+ t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
+ }
+ if test0.ToBlock != rpc.LatestBlockNumber {
+ t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
+ }
+ if len(test0.Addresses) != 0 {
+ t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses))
+ }
+ if len(test0.Topics) != 0 {
+ t.Fatalf("expected 0 topics, got %d topics", len(test0.Topics))
+ }
+
+ // from, to block number
+ var test1 filters.NewFilterArgs
+ vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock)
+ if err := json.Unmarshal([]byte(vector), &test1); err != nil {
+ t.Fatal(err)
+ }
+ if test1.FromBlock != fromBlock {
+ t.Fatalf("expected FromBlock %d, got %d", fromBlock, test1.FromBlock)
+ }
+ if test1.ToBlock != toBlock {
+ t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock)
+ }
+
+ // single address
+ var test2 filters.NewFilterArgs
+ vector = fmt.Sprintf(`{"address": "%s"}`, address0.Hex())
+ if err := json.Unmarshal([]byte(vector), &test2); err != nil {
+ t.Fatal(err)
+ }
+ if len(test2.Addresses) != 1 {
+ t.Fatalf("expected 1 address, got %d address(es)", len(test2.Addresses))
+ }
+ if test2.Addresses[0] != address0 {
+ t.Fatalf("expected address %x, got %x", address0, test2.Addresses[0])
+ }
+
+ // multiple address
+ var test3 filters.NewFilterArgs
+ vector = fmt.Sprintf(`{"address": ["%s", "%s"]}`, address0.Hex(), address1.Hex())
+ if err := json.Unmarshal([]byte(vector), &test3); err != nil {
+ t.Fatal(err)
+ }
+ if len(test3.Addresses) != 2 {
+ t.Fatalf("expected 2 addresses, got %d address(es)", len(test3.Addresses))
+ }
+ if test3.Addresses[0] != address0 {
+ t.Fatalf("expected address %x, got %x", address0, test3.Addresses[0])
+ }
+ if test3.Addresses[1] != address1 {
+ t.Fatalf("expected address %x, got %x", address1, test3.Addresses[1])
+ }
+
+ // single topic
+ var test4 filters.NewFilterArgs
+ vector = fmt.Sprintf(`{"topics": ["%s"]}`, topic0.Hex())
+ if err := json.Unmarshal([]byte(vector), &test4); err != nil {
+ t.Fatal(err)
+ }
+ if len(test4.Topics) != 1 {
+ t.Fatalf("expected 1 topic, got %d", len(test4.Topics))
+ }
+ if len(test4.Topics[0]) != 1 {
+ t.Fatalf("expected len(topics[0]) to be 1, got %d", len(test4.Topics[0]))
+ }
+ if test4.Topics[0][0] != topic0 {
+ t.Fatalf("got %x, expected %x", test4.Topics[0][0], topic0)
+ }
+
+ // test multiple "AND" topics
+ var test5 filters.NewFilterArgs
+ vector = fmt.Sprintf(`{"topics": ["%s", "%s"]}`, topic0.Hex(), topic1.Hex())
+ if err := json.Unmarshal([]byte(vector), &test5); err != nil {
+ t.Fatal(err)
+ }
+ if len(test5.Topics) != 2 {
+ t.Fatalf("expected 2 topics, got %d", len(test5.Topics))
+ }
+ if len(test5.Topics[0]) != 1 {
+ t.Fatalf("expected 1 topic, got %d", len(test5.Topics[0]))
+ }
+ if test5.Topics[0][0] != topic0 {
+ t.Fatalf("got %x, expected %x", test5.Topics[0][0], topic0)
+ }
+ if len(test5.Topics[1]) != 1 {
+ t.Fatalf("expected 1 topic, got %d", len(test5.Topics[1]))
+ }
+ if test5.Topics[1][0] != topic1 {
+ t.Fatalf("got %x, expected %x", test5.Topics[1][0], topic1)
+ }
+
+ // test optional topic
+ var test6 filters.NewFilterArgs
+ vector = fmt.Sprintf(`{"topics": ["%s", null, "%s"]}`, topic0.Hex(), topic2.Hex())
+ if err := json.Unmarshal([]byte(vector), &test6); err != nil {
+ t.Fatal(err)
+ }
+ if len(test6.Topics) != 3 {
+ t.Fatalf("expected 3 topics, got %d", len(test6.Topics))
+ }
+ if len(test6.Topics[0]) != 1 {
+ t.Fatalf("expected 1 topic, got %d", len(test6.Topics[0]))
+ }
+ if test6.Topics[0][0] != topic0 {
+ t.Fatalf("got %x, expected %x", test6.Topics[0][0], topic0)
+ }
+ if len(test6.Topics[1]) != 1 {
+ t.Fatalf("expected 1 topic, got %d", len(test6.Topics[1]))
+ }
+ if test6.Topics[1][0] != nullTopic {
+ t.Fatalf("got %x, expected empty hash", test6.Topics[1][0])
+ }
+ if len(test6.Topics[2]) != 1 {
+ t.Fatalf("expected 1 topic, got %d", len(test6.Topics[2]))
+ }
+ if test6.Topics[2][0] != topic2 {
+ t.Fatalf("got %x, expected %x", test6.Topics[2][0], topic2)
+ }
+
+ // test OR topics
+ var test7 filters.NewFilterArgs
+ vector = fmt.Sprintf(`{"topics": [["%s", "%s"], null, ["%s", null]]}`, topic0.Hex(), topic1.Hex(), topic2.Hex())
+ if err := json.Unmarshal([]byte(vector), &test7); err != nil {
+ t.Fatal(err)
+ }
+ if len(test7.Topics) != 3 {
+ t.Fatalf("expected 3 topics, got %d topics", len(test7.Topics))
+ }
+ if len(test7.Topics[0]) != 2 {
+ t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[0]))
+ }
+ if test7.Topics[0][0] != topic0 || test7.Topics[0][1] != topic1 {
+ t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]",
+ topic0, topic1, test7.Topics[0][0], test7.Topics[0][1],
+ )
+ }
+ if len(test7.Topics[1]) != 1 {
+ t.Fatalf("expected 1 topic, got %d topics", len(test7.Topics[1]))
+ }
+ if test7.Topics[1][0] != nullTopic {
+ t.Fatalf("expected empty hash, got %x", test7.Topics[1][0])
+ }
+ if len(test7.Topics[2]) != 2 {
+ t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[2]))
+ }
+ if test7.Topics[2][0] != topic2 || test7.Topics[2][1] != nullTopic {
+ t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]",
+ topic2, nullTopic, test7.Topics[2][0], test7.Topics[2][1],
+ )
+ }
+}