diff options
-rw-r--r-- | accounts/abi/bind/backend.go | 49 | ||||
-rw-r--r-- | accounts/abi/bind/backends/nil.go | 1 | ||||
-rw-r--r-- | accounts/abi/bind/backends/remote.go | 20 | ||||
-rw-r--r-- | accounts/abi/bind/backends/simulated.go | 10 | ||||
-rw-r--r-- | accounts/abi/bind/base.go | 27 | ||||
-rw-r--r-- | core/types/bloom9_test.go | 4 | ||||
-rw-r--r-- | eth/api.go | 15 | ||||
-rw-r--r-- | eth/bind.go | 18 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 517 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 48 | ||||
-rw-r--r-- | eth/downloader/peer.go | 66 | ||||
-rw-r--r-- | eth/downloader/queue.go | 223 | ||||
-rw-r--r-- | eth/filters/api.go | 74 | ||||
-rw-r--r-- | eth/filters/api_test.go | 198 |
14 files changed, 991 insertions, 279 deletions
diff --git a/accounts/abi/bind/backend.go b/accounts/abi/bind/backend.go index 604e1ef26..65806aef4 100644 --- a/accounts/abi/bind/backend.go +++ b/accounts/abi/bind/backend.go @@ -27,15 +27,16 @@ import ( // ErrNoCode is returned by call and transact operations for which the requested // recipient contract to operate on does not exist in the state db or does not // have any code associated with it (i.e. suicided). -// -// Please note, this error string is part of the RPC API and is expected by the -// native contract bindings to signal this particular error. Do not change this -// as it will break all dependent code! var ErrNoCode = errors.New("no contract code at given address") // ContractCaller defines the methods needed to allow operating with contract on a read // only basis. type ContractCaller interface { + // HasCode checks if the contract at the given address has any code associated + // with it or not. This is needed to differentiate between contract internal + // errors and the local chain being out of sync. + HasCode(contract common.Address, pending bool) (bool, error) + // ContractCall executes an Ethereum contract call with the specified data as // the input. The pending flag requests execution against the pending block, not // the stable head of the chain. @@ -55,6 +56,11 @@ type ContractTransactor interface { // execution of a transaction. SuggestGasPrice() (*big.Int, error) + // HasCode checks if the contract at the given address has any code associated + // with it or not. This is needed to differentiate between contract internal + // errors and the local chain being out of sync. + HasCode(contract common.Address, pending bool) (bool, error) + // EstimateGasLimit tries to estimate the gas needed to execute a specific // transaction based on the current pending state of the backend blockchain. // There is no guarantee that this is the true gas limit requirement as other @@ -68,7 +74,38 @@ type ContractTransactor interface { // ContractBackend defines the methods needed to allow operating with contract // on a read-write basis. +// +// This interface is essentially the union of ContractCaller and ContractTransactor +// but due to a bug in the Go compiler (https://github.com/golang/go/issues/6977), +// we cannot simply list it as the two interfaces. The other solution is to add a +// third interface containing the common methods, but that convolutes the user API +// as it introduces yet another parameter to require for initialization. type ContractBackend interface { - ContractCaller - ContractTransactor + // HasCode checks if the contract at the given address has any code associated + // with it or not. This is needed to differentiate between contract internal + // errors and the local chain being out of sync. + HasCode(contract common.Address, pending bool) (bool, error) + + // ContractCall executes an Ethereum contract call with the specified data as + // the input. The pending flag requests execution against the pending block, not + // the stable head of the chain. + ContractCall(contract common.Address, data []byte, pending bool) ([]byte, error) + + // PendingAccountNonce retrieves the current pending nonce associated with an + // account. + PendingAccountNonce(account common.Address) (uint64, error) + + // SuggestGasPrice retrieves the currently suggested gas price to allow a timely + // execution of a transaction. + SuggestGasPrice() (*big.Int, error) + + // EstimateGasLimit tries to estimate the gas needed to execute a specific + // transaction based on the current pending state of the backend blockchain. + // There is no guarantee that this is the true gas limit requirement as other + // transactions may be added or removed by miners, but it should provide a basis + // for setting a reasonable default. + EstimateGasLimit(sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) + + // SendTransaction injects the transaction into the pending pool for execution. + SendTransaction(tx *types.Transaction) error } diff --git a/accounts/abi/bind/backends/nil.go b/accounts/abi/bind/backends/nil.go index 3b1e6dce7..f10bb61ac 100644 --- a/accounts/abi/bind/backends/nil.go +++ b/accounts/abi/bind/backends/nil.go @@ -38,6 +38,7 @@ func (*nilBackend) ContractCall(common.Address, []byte, bool) ([]byte, error) { func (*nilBackend) EstimateGasLimit(common.Address, *common.Address, *big.Int, []byte) (*big.Int, error) { panic("not implemented") } +func (*nilBackend) HasCode(common.Address, bool) (bool, error) { panic("not implemented") } func (*nilBackend) SuggestGasPrice() (*big.Int, error) { panic("not implemented") } func (*nilBackend) PendingAccountNonce(common.Address) (uint64, error) { panic("not implemented") } func (*nilBackend) SendTransaction(*types.Transaction) error { panic("not implemented") } diff --git a/accounts/abi/bind/backends/remote.go b/accounts/abi/bind/backends/remote.go index 9b3647192..d903cbc8f 100644 --- a/accounts/abi/bind/backends/remote.go +++ b/accounts/abi/bind/backends/remote.go @@ -111,6 +111,26 @@ func (b *rpcBackend) request(method string, params []interface{}) (json.RawMessa return res.Result, nil } +// HasCode implements ContractVerifier.HasCode by retrieving any code associated +// with the contract from the remote node, and checking its size. +func (b *rpcBackend) HasCode(contract common.Address, pending bool) (bool, error) { + // Execute the RPC code retrieval + block := "latest" + if pending { + block = "pending" + } + res, err := b.request("eth_getCode", []interface{}{contract.Hex(), block}) + if err != nil { + return false, err + } + var hex string + if err := json.Unmarshal(res, &hex); err != nil { + return false, err + } + // Convert the response back to a Go byte slice and return + return len(common.FromHex(hex)) > 0, nil +} + // ContractCall implements ContractCaller.ContractCall, delegating the execution of // a contract call to the remote node, returning the reply to for local processing. func (b *rpcBackend) ContractCall(contract common.Address, data []byte, pending bool) ([]byte, error) { diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 4866c4f58..54b1ce603 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -78,6 +78,16 @@ func (b *SimulatedBackend) Rollback() { b.pendingState, _ = state.New(b.pendingBlock.Root(), b.database) } +// HasCode implements ContractVerifier.HasCode, checking whether there is any +// code associated with a certain account in the blockchain. +func (b *SimulatedBackend) HasCode(contract common.Address, pending bool) (bool, error) { + if pending { + return len(b.pendingState.GetCode(contract)) > 0, nil + } + statedb, _ := b.blockchain.State() + return len(statedb.GetCode(contract)) > 0, nil +} + // ContractCall implements ContractCaller.ContractCall, executing the specified // contract with the given input data. func (b *SimulatedBackend) ContractCall(contract common.Address, data []byte, pending bool) ([]byte, error) { diff --git a/accounts/abi/bind/base.go b/accounts/abi/bind/base.go index 06621c5ad..75e8d5bc8 100644 --- a/accounts/abi/bind/base.go +++ b/accounts/abi/bind/base.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -56,6 +57,9 @@ type BoundContract struct { abi abi.ABI // Reflect based ABI to access the correct Ethereum methods caller ContractCaller // Read interface to interact with the blockchain transactor ContractTransactor // Write interface to interact with the blockchain + + latestHasCode uint32 // Cached verification that the latest state contains code for this contract + pendingHasCode uint32 // Cached verification that the pending state contains code for this contract } // NewBoundContract creates a low level contract interface through which calls @@ -96,6 +100,19 @@ func (c *BoundContract) Call(opts *CallOpts, result interface{}, method string, if opts == nil { opts = new(CallOpts) } + // Make sure we have a contract to operate on, and bail out otherwise + if (opts.Pending && atomic.LoadUint32(&c.pendingHasCode) == 0) || (!opts.Pending && atomic.LoadUint32(&c.latestHasCode) == 0) { + if code, err := c.caller.HasCode(c.address, opts.Pending); err != nil { + return err + } else if !code { + return ErrNoCode + } + if opts.Pending { + atomic.StoreUint32(&c.pendingHasCode, 1) + } else { + atomic.StoreUint32(&c.latestHasCode, 1) + } + } // Pack the input, call and unpack the results input, err := c.abi.Pack(method, params...) if err != nil { @@ -153,6 +170,16 @@ func (c *BoundContract) transact(opts *TransactOpts, contract *common.Address, i } gasLimit := opts.GasLimit if gasLimit == nil { + // Gas estimation cannot succeed without code for method invocations + if contract != nil && atomic.LoadUint32(&c.pendingHasCode) == 0 { + if code, err := c.transactor.HasCode(c.address, true); err != nil { + return nil, err + } else if !code { + return nil, ErrNoCode + } + atomic.StoreUint32(&c.pendingHasCode, 1) + } + // If the contract surely has code (or code is not needed), estimate the transaction gasLimit, err = c.transactor.EstimateGasLimit(opts.From, contract, value, input) if err != nil { return nil, fmt.Errorf("failed to exstimate gas needed: %v", err) diff --git a/core/types/bloom9_test.go b/core/types/bloom9_test.go index 58e8f7073..a28ac0e7a 100644 --- a/core/types/bloom9_test.go +++ b/core/types/bloom9_test.go @@ -39,12 +39,12 @@ func TestBloom(t *testing.T) { } for _, data := range positive { - if !bloom.Test(new(big.Int).SetBytes([]byte(data))) { + if !bloom.TestBytes([]byte(data)) { t.Error("expected", data, "to test true") } } for _, data := range negative { - if bloom.Test(new(big.Int).SetBytes([]byte(data))) { + if bloom.TestBytes([]byte(data)) { t.Error("did not expect", data, "to test true") } } diff --git a/eth/api.go b/eth/api.go index 06f4d5de0..d048904f3 100644 --- a/eth/api.go +++ b/eth/api.go @@ -52,15 +52,6 @@ import ( "golang.org/x/net/context" ) -// errNoCode is returned by call and transact operations for which the requested -// recipient contract to operate on does not exist in the state db or does not -// have any code associated with it (i.e. suicided). -// -// Please note, this error string is part of the RPC API and is expected by the -// native contract bindings to signal this particular error. Do not change this -// as it will break all dependent code! -var errNoCode = errors.New("no contract code at given address") - const defaultGas = uint64(90000) // blockByNumber is a commonly used helper function which retrieves and returns @@ -755,12 +746,6 @@ func (s *PublicBlockChainAPI) doCall(args CallArgs, blockNr rpc.BlockNumber) (st } stateDb = stateDb.Copy() - // If there's no code to interact with, respond with an appropriate error - if args.To != nil { - if code := stateDb.GetCode(*args.To); len(code) == 0 { - return "0x", nil, errNoCode - } - } // Retrieve the account state object to interact with var from *state.StateObject if args.From == (common.Address{}) { diff --git a/eth/bind.go b/eth/bind.go index 3a3eca062..fb7f29f60 100644 --- a/eth/bind.go +++ b/eth/bind.go @@ -19,7 +19,6 @@ package eth import ( "math/big" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" @@ -49,6 +48,17 @@ func NewContractBackend(eth *Ethereum) *ContractBackend { } } +// HasCode implements bind.ContractVerifier.HasCode by retrieving any code associated +// with the contract from the local API, and checking its size. +func (b *ContractBackend) HasCode(contract common.Address, pending bool) (bool, error) { + block := rpc.LatestBlockNumber + if pending { + block = rpc.PendingBlockNumber + } + out, err := b.bcapi.GetCode(contract, block) + return len(common.FromHex(out)) > 0, err +} + // ContractCall implements bind.ContractCaller executing an Ethereum contract // call with the specified data as the input. The pending flag requests execution // against the pending block, not the stable head of the chain. @@ -64,9 +74,6 @@ func (b *ContractBackend) ContractCall(contract common.Address, data []byte, pen } // Execute the call and convert the output back to Go types out, err := b.bcapi.Call(args, block) - if err == errNoCode { - err = bind.ErrNoCode - } return common.FromHex(out), err } @@ -95,9 +102,6 @@ func (b *ContractBackend) EstimateGasLimit(sender common.Address, contract *comm Value: *rpc.NewHexNumber(value), Data: common.ToHex(data), }) - if err == errNoCode { - err = bind.ErrNoCode - } return out.BigInt(), err } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0f76357cb..74bff2b66 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -42,6 +42,7 @@ var ( MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request @@ -52,6 +53,7 @@ var ( blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired + headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired @@ -60,9 +62,10 @@ var ( stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired - maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) - maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxResultsProcess = 256 // Number of download results to import at once into the chain + maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) + maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxHeadersProcess = 2048 // Number of header download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected @@ -72,29 +75,30 @@ var ( ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errNoPeers = errors.New("no peers to keep download active") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errEmptyHeaderSet = errors.New("empty header set by peer") - errPeersUnavailable = errors.New("no peers available or all tried for download") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidAncestor = errors.New("retrieved ancestor is invalid") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errInvalidBlock = errors.New("retrieved block is invalid") - errInvalidBody = errors.New("retrieved block body is invalid") - errInvalidReceipt = errors.New("retrieved receipt is invalid") - errCancelHashFetch = errors.New("hash download canceled (requested)") - errCancelBlockFetch = errors.New("block download canceled (requested)") - errCancelHeaderFetch = errors.New("block header download canceled (requested)") - errCancelBodyFetch = errors.New("block body download canceled (requested)") - errCancelReceiptFetch = errors.New("receipt download canceled (requested)") - errCancelStateFetch = errors.New("state data download canceled (requested)") - errCancelProcessing = errors.New("processing canceled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errNoPeers = errors.New("no peers to keep download active") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errEmptyHeaderSet = errors.New("empty header set by peer") + errPeersUnavailable = errors.New("no peers available or all tried for download") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidAncestor = errors.New("retrieved ancestor is invalid") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errInvalidBlock = errors.New("retrieved block is invalid") + errInvalidBody = errors.New("retrieved block body is invalid") + errInvalidReceipt = errors.New("retrieved receipt is invalid") + errCancelHashFetch = errors.New("hash download canceled (requested)") + errCancelBlockFetch = errors.New("block download canceled (requested)") + errCancelHeaderFetch = errors.New("block header download canceled (requested)") + errCancelBodyFetch = errors.New("block body download canceled (requested)") + errCancelReceiptFetch = errors.New("receipt download canceled (requested)") + errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelHeaderProcessing = errors.New("header processing canceled (requested)") + errCancelContentProcessing = errors.New("content processing canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) type Downloader struct { @@ -137,16 +141,17 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan dataPack // [eth/61] Channel receiving inbound hashes - blockCh chan dataPack // [eth/61] Channel receiving inbound blocks - headerCh chan dataPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - stateCh chan dataPack // [eth/63] Channel receiving inbound node state data - blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks + hashCh chan dataPack // [eth/61] Channel receiving inbound hashes + blockCh chan dataPack // [eth/61] Channel receiving inbound blocks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks + headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -194,6 +199,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), } } @@ -308,6 +314,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode default: } } + for empty := false; !empty; { + select { + case <-d.headerProcCh: + default: + empty = true + } + } // Reset any ephemeral sync statistics d.syncStatsLock.Lock() d.syncStatsStateTotal = 0 @@ -373,7 +386,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - return d.spawnSync( + return d.spawnSync(origin+1, func() error { return d.fetchHashes61(p, td, origin+1) }, func() error { return d.fetchBlocks61(origin + 1) }, ) @@ -423,11 +436,12 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - return d.spawnSync( - func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync + return d.spawnSync(origin+1, + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync ) default: @@ -439,11 +453,11 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(fetchers ...func() error) error { +func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { var wg sync.WaitGroup errc := make(chan error, len(fetchers)+1) wg.Add(len(fetchers) + 1) - go func() { defer wg.Done(); errc <- d.process() }() + go func() { defer wg.Done(); errc <- d.processContent() }() for _, fn := range fetchers { fn := fn go func() { defer wg.Done(); errc <- fn() }() @@ -702,9 +716,9 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { getHashes := func(from uint64) { glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) - go p.getAbsHashes(from, MaxHashFetch) request = time.Now() timeout.Reset(hashTTL) + go p.getAbsHashes(from, MaxHashFetch) } // Start pulling hashes, until all are exhausted getHashes(from) @@ -1050,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { continue } // Otherwise check if we already know the header or not - if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) { + if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -1149,55 +1163,39 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return start, nil } -// fetchHeaders keeps retrieving headers from the requested number, until no more -// are returned, potentially throttling on the way. -// -// The queue parameter can be used to switch between queuing headers for block -// body download too, or directly import as pure header chains. -func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { - glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) +// fetchHeaders keeps retrieving headers concurrently from the number +// requested, until no more are returned, potentially throttling on the way. To +// facilitate concurrency but still protect against malicious nodes sending bad +// headers, we construct a header chain skeleton using the "origin" peer we are +// syncing with, and fill in the missing headers using anyone else. Headers from +// other peers are only accepted if they map cleanly to the skeleton. If no one +// can fill in the skeleton - not even the origin peer - it's assumed invalid and +// the origin is dropped. +func (d *Downloader) fetchHeaders(p *peer, from uint64) error { + glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from) defer glog.V(logger.Debug).Infof("%v: header download terminated", p) - // Calculate the pivoting point for switching from fast to slow sync - pivot := d.queue.FastSyncPivot() - - // Keep a count of uncertain headers to roll back - rollback := []*types.Header{} - defer func() { - if len(rollback) > 0 { - // Flatten the headers and roll them back - hashes := make([]common.Hash, len(rollback)) - for i, header := range rollback { - hashes[i] = header.Hash() - } - lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() - d.rollback(hashes) - glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", - len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number()) - - // If we're already past the pivot point, this could be an attack, disable fast sync - if rollback[len(rollback)-1].Number.Uint64() > pivot { - d.noFast = true - } - } - }() - - // Create a timeout timer, and the associated hash fetcher - request := time.Now() // time of the last fetch request + // Create a timeout timer, and the associated header fetcher + skeleton := true // Skeleton assembly phase or finishing up + request := time.Now() // time of the last skeleton fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer <-timeout.C // timeout channel should be initially empty defer timeout.Stop() getHeaders := func(from uint64) { - glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from) - - go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) request = time.Now() timeout.Reset(headerTTL) + + if skeleton { + glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from) + go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + } else { + glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from) + go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) + } } - // Start pulling headers, until all are exhausted + // Start pulling the header chain skeleton until all is done getHeaders(from) - gotHeaders := false for { select { @@ -1205,116 +1203,48 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { return errCancelHeaderFetch case packet := <-d.headerCh: - // Make sure the active peer is giving us the headers + // Make sure the active peer is giving us the skeleton headers if packet.PeerId() != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId()) + glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId()) break } headerReqTimer.UpdateSince(request) timeout.Stop() + // If the skeleton's finished, pull any remaining head headers directly from the origin + if packet.Items() == 0 && skeleton { + skeleton = false + getHeaders(from) + continue + } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { - select { - case ch <- false: - case <-d.cancelCh: - } - } - // If no headers were retrieved at all, the peer violated it's TD promise that it had a - // better chain compared to ours. The only exception is if it's promised blocks were - // already imported by other means (e.g. fetcher): - // - // R <remote peer>, L <local node>: Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { - return errStallingPeer - } - // If fast or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if d.mode == FastSync || d.mode == LightSync { - if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 { - return errStallingPeer - } - } - rollback = nil + d.headerProcCh <- nil return nil } - gotHeaders = true headers := packet.(*headerPack).headers - // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) - - if d.mode == FastSync || d.mode == LightSync { - // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) - for _, header := range headers { - if !d.hasHeader(header.Hash()) { - unknown = append(unknown, header) - } - } - // If we're importing pure headers, verify based on their recentness - frequency := fsHeaderCheckFrequency - if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { - frequency = 1 - } - if n, err := d.insertHeaders(headers, frequency); err != nil { - // If some headers were inserted, add them too to the rollback list - if n > 0 { - rollback = append(rollback, headers[:n]...) - } - glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) + // If we received a skeleton batch, resolve internals concurrently + if skeleton { + filled, proced, err := d.fillHeaderSkeleton(from, headers) + if err != nil { + glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err) return errInvalidChain } - // All verifications passed, store newly found uncertain headers - rollback = append(rollback, unknown...) - if len(rollback) > fsHeaderSafetyNet { - rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) - } - } - if d.mode == FullSync || d.mode == FastSync { - inserts := d.queue.Schedule(headers, from) - if len(inserts) != len(headers) { - glog.V(logger.Debug).Infof("%v: stale headers", p) - return errBadPeer - } + headers = filled[proced:] + from += uint64(proced) } - // Notify the content fetchers of new headers, but stop if queue is full - cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { - if cont { - // We still have headers to fetch, send continuation wake signal (potential) - select { - case ch <- true: - default: - } - } else { - // Header limit reached, send a termination wake signal (enforced) - select { - case ch <- false: - case <-d.cancelCh: - } + // Insert all the new headers and fetch the next batch + if len(headers) > 0 { + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCancelHeaderFetch } + from += uint64(len(headers)) } - if !cont { - return nil - } - // Queue not yet full, fetch the next batch - from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1330,7 +1260,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: } } - return nil + select { + case d.headerProcCh <- nil: + case <-d.cancelCh: + } + return errBadPeer case <-d.hashCh: case <-d.blockCh: @@ -1340,6 +1274,43 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } +// fillHeaderSkeleton concurrently retrieves headers from all our available peers +// and maps them to the provided skeleton header chain. +// +// Any partial results from the beginning of the skeleton is (if possible) forwarded +// immediately to the header processor to keep the rest of the pipeline full even +// in the case of header stalls. +// +// The method returs the entire filled skeleton and also the number of headers +// already forwarded for processing. +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { + glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from) + d.queue.ScheduleSkeleton(from, skeleton) + + var ( + deliver = func(packet dataPack) (int, error) { + pack := packet.(*headerPack) + return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) + } + expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } + throttle = func() bool { return false } + reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + return d.queue.ReserveHeaders(p, count), false, nil + } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } + capacity = func(p *peer) int { return p.HeaderCapacity() } + setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } + ) + err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, + d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, + nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header") + + glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err) + + filled, proced := d.queue.RetrieveHeaders() + return filled, proced, err +} + // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. @@ -1398,6 +1369,11 @@ func (d *Downloader) fetchNodeData() error { deliver = func(packet dataPack) (int, error) { start := time.Now() return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { + // If the peer gave us nothing, stalling fast sync, drop + if delivered == 0 { + glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId()) + d.dropPeer(packet.PeerId()) + } if err != nil { // If the node data processing failed, the root hash is very wrong, abort glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err) @@ -1438,6 +1414,28 @@ func (d *Downloader) fetchNodeData() error { // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. +// +// As the scheduling/timeout logic mostly is the same for all downloaded data +// types, this method is used by each for data gathering and is instrumented with +// various callbacks to handle the slight differences between processing them. +// +// The instrumentation parameters: +// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer) +// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers) +// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`) +// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed) +// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping) +// - pending: task callback for the number of requests still needing download (detect completion/non-completability) +// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish) +// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use) +// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions) +// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic) +// - fetch: network callback to actually send a particular download request to a physical remote peer +// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer) +// - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping) +// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks +// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) +// - kind: textual label of the type being downloaded to display in log mesages func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, @@ -1554,7 +1552,9 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv continue } if glog.V(logger.Detail) { - if len(request.Headers) > 0 { + if request.From > 0 { + glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From) + } else if len(request.Headers) > 0 { glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) } else { glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) @@ -1588,9 +1588,162 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } } -// process takes fetch results from the queue and tries to import them into the -// chain. The type of import operation will depend on the result contents. -func (d *Downloader) process() error { +// processHeaders takes batches of retrieved headers from an input channel and +// keeps processing and scheduling them into the header chain and downloader's +// queue until the stream ends or a failure occurs. +func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { + // Calculate the pivoting point for switching from fast to slow sync + pivot := d.queue.FastSyncPivot() + + // Keep a count of uncertain headers to roll back + rollback := []*types.Header{} + defer func() { + if len(rollback) > 0 { + // Flatten the headers and roll them back + hashes := make([]common.Hash, len(rollback)) + for i, header := range rollback { + hashes[i] = header.Hash() + } + lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() + d.rollback(hashes) + glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", + len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) + + // If we're already past the pivot point, this could be an attack, disable fast sync + if rollback[len(rollback)-1].Number.Uint64() > pivot { + d.noFast = true + } + } + }() + + // Wait for batches of headers to process + gotHeaders := false + + for { + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + + case headers := <-d.headerProcCh: + // Terminate header processing if we synced up + if len(headers) == 0 { + // Notify everyone that headers are fully processed + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } + } + // If no headers were retrieved at all, the peer violated it's TD promise that it had a + // better chain compared to ours. The only exception is if it's promised blocks were + // already imported by other means (e.g. fecher): + // + // R <remote peer>, L <local node>: Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { + return errStallingPeer + } + // If fast or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if d.mode == FastSync || d.mode == LightSync { + if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 { + return errStallingPeer + } + } + // Disable any rollback and return + rollback = nil + return nil + } + // Otherwise split the chunk of headers into batches and process them + gotHeaders = true + + for len(headers) > 0 { + // Terminate if something failed in between processing chunks + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + default: + } + // Select the next chunk of headers to import + limit := maxHeadersProcess + if limit > len(headers) { + limit = len(headers) + } + chunk := headers[:limit] + + // In case of header only syncing, validate the chunk immediately + if d.mode == FastSync || d.mode == LightSync { + // Collect the yet unknown headers to mark them as uncertain + unknown := make([]*types.Header, 0, len(headers)) + for _, header := range chunk { + if !d.hasHeader(header.Hash()) { + unknown = append(unknown, header) + } + } + // If we're importing pure headers, verify based on their recentness + frequency := fsHeaderCheckFrequency + if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { + frequency = 1 + } + if n, err := d.insertHeaders(chunk, frequency); err != nil { + // If some headers were inserted, add them too to the rollback list + if n > 0 { + rollback = append(rollback, chunk[:n]...) + } + glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err) + return errInvalidChain + } + // All verifications passed, store newly found uncertain headers + rollback = append(rollback, unknown...) + if len(rollback) > fsHeaderSafetyNet { + rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) + } + } + // Unless we're doing light chains, schedule the headers for associated content retrieval + if d.mode == FullSync || d.mode == FastSync { + // If we've reached the allowed number of pending headers, stall a bit + for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + case <-time.After(time.Second): + } + } + // Otherwise insert the headers for content retrieval + inserts := d.queue.Schedule(chunk, origin) + if len(inserts) != len(chunk) { + glog.V(logger.Debug).Infof("stale headers") + return errBadPeer + } + } + headers = headers[limit:] + origin += uint64(limit) + } + // Signal the content downloaders of the availablility of new tasks + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + select { + case ch <- true: + default: + } + } + } + } +} + +// processContent takes fetch results from the queue and tries to import them +// into the chain. The type of import operation will depend on the result contents. +func (d *Downloader) processContent() error { pivot := d.queue.FastSyncPivot() for { results := d.queue.WaitResults() @@ -1608,7 +1761,7 @@ func (d *Downloader) process() error { for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return errCancelProcessing + return errCancelContentProcessing } // Retrieve the a batch of results to import var ( diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index b0b0c2bd3..4ea8a8abe 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -560,8 +560,8 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu hashes := dl.peerHashes[id] headers := dl.peerHeaders[id] result := make([]*types.Header, 0, amount) - for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ { - if header, ok := headers[hashes[len(hashes)-int(origin)-1-i]]; ok { + for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { + if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { result = append(result, header) } } @@ -1258,6 +1258,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { // rolled back, and also the pivot point being reverted to a non-block status. tester.newPeer("block-attack", protocol, hashes, headers, blocks, receipts) missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 + delete(tester.peerHeaders["fast-attack"], hashes[len(hashes)-missing]) // Make sure the fast-attacker doesn't fill in delete(tester.peerHeaders["block-attack"], hashes[len(hashes)-missing]) if err := tester.sync("block-attack", nil, mode); err == nil { @@ -1348,27 +1349,28 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin - {errInvalidBody, false}, // A bad peer was detected, but not the sync origin - {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c4846194b..6aab907d7 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -58,15 +58,18 @@ type peer struct { id string // Unique identifier of the peer head common.Hash // Hash of the peers latest known block + headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) + headerThroughput float64 // Number of headers measured to be retrievable per second blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second receiptThroughput float64 // Number of receipts measured to be retrievable per second stateThroughput float64 // Number of node data pieces measured to be retrievable per second - blockStarted time.Time // Time instance when the last block (body)fetch was started + headerStarted time.Time // Time instance when the last header fetch was started + blockStarted time.Time // Time instance when the last block (body) fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started @@ -118,10 +121,12 @@ func (p *peer) Reset() { p.lock.Lock() defer p.lock.Unlock() + atomic.StoreInt32(&p.headerIdle, 0) atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.stateIdle, 0) + p.headerThroughput = 0 p.blockThroughput = 0 p.receiptThroughput = 0 p.stateThroughput = 0 @@ -151,6 +156,24 @@ func (p *peer) Fetch61(request *fetchRequest) error { return nil } +// FetchHeaders sends a header retrieval request to the remote peer. +func (p *peer) FetchHeaders(from uint64, count int) error { + // Sanity check the protocol version + if p.version < 62 { + panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) { + return errAlreadyFetching + } + p.headerStarted = time.Now() + + // Issue the header retrieval request (absolut upwards without gaps) + go p.getAbsHeaders(from, count, 0, false) + + return nil +} + // FetchBodies sends a block body retrieval request to the remote peer. func (p *peer) FetchBodies(request *fetchRequest) error { // Sanity check the protocol version @@ -217,6 +240,13 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return nil } +// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval +// requests. Its estimated header retrieval throughput is updated with that measured +// just now. +func (p *peer) SetHeadersIdle(delivered int) { + p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) +} + // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval // requests. Its estimated block retrieval throughput is updated with that measured // just now. @@ -264,6 +294,15 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured } +// HeaderCapacity retrieves the peers header download allowance based on its +// previously discovered throughput. +func (p *peer) HeaderCapacity() int { + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch)))) +} + // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. func (p *peer) BlockCapacity() int { @@ -323,14 +362,15 @@ func (p *peer) String() string { defer p.lock.RUnlock() return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ + fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+ + fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ fmt.Sprintf("lacking %4d", len(p.lacking)), ) } -// peerSet represents the collection of active peer participating in the block +// peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { peers map[string]*peer @@ -359,7 +399,7 @@ func (ps *peerSet) Reset() { // peer is already known. // // The method also sets the starting throughput values of the new peer to the -// average of all existing peers, to give it a realistic change of being used +// average of all existing peers, to give it a realistic chance of being used // for data retrievals. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() @@ -369,15 +409,17 @@ func (ps *peerSet) Register(p *peer) error { return errAlreadyRegistered } if len(ps.peers) > 0 { - p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0 + p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0 for _, peer := range ps.peers { peer.lock.RLock() + p.headerThroughput += peer.headerThroughput p.blockThroughput += peer.blockThroughput p.receiptThroughput += peer.receiptThroughput p.stateThroughput += peer.stateThroughput peer.lock.RUnlock() } + p.headerThroughput /= float64(len(ps.peers)) p.blockThroughput /= float64(len(ps.peers)) p.receiptThroughput /= float64(len(ps.peers)) p.stateThroughput /= float64(len(ps.peers)) @@ -441,6 +483,20 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { return ps.idlePeers(61, 61, idle, throughput) } +// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers +// within the active peer set, ordered by their reputation. +func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.headerIdle) == 0 + } + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.headerThroughput + } + return ps.idlePeers(62, 64, idle, throughput) +} + // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // the active peer set, ordered by their reputation. func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index d8d1bddce..195eae4ff 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -40,7 +40,7 @@ import ( var ( blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download - maxInFlightStates = 4096 // Maximum number of state downloads to allow concurrently + maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently ) var ( @@ -52,6 +52,7 @@ var ( // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { Peer *peer // Peer to which the request was sent + From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Headers []*types.Header // [eth/62] Requested headers, sorted by request order Time time.Time // Time when the request was made @@ -79,6 +80,18 @@ type queue struct { headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + // Headers are "special", they download in batches, supported by a skeleton chain + headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers + headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable + headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations + headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches + headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results + headerOffset uint64 // [eth/62] Number of the first header in the result cache + headerContCh chan bool // [eth/62] Channel to notify when header download finishes + + // All data retrievals below are based on an already assembles header chain blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations @@ -113,6 +126,8 @@ func newQueue(stateDb ethdb.Database) *queue { return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), + headerPendPool: make(map[string]*fetchRequest), + headerContCh: make(chan bool), blockTaskPool: make(map[common.Hash]*types.Header), blockTaskQueue: prque.New(), blockPendPool: make(map[string]*fetchRequest), @@ -149,6 +164,8 @@ func (q *queue) Reset() { q.headerHead = common.Hash{} + q.headerPendPool = make(map[string]*fetchRequest) + q.blockTaskPool = make(map[common.Hash]*types.Header) q.blockTaskQueue.Reset() q.blockPendPool = make(map[string]*fetchRequest) @@ -178,6 +195,14 @@ func (q *queue) Close() { q.active.Broadcast() } +// PendingHeaders retrieves the number of header requests pending for retrieval. +func (q *queue) PendingHeaders() int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.headerTaskQueue.Size() +} + // PendingBlocks retrieves the number of block (body) requests pending for retrieval. func (q *queue) PendingBlocks() int { q.lock.Lock() @@ -205,6 +230,15 @@ func (q *queue) PendingNodeData() int { return 0 } +// InFlightHeaders retrieves whether there are header fetch requests currently +// in flight. +func (q *queue) InFlightHeaders() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return len(q.headerPendPool) > 0 +} + // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { @@ -317,6 +351,45 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash { return inserts } +// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill +// up an already retrieved header skeleton. +func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { + q.lock.Lock() + defer q.lock.Unlock() + + // No skeleton retrieval can be in progress, fail hard if so (huge implementation bug) + if q.headerResults != nil { + panic("skeleton assembly already in progress") + } + // Shedule all the header retrieval tasks for the skeleton assembly + q.headerTaskPool = make(map[uint64]*types.Header) + q.headerTaskQueue = prque.New() + q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains + q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerProced = 0 + q.headerOffset = from + q.headerContCh = make(chan bool, 1) + + for i, header := range skeleton { + index := from + uint64(i*MaxHeaderFetch) + + q.headerTaskPool[index] = header + q.headerTaskQueue.Push(index, -float32(index)) + } +} + +// RetrieveHeaders retrieves the header chain assemble based on the scheduled +// skeleton. +func (q *queue) RetrieveHeaders() ([]*types.Header, int) { + q.lock.Lock() + defer q.lock.Unlock() + + headers, proced := q.headerResults, q.headerProced + q.headerResults, q.headerProced = nil, 0 + + return headers, proced +} + // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { @@ -437,6 +510,46 @@ func (q *queue) countProcessableItems() int { return len(q.resultCache) } +// ReserveHeaders reserves a set of headers for the given peer, skipping any +// previously failed batches. +func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the peer's already downloading something (sanity check to + // not corrupt state) + if _, ok := q.headerPendPool[p.id]; ok { + return nil + } + // Retrieve a batch of hashes, skipping previously failed ones + send, skip := uint64(0), []uint64{} + for send == 0 && !q.headerTaskQueue.Empty() { + from, _ := q.headerTaskQueue.Pop() + if q.headerPeerMiss[p.id] != nil { + if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { + skip = append(skip, from.(uint64)) + continue + } + } + send = from.(uint64) + } + // Merge all the skipped batches back + for _, from := range skip { + q.headerTaskQueue.Push(from, -float32(from)) + } + // Assemble and return the block download request + if send == 0 { + return nil + } + request := &fetchRequest{ + Peer: p, + From: send, + Time: time.Now(), + } + q.headerPendPool[p.id] = request + return request +} + // ReserveBlocks reserves a set of block hashes for the given peer, skipping any // previously failed download. func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { @@ -635,6 +748,11 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ return request, progress, nil } +// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue. +func (q *queue) CancelHeaders(request *fetchRequest) { + q.cancel(request, q.headerTaskQueue, q.headerPendPool) +} + // CancelBlocks aborts a fetch request, returning all pending hashes to the queue. func (q *queue) CancelBlocks(request *fetchRequest) { q.cancel(request, q.hashQueue, q.blockPendPool) @@ -663,6 +781,9 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m q.lock.Lock() defer q.lock.Unlock() + if request.From > 0 { + taskQueue.Push(request.From, -float32(request.From)) + } for hash, index := range request.Hashes { taskQueue.Push(hash, float32(index)) } @@ -702,6 +823,15 @@ func (q *queue) Revoke(peerId string) { } } +// ExpireHeaders checks for in flight requests that exceeded a timeout allowance, +// canceling them and returning the responsible peers for penalisation. +func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter) +} + // ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalisation. func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int { @@ -753,6 +883,9 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, timeoutMeter.Mark(1) // Return any non satisfied requests to the pool + if request.From > 0 { + taskQueue.Push(request.From, -float32(request.From)) + } for hash, index := range request.Hashes { taskQueue.Push(hash, float32(index)) } @@ -842,6 +975,94 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { } } +// DeliverHeaders injects a header retrieval response into the header results +// cache. This method either accepts all headers it received, or none of them +// if they do not map correctly to the skeleton. +// +// If the headers are accepted, the method makes an attempt to deliver the set +// of ready headers to the processor to keep the pipeline full. However it will +// not block to prevent stalling other pending deliveries. +func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the data was never requested + request := q.headerPendPool[id] + if request == nil { + return 0, errNoFetchesPending + } + headerReqTimer.UpdateSince(request.Time) + delete(q.headerPendPool, id) + + // Ensure headers can be mapped onto the skeleton chain + target := q.headerTaskPool[request.From].Hash() + + accepted := len(headers) == MaxHeaderFetch + if accepted { + if headers[0].Number.Uint64() != request.From { + glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From) + accepted = false + } else if headers[len(headers)-1].Hash() != target { + glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4]) + accepted = false + } + } + if accepted { + for i, header := range headers[1:] { + hash := header.Hash() + if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { + glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want) + accepted = false + break + } + if headers[i].Hash() != header.ParentHash { + glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4]) + accepted = false + break + } + } + } + // If the batch of headers wasn't accepted, mark as unavailable + if !accepted { + glog.V(logger.Detail).Infof("Peer %s: skeleton filling from header #%d not accepted", id, request.From) + + miss := q.headerPeerMiss[id] + if miss == nil { + q.headerPeerMiss[id] = make(map[uint64]struct{}) + miss = q.headerPeerMiss[id] + } + miss[request.From] = struct{}{} + + q.headerTaskQueue.Push(request.From, -float32(request.From)) + return 0, errors.New("delivery not accepted") + } + // Clean up a successful fetch and try to deliver any sub-results + copy(q.headerResults[request.From-q.headerOffset:], headers) + delete(q.headerTaskPool, request.From) + + ready := 0 + for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil { + ready += MaxHeaderFetch + } + if ready > 0 { + // Headers are ready for delivery, gather them and push forward (non blocking) + process := make([]*types.Header, ready) + copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) + + select { + case headerProcCh <- process: + glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number) + q.headerProced += len(process) + default: + } + } + // Check for termination and return + if len(q.headerTaskPool) == 0 { + q.headerContCh <- false + } + return len(headers), nil +} + // DeliverBodies injects a block body retrieval response into the results queue. // The method returns the number of blocks bodies accepted from the delivery and // also wakes any threads waiting for data delivery. diff --git a/eth/filters/api.go b/eth/filters/api.go index d9bd4d4b7..7278e20b9 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -233,6 +233,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo return id, nil } +// Logs creates a subscription that fires for all new log that match the given filter criteria. func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -291,12 +292,13 @@ type NewFilterArgs struct { Topics [][]common.Hash } +// UnmarshalJSON sets *args fields with given data. func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { type input struct { From *rpc.BlockNumber `json:"fromBlock"` ToBlock *rpc.BlockNumber `json:"toBlock"` Addresses interface{} `json:"address"` - Topics interface{} `json:"topics"` + Topics []interface{} `json:"topics"` } var raw input @@ -321,7 +323,6 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { if raw.Addresses != nil { // raw.Address can contain a single address or an array of addresses var addresses []common.Address - if strAddrs, ok := raw.Addresses.([]interface{}); ok { for i, addr := range strAddrs { if strAddr, ok := addr.(string); ok { @@ -352,56 +353,53 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { args.Addresses = addresses } + // helper function which parses a string to a topic hash topicConverter := func(raw string) (common.Hash, error) { if len(raw) == 0 { return common.Hash{}, nil } - if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') { raw = raw[2:] } - + if len(raw) != 2 * common.HashLength { + return common.Hash{}, errors.New("invalid topic(s)") + } if decAddr, err := hex.DecodeString(raw); err == nil { return common.BytesToHash(decAddr), nil } - - return common.Hash{}, errors.New("invalid topic given") - } - - // topics is an array consisting of strings or arrays of strings - if raw.Topics != nil { - topics, ok := raw.Topics.([]interface{}) - if ok { - parsedTopics := make([][]common.Hash, len(topics)) - for i, topic := range topics { - if topic == nil { - parsedTopics[i] = []common.Hash{common.StringToHash("")} - } else if strTopic, ok := topic.(string); ok { - if t, err := topicConverter(strTopic); err != nil { - return fmt.Errorf("invalid topic on index %d", i) - } else { - parsedTopics[i] = []common.Hash{t} - } - } else if arrTopic, ok := topic.([]interface{}); ok { - parsedTopics[i] = make([]common.Hash, len(arrTopic)) - for j := 0; j < len(parsedTopics[i]); i++ { - if arrTopic[j] == nil { - parsedTopics[i][j] = common.StringToHash("") - } else if str, ok := arrTopic[j].(string); ok { - if t, err := topicConverter(str); err != nil { - return fmt.Errorf("invalid topic on index %d", i) - } else { - parsedTopics[i] = []common.Hash{t} - } - } else { - return fmt.Errorf("topic[%d][%d] not a string", i, j) + return common.Hash{}, errors.New("invalid topic(s)") + } + + // topics is an array consisting of strings and/or arrays of strings. + // JSON null values are converted to common.Hash{} and ignored by the filter manager. + if len(raw.Topics) > 0 { + args.Topics = make([][]common.Hash, len(raw.Topics)) + for i, t := range raw.Topics { + if t == nil { // ignore topic when matching logs + args.Topics[i] = []common.Hash{common.Hash{}} + } else if topic, ok := t.(string); ok { // match specific topic + top, err := topicConverter(topic) + if err != nil { + return err + } + args.Topics[i] = []common.Hash{top} + } else if topics, ok := t.([]interface{}); ok { // or case e.g. [null, "topic0", "topic1"] + for _, rawTopic := range topics { + if rawTopic == nil { + args.Topics[i] = append(args.Topics[i], common.Hash{}) + } else if topic, ok := rawTopic.(string); ok { + parsed, err := topicConverter(topic) + if err != nil { + return err } + args.Topics[i] = append(args.Topics[i], parsed) + } else { + return fmt.Errorf("invalid topic(s)") } - } else { - return fmt.Errorf("topic[%d] invalid", i) } + } else { + return fmt.Errorf("invalid topic(s)") } - args.Topics = parsedTopics } } diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go new file mode 100644 index 000000000..9e8edc241 --- /dev/null +++ b/eth/filters/api_test.go @@ -0,0 +1,198 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package filters_test + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" +) + +func TestUnmarshalJSONNewFilterArgs(t *testing.T) { + var ( + fromBlock rpc.BlockNumber = 0x123435 + toBlock rpc.BlockNumber = 0xabcdef + address0 = common.StringToAddress("70c87d191324e6712a591f304b4eedef6ad9bb9d") + address1 = common.StringToAddress("9b2055d370f73ec7d8a03e965129118dc8f5bf83") + topic0 = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca") + topic1 = common.HexToHash("9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3") + topic2 = common.HexToHash("6ccae1c4af4152f460ff510e573399795dfab5dcf1fa60d1f33ac8fdc1e480ce") + nullTopic = common.Hash{} + ) + + // default values + var test0 filters.NewFilterArgs + if err := json.Unmarshal([]byte("{}"), &test0); err != nil { + t.Fatal(err) + } + if test0.FromBlock != rpc.LatestBlockNumber { + t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock) + } + if test0.ToBlock != rpc.LatestBlockNumber { + t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock) + } + if len(test0.Addresses) != 0 { + t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses)) + } + if len(test0.Topics) != 0 { + t.Fatalf("expected 0 topics, got %d topics", len(test0.Topics)) + } + + // from, to block number + var test1 filters.NewFilterArgs + vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock) + if err := json.Unmarshal([]byte(vector), &test1); err != nil { + t.Fatal(err) + } + if test1.FromBlock != fromBlock { + t.Fatalf("expected FromBlock %d, got %d", fromBlock, test1.FromBlock) + } + if test1.ToBlock != toBlock { + t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock) + } + + // single address + var test2 filters.NewFilterArgs + vector = fmt.Sprintf(`{"address": "%s"}`, address0.Hex()) + if err := json.Unmarshal([]byte(vector), &test2); err != nil { + t.Fatal(err) + } + if len(test2.Addresses) != 1 { + t.Fatalf("expected 1 address, got %d address(es)", len(test2.Addresses)) + } + if test2.Addresses[0] != address0 { + t.Fatalf("expected address %x, got %x", address0, test2.Addresses[0]) + } + + // multiple address + var test3 filters.NewFilterArgs + vector = fmt.Sprintf(`{"address": ["%s", "%s"]}`, address0.Hex(), address1.Hex()) + if err := json.Unmarshal([]byte(vector), &test3); err != nil { + t.Fatal(err) + } + if len(test3.Addresses) != 2 { + t.Fatalf("expected 2 addresses, got %d address(es)", len(test3.Addresses)) + } + if test3.Addresses[0] != address0 { + t.Fatalf("expected address %x, got %x", address0, test3.Addresses[0]) + } + if test3.Addresses[1] != address1 { + t.Fatalf("expected address %x, got %x", address1, test3.Addresses[1]) + } + + // single topic + var test4 filters.NewFilterArgs + vector = fmt.Sprintf(`{"topics": ["%s"]}`, topic0.Hex()) + if err := json.Unmarshal([]byte(vector), &test4); err != nil { + t.Fatal(err) + } + if len(test4.Topics) != 1 { + t.Fatalf("expected 1 topic, got %d", len(test4.Topics)) + } + if len(test4.Topics[0]) != 1 { + t.Fatalf("expected len(topics[0]) to be 1, got %d", len(test4.Topics[0])) + } + if test4.Topics[0][0] != topic0 { + t.Fatalf("got %x, expected %x", test4.Topics[0][0], topic0) + } + + // test multiple "AND" topics + var test5 filters.NewFilterArgs + vector = fmt.Sprintf(`{"topics": ["%s", "%s"]}`, topic0.Hex(), topic1.Hex()) + if err := json.Unmarshal([]byte(vector), &test5); err != nil { + t.Fatal(err) + } + if len(test5.Topics) != 2 { + t.Fatalf("expected 2 topics, got %d", len(test5.Topics)) + } + if len(test5.Topics[0]) != 1 { + t.Fatalf("expected 1 topic, got %d", len(test5.Topics[0])) + } + if test5.Topics[0][0] != topic0 { + t.Fatalf("got %x, expected %x", test5.Topics[0][0], topic0) + } + if len(test5.Topics[1]) != 1 { + t.Fatalf("expected 1 topic, got %d", len(test5.Topics[1])) + } + if test5.Topics[1][0] != topic1 { + t.Fatalf("got %x, expected %x", test5.Topics[1][0], topic1) + } + + // test optional topic + var test6 filters.NewFilterArgs + vector = fmt.Sprintf(`{"topics": ["%s", null, "%s"]}`, topic0.Hex(), topic2.Hex()) + if err := json.Unmarshal([]byte(vector), &test6); err != nil { + t.Fatal(err) + } + if len(test6.Topics) != 3 { + t.Fatalf("expected 3 topics, got %d", len(test6.Topics)) + } + if len(test6.Topics[0]) != 1 { + t.Fatalf("expected 1 topic, got %d", len(test6.Topics[0])) + } + if test6.Topics[0][0] != topic0 { + t.Fatalf("got %x, expected %x", test6.Topics[0][0], topic0) + } + if len(test6.Topics[1]) != 1 { + t.Fatalf("expected 1 topic, got %d", len(test6.Topics[1])) + } + if test6.Topics[1][0] != nullTopic { + t.Fatalf("got %x, expected empty hash", test6.Topics[1][0]) + } + if len(test6.Topics[2]) != 1 { + t.Fatalf("expected 1 topic, got %d", len(test6.Topics[2])) + } + if test6.Topics[2][0] != topic2 { + t.Fatalf("got %x, expected %x", test6.Topics[2][0], topic2) + } + + // test OR topics + var test7 filters.NewFilterArgs + vector = fmt.Sprintf(`{"topics": [["%s", "%s"], null, ["%s", null]]}`, topic0.Hex(), topic1.Hex(), topic2.Hex()) + if err := json.Unmarshal([]byte(vector), &test7); err != nil { + t.Fatal(err) + } + if len(test7.Topics) != 3 { + t.Fatalf("expected 3 topics, got %d topics", len(test7.Topics)) + } + if len(test7.Topics[0]) != 2 { + t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[0])) + } + if test7.Topics[0][0] != topic0 || test7.Topics[0][1] != topic1 { + t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]", + topic0, topic1, test7.Topics[0][0], test7.Topics[0][1], + ) + } + if len(test7.Topics[1]) != 1 { + t.Fatalf("expected 1 topic, got %d topics", len(test7.Topics[1])) + } + if test7.Topics[1][0] != nullTopic { + t.Fatalf("expected empty hash, got %x", test7.Topics[1][0]) + } + if len(test7.Topics[2]) != 2 { + t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[2])) + } + if test7.Topics[2][0] != topic2 || test7.Topics[2][1] != nullTopic { + t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]", + topic2, nullTopic, test7.Topics[2][0], test7.Topics[2][1], + ) + } +} |