69 files changed, 1120 insertions, 944 deletions
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 5a717da00..f7f6916c5 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -9,23 +9,26 @@ les/ @zsfelfoldi
 light/ @zsfelfoldi
 mobile/ @karalabe
 p2p/ @fjl @zsfelfoldi
+p2p/simulations @lmars
+p2p/protocols @zelig
+swarm/api/http @justelad
 swarm/bmt @zelig
 swarm/dev @lmars
 swarm/fuse @jmozah @holisticode
 swarm/grafana_dashboards @nonsense
 swarm/metrics @nonsense @holisticode
 swarm/multihash @nolash
-swarm/network/bitvector @zelig @janos @gbalint
-swarm/network/priorityqueue @zelig @janos @gbalint
-swarm/network/simulations @zelig
-swarm/network/stream @janos @zelig @gbalint @holisticode @justelad
+swarm/network/bitvector @zelig @janos
+swarm/network/priorityqueue @zelig @janos
+swarm/network/simulations @zelig @janos
+swarm/network/stream @janos @zelig @holisticode @justelad
 swarm/network/stream/intervals @janos
 swarm/network/stream/testing @zelig
 swarm/pot @zelig
 swarm/pss @nolash @zelig @nonsense
 swarm/services @zelig
 swarm/state @justelad
-swarm/storage/encryption @gbalint @zelig @nagydani
+swarm/storage/encryption @zelig @nagydani
 swarm/storage/mock @janos
 swarm/storage/feed @nolash @jpeletier
 swarm/testutil @lmars
diff --git a/.travis.yml b/.travis.yml
index 69535b7ef..c1cc7c4aa 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -148,7 +148,7 @@ matrix:
       git:
         submodules: false # avoid cloning ethereum/tests
       before_install:
-        - curl https://storage.googleapis.com/golang/go1.11.1.linux-amd64.tar.gz | tar -xz
+        - curl https://storage.googleapis.com/golang/go1.11.2.linux-amd64.tar.gz | tar -xz
         - export PATH=`pwd`/go/bin:$PATH
         - export GOROOT=`pwd`/go
         - export GOPATH=$HOME/go
diff --git a/appveyor.yml b/appveyor.yml
index 11848ddb9..e5126b252 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -23,8 +23,8 @@ environment:

 install:
   - git submodule update --init
   - rmdir C:\go /s /q
-  - appveyor DownloadFile https://storage.googleapis.com/golang/go1.11.1.windows-%GETH_ARCH%.zip
-  - 7z x go1.11.1.windows-%GETH_ARCH%.zip -y -oC:\ > NUL
+  - appveyor DownloadFile https://storage.googleapis.com/golang/go1.11.2.windows-%GETH_ARCH%.zip
+  - 7z x go1.11.2.windows-%GETH_ARCH%.zip -y -oC:\ > NUL
   - go version
   - gcc --version
diff --git a/cmd/swarm/access.go b/cmd/swarm/access.go
index 629781edd..072541b65 100644
--- a/cmd/swarm/access.go
+++ b/cmd/swarm/access.go
@@ -114,6 +114,9 @@ func accessNewPass(ctx *cli.Context) {
 		utils.Fatalf("error getting session key: %v", err)
 	}
 	m, err := api.GenerateAccessControlManifest(ctx, ref, accessKey, ae)
+	if err != nil {
+		utils.Fatalf("had an error generating the manifest: %v", err)
+	}
 	if dryRun {
 		err = printManifests(m, nil)
 		if err != nil {
@@ -147,6 +150,9 @@ func accessNewPK(ctx *cli.Context) {
 		utils.Fatalf("error getting session key: %v", err)
 	}
 	m, err := api.GenerateAccessControlManifest(ctx, ref, sessionKey, ae)
+	if err != nil {
+		utils.Fatalf("had an error generating the manifest: %v", err)
+	}
 	if dryRun {
 		err = printManifests(m, nil)
 		if err != nil {
diff --git a/cmd/swarm/config.go b/cmd/swarm/config.go
index 16001010d..3eea3057b 100644
--- a/cmd/swarm/config.go
+++ b/cmd/swarm/config.go
@@ -80,6 +80,7 @@ const (
 	SWARM_ENV_STORE_CAPACITY       = "SWARM_STORE_CAPACITY"
 	SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
 	SWARM_ACCESS_PASSWORD          = "SWARM_ACCESS_PASSWORD"
+	SWARM_AUTO_DEFAULTPATH         = "SWARM_AUTO_DEFAULTPATH"
 	GETH_ENV_DATADIR               = "GETH_DATADIR"
 )
diff --git a/cmd/swarm/fs_test.go b/cmd/swarm/fs_test.go
index 4f38b094b..3b722515e 100644
--- a/cmd/swarm/fs_test.go
+++ b/cmd/swarm/fs_test.go
@@ -80,6 +80,9 @@ func TestCLISwarmFs(t *testing.T) {
 		t.Fatal(err)
 	}
 	dirPath2, err := createDirInDir(dirPath, "AnotherTestSubDir")
+	if err != nil {
+		t.Fatal(err)
+	}

 	dummyContent := "somerandomtestcontentthatshouldbeasserted"
 	dirs := []string{
diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go
index c7a1475d6..1371d6654 100644
--- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go
+++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go
@@ -13,16 +13,13 @@ import (
 	"sync"
 	"time"

-	"github.com/pborman/uuid"
-
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/swarm/multihash"
 	"github.com/ethereum/go-ethereum/swarm/storage/feed"
-	colorable "github.com/mattn/go-colorable"
-
+	"github.com/pborman/uuid"
 	cli "gopkg.in/urfave/cli.v1"
 )
@@ -190,7 +187,7 @@ func cliFeedUploadAndSync(c *cli.Context) error {
 	for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} {
 		wg.Add(1)
 		ruid := uuid.New()[:8]
-		go func(endpoint string, ruid string) {
+		go func(hex string, endpoint string, ruid string) {
 			for {
 				err := fetchFeed(hex, userHex, endpoint, dataHash, ruid)
 				if err != nil {
@@ -200,7 +197,7 @@ func cliFeedUploadAndSync(c *cli.Context) error {
 				wg.Done()
 				return
 			}
-		}(endpoint, ruid)
+		}(hex, endpoint, ruid)
 	}
 }
@@ -268,7 +265,7 @@ func cliFeedUploadAndSync(c *cli.Context) error {
 	for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} {
 		wg.Add(1)
 		ruid := uuid.New()[:8]
-		go func(endpoint string, ruid string) {
+		go func(url string, endpoint string, ruid string) {
 			for {
 				err := fetch(url, endpoint, fileHash, ruid)
 				if err != nil {
@@ -278,7 +275,7 @@ func cliFeedUploadAndSync(c *cli.Context) error {
 				wg.Done()
 				return
 			}
-		}(endpoint, ruid)
+		}(url, endpoint, ruid)
 	}
 }
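Note on the feed_upload_and_sync.go hunks above: the fetch goroutines previously closed over the loop variables hex and url, so a goroutine could observe the value from a later iteration by the time it ran; passing the variable in as an argument gives each goroutine its own copy. A minimal standalone Go sketch of the pattern (illustrative names, not part of the patch):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        for _, url := range []string{"topic-a", "topic-b", "topic-c"} {
            wg.Add(1)
            // The parameter shadows the loop variable, so each goroutine
            // keeps the value from its own iteration.
            go func(url string) {
                defer wg.Done()
                fmt.Println("fetching", url)
            }(url)
        }
        wg.Wait()
    }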
diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go
index 0dbe896e2..992f2d6e9 100644
--- a/cmd/swarm/upload.go
+++ b/cmd/swarm/upload.go
@@ -26,8 +26,10 @@ import (
 	"os/user"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"

+	"github.com/ethereum/go-ethereum/log"
 	swarm "github.com/ethereum/go-ethereum/swarm/api/client"

 	"github.com/ethereum/go-ethereum/cmd/utils"
@@ -47,17 +49,24 @@ var upCommand = cli.Command{
 func upload(ctx *cli.Context) {
 	args := ctx.Args()
 	var (
-		bzzapi       = strings.TrimRight(ctx.GlobalString(SwarmApiFlag.Name), "/")
-		recursive    = ctx.GlobalBool(SwarmRecursiveFlag.Name)
-		wantManifest = ctx.GlobalBoolT(SwarmWantManifestFlag.Name)
-		defaultPath  = ctx.GlobalString(SwarmUploadDefaultPath.Name)
-		fromStdin    = ctx.GlobalBool(SwarmUpFromStdinFlag.Name)
-		mimeType     = ctx.GlobalString(SwarmUploadMimeType.Name)
-		client       = swarm.NewClient(bzzapi)
-		toEncrypt    = ctx.Bool(SwarmEncryptedFlag.Name)
-		file         string
+		bzzapi          = strings.TrimRight(ctx.GlobalString(SwarmApiFlag.Name), "/")
+		recursive       = ctx.GlobalBool(SwarmRecursiveFlag.Name)
+		wantManifest    = ctx.GlobalBoolT(SwarmWantManifestFlag.Name)
+		defaultPath     = ctx.GlobalString(SwarmUploadDefaultPath.Name)
+		fromStdin       = ctx.GlobalBool(SwarmUpFromStdinFlag.Name)
+		mimeType        = ctx.GlobalString(SwarmUploadMimeType.Name)
+		client          = swarm.NewClient(bzzapi)
+		toEncrypt       = ctx.Bool(SwarmEncryptedFlag.Name)
+		autoDefaultPath = false
+		file            string
 	)
-
+	if autoDefaultPathString := os.Getenv(SWARM_AUTO_DEFAULTPATH); autoDefaultPathString != "" {
+		b, err := strconv.ParseBool(autoDefaultPathString)
+		if err != nil {
+			utils.Fatalf("invalid environment variable %s: %v", SWARM_AUTO_DEFAULTPATH, err)
+		}
+		autoDefaultPath = b
+	}
 	if len(args) != 1 {
 		if fromStdin {
 			tmp, err := ioutil.TempFile("", "swarm-stdin")
@@ -106,6 +115,15 @@ func upload(ctx *cli.Context) {
 		if !recursive {
 			return "", errors.New("Argument is a directory and recursive upload is disabled")
 		}
+		if autoDefaultPath && defaultPath == "" {
+			defaultEntryCandidate := path.Join(file, "index.html")
+			log.Debug("trying to find default path", "path", defaultEntryCandidate)
+			defaultEntryStat, err := os.Stat(defaultEntryCandidate)
+			if err == nil && !defaultEntryStat.IsDir() {
+				log.Debug("setting auto detected default path", "path", defaultEntryCandidate)
+				defaultPath = defaultEntryCandidate
+			}
+		}
 		if defaultPath != "" {
 			// construct absolute default path
 			absDefaultPath, _ := filepath.Abs(defaultPath)
diff --git a/cmd/swarm/upload_test.go b/cmd/swarm/upload_test.go
index 0ac2456a5..ba4463e8b 100644
--- a/cmd/swarm/upload_test.go
+++ b/cmd/swarm/upload_test.go
@@ -243,8 +243,7 @@ func testCLISwarmUpRecursive(toEncrypt bool, t *testing.T) {
 	}
 	defer os.RemoveAll(tmpDownload)
 	bzzLocator := "bzz:/" + hash
-	flagss := []string{}
-	flagss = []string{
+	flagss := []string{
 		"--bzzapi",
 		cluster.Nodes[0].URL,
 		"down",
 		"--recursive",
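Note on the upload.go hunks above: with SWARM_AUTO_DEFAULTPATH set, a recursive directory upload uses <dir>/index.html as the manifest's default entry, but only when the variable parses as true and no explicit default path was supplied. The value goes through strconv.ParseBool, so anything outside its accepted set aborts the upload. A standalone sketch of what ParseBool accepts (not part of the patch):

    package main

    import (
        "fmt"
        "strconv"
    )

    func main() {
        // ParseBool accepts 1, t, T, TRUE, true, True, 0, f, F, FALSE, false, False;
        // anything else returns an error, which upload.go turns into a fatal exit.
        for _, v := range []string{"1", "true", "T", "0", "false", "yes"} {
            b, err := strconv.ParseBool(v)
            fmt.Printf("%q -> %v (err: %v)\n", v, b, err)
        }
    }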
diff --git a/common/compiler/solidity.go b/common/compiler/solidity.go
index f6e8d2e42..b7c8ec563 100644
--- a/common/compiler/solidity.go
+++ b/common/compiler/solidity.go
@@ -31,14 +31,15 @@ import (

 var versionRegexp = regexp.MustCompile(`([0-9]+)\.([0-9]+)\.([0-9]+)`)

-// Contract contains information about a compiled contract, alongside its code.
+// Contract contains information about a compiled contract, alongside its code and runtime code.
 type Contract struct {
-	Code string       `json:"code"`
-	Info ContractInfo `json:"info"`
+	Code        string       `json:"code"`
+	RuntimeCode string       `json:"runtime-code"`
+	Info        ContractInfo `json:"info"`
 }

 // ContractInfo contains information about a compiled contract, including access
-// to the ABI definition, user and developer docs, and metadata.
+// to the ABI definition, source mapping, user and developer docs, and metadata.
 //
 // Depending on the source, language version, compiler version, and compiler
 // options will provide information about how the contract was compiled.
@@ -48,6 +49,8 @@ type ContractInfo struct {
 	LanguageVersion string      `json:"languageVersion"`
 	CompilerVersion string      `json:"compilerVersion"`
 	CompilerOptions string      `json:"compilerOptions"`
+	SrcMap          string      `json:"srcMap"`
+	SrcMapRuntime   string      `json:"srcMapRuntime"`
 	AbiDefinition   interface{} `json:"abiDefinition"`
 	UserDoc         interface{} `json:"userDoc"`
 	DeveloperDoc    interface{} `json:"developerDoc"`
@@ -63,14 +66,16 @@ type Solidity struct {
 // --combined-output format
 type solcOutput struct {
 	Contracts map[string]struct {
-		Bin, Abi, Devdoc, Userdoc, Metadata string
+		BinRuntime                                  string `json:"bin-runtime"`
+		SrcMapRuntime                               string `json:"srcmap-runtime"`
+		Bin, SrcMap, Abi, Devdoc, Userdoc, Metadata string
 	}
 	Version string
 }

 func (s *Solidity) makeArgs() []string {
 	p := []string{
-		"--combined-json", "bin,abi,userdoc,devdoc",
+		"--combined-json", "bin,bin-runtime,srcmap,srcmap-runtime,abi,userdoc,devdoc",
 		"--optimize", // code optimizer switched on
 	}
 	if s.Major > 0 || s.Minor > 4 || s.Patch > 6 {
@@ -157,7 +162,7 @@ func (s *Solidity) run(cmd *exec.Cmd, source string) (map[string]*Contract, erro
 // provided source, language and compiler version, and compiler options are all
 // passed through into the Contract structs.
 //
-// The solc output is expected to contain ABI, user docs, and dev docs.
+// The solc output is expected to contain ABI, source mapping, user docs, and dev docs.
 //
 // Returns an error if the JSON is malformed or missing data, or if the JSON
 // embedded within the JSON is malformed.
@@ -184,13 +189,16 @@ func ParseCombinedJSON(combinedJSON []byte, source string, languageVersion strin
 			return nil, fmt.Errorf("solc: error reading dev doc: %v", err)
 		}
 		contracts[name] = &Contract{
-			Code: "0x" + info.Bin,
+			Code:        "0x" + info.Bin,
+			RuntimeCode: "0x" + info.BinRuntime,
 			Info: ContractInfo{
 				Source:          source,
 				Language:        "Solidity",
 				LanguageVersion: languageVersion,
 				CompilerVersion: compilerVersion,
 				CompilerOptions: compilerOptions,
+				SrcMap:          info.SrcMap,
+				SrcMapRuntime:   info.SrcMapRuntime,
 				AbiDefinition:   abi,
 				UserDoc:         userdoc,
 				DeveloperDoc:    devdoc,
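Note on the common/compiler change above: callers now also receive the deployed (runtime) bytecode and the solc source maps. A hedged usage sketch, assuming the package's CompileSolidityString helper and a solc binary on PATH; the contract source and names below are made up for illustration:

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/common/compiler"
    )

    const src = `
    pragma solidity ^0.4.24;
    contract Adder {
        function add(uint a, uint b) public pure returns (uint) { return a + b; }
    }
    `

    func main() {
        // An empty solc path is assumed to fall back to "solc" found on PATH.
        contracts, err := compiler.CompileSolidityString("", src)
        if err != nil {
            panic(err)
        }
        for name, c := range contracts {
            // RuntimeCode and the source maps are the fields added by this diff.
            fmt.Println(name, len(c.Code), len(c.RuntimeCode), c.Info.SrcMapRuntime != "")
        }
    }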
diff --git a/core/block_validator.go b/core/block_validator.go
index 1329f6242..3b9496fec 100644
--- a/core/block_validator.go
+++ b/core/block_validator.go
@@ -53,12 +53,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
 	if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
 		return ErrKnownBlock
 	}
-	if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
-		if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
-			return consensus.ErrUnknownAncestor
-		}
-		return consensus.ErrPrunedAncestor
-	}
 	// Header validity is known at this point, check the uncles and transactions
 	header := block.Header()
 	if err := v.engine.VerifyUncles(v.bc, block); err != nil {
@@ -70,6 +64,12 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
 	if hash := types.DeriveSha(block.Transactions()); hash != header.TxHash {
 		return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash)
 	}
+	if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
+		if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
+			return consensus.ErrUnknownAncestor
+		}
+		return consensus.ErrPrunedAncestor
+	}
 	return nil
 }
diff --git a/core/chain_makers.go b/core/chain_makers.go
index 0bc453fdf..e3a5537a4 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -33,12 +33,11 @@ import (
 // BlockGen creates blocks for testing.
 // See GenerateChain for a detailed explanation.
 type BlockGen struct {
-	i           int
-	parent      *types.Block
-	chain       []*types.Block
-	chainReader consensus.ChainReader
-	header      *types.Header
-	statedb     *state.StateDB
+	i       int
+	parent  *types.Block
+	chain   []*types.Block
+	header  *types.Header
+	statedb *state.StateDB

 	gasPool *GasPool
 	txs     []*types.Transaction
@@ -138,7 +137,7 @@ func (b *BlockGen) AddUncle(h *types.Header) {
 // For index -1, PrevBlock returns the parent block given to GenerateChain.
 func (b *BlockGen) PrevBlock(index int) *types.Block {
 	if index >= b.i {
-		panic("block index out of range")
+		panic(fmt.Errorf("block index %d out of range (%d,%d)", index, -1, b.i))
 	}
 	if index == -1 {
 		return b.parent
@@ -154,7 +153,8 @@ func (b *BlockGen) OffsetTime(seconds int64) {
 	if b.header.Time.Cmp(b.parent.Header().Time) <= 0 {
 		panic("block time out of range")
 	}
-	b.header.Difficulty = b.engine.CalcDifficulty(b.chainReader, b.header.Time.Uint64(), b.parent.Header())
+	chainreader := &fakeChainReader{config: b.config}
+	b.header.Difficulty = b.engine.CalcDifficulty(chainreader, b.header.Time.Uint64(), b.parent.Header())
 }

 // GenerateChain creates a chain of n blocks. The first block's
@@ -174,14 +174,10 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
 		config = params.TestChainConfig
 	}
 	blocks, receipts := make(types.Blocks, n), make([]types.Receipts, n)
+	chainreader := &fakeChainReader{config: config}
 	genblock := func(i int, parent *types.Block, statedb *state.StateDB) (*types.Block, types.Receipts) {
-		// TODO(karalabe): This is needed for clique, which depends on multiple blocks.
-		// It's nonetheless ugly to spin up a blockchain here. Get rid of this somehow.
-		blockchain, _ := NewBlockChain(db, nil, config, engine, vm.Config{}, nil)
-		defer blockchain.Stop()
-
-		b := &BlockGen{i: i, parent: parent, chain: blocks, chainReader: blockchain, statedb: statedb, config: config, engine: engine}
-		b.header = makeHeader(b.chainReader, parent, statedb, b.engine)
+		b := &BlockGen{i: i, chain: blocks, parent: parent, statedb: statedb, config: config, engine: engine}
+		b.header = makeHeader(chainreader, parent, statedb, b.engine)

 		// Mutate the state and block according to any hard-fork specs
 		if daoBlock := config.DAOForkBlock; daoBlock != nil {
@@ -201,7 +197,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
 		}
 		if b.engine != nil {
 			// Finalize and seal the block
-			block, _ := b.engine.Finalize(b.chainReader, b.header, statedb, b.txs, b.uncles, b.receipts)
+			block, _ := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)

 			// Write state changes to db
 			root, err := statedb.Commit(config.IsEIP158(b.header.Number))
@@ -269,3 +265,19 @@ func makeBlockChain(parent *types.Block, n int, engine consensus.Engine, db ethd
 	})
 	return blocks
 }
+
+type fakeChainReader struct {
+	config  *params.ChainConfig
+	genesis *types.Block
+}
+
+// Config returns the chain configuration.
+func (cr *fakeChainReader) Config() *params.ChainConfig {
+	return cr.config
+}
+
+func (cr *fakeChainReader) CurrentHeader() *types.Header                            { return nil }
+func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header           { return nil }
+func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header          { return nil }
+func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil }
+func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block   { return nil }
diff --git a/core/vm/logger_test.go b/core/vm/logger_test.go
index cba7c7a0e..2ea7535a7 100644
--- a/core/vm/logger_test.go
+++ b/core/vm/logger_test.go
@@ -46,7 +46,7 @@ type dummyStatedb struct {
 	state.StateDB
 }

-func (dummyStatedb) GetRefund() uint64 { return 1337 }
+func (*dummyStatedb) GetRefund() uint64 { return 1337 }

 func TestStoreCapture(t *testing.T) {
 	var (
diff --git a/eth/config.go b/eth/config.go
index efbaafb6a..e32c01a73 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -122,7 +122,7 @@ type Config struct {
 	// Miscellaneous options
 	DocRoot string `toml:"-"`

-	// Type of the EWASM interpreter ("" for detault)
+	// Type of the EWASM interpreter ("" for default)
 	EWASMInterpreter string
 	// Type of the EVM interpreter ("" for default)
 	EVMInterpreter string
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f01a8fdbd..56c54c8ed 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -740,6 +740,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
 				return 0, errBadPeer
 			}
 			start = check
+			hash = h

 		case <-timeout:
 			p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index dad626e89..1fe02d884 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -25,22 +25,14 @@ import (
 	"testing"
 	"time"

+	ethereum "github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/consensus/ethash"
-	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
-	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/trie"
 )

-var (
-	testKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-	testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
-)
-
 // Reduce some of the parameters to make the tester faster.
func init() { MaxForkAncestry = uint64(10000) @@ -55,6 +47,7 @@ type downloadTester struct { genesis *types.Block // Genesis blocks used by the tester and peers stateDb ethdb.Database // Database used by the tester for syncing from peers peerDb ethdb.Database // Database of the peers containing all data + peers map[string]*downloadTesterPeer ownHashes []common.Hash // Hash chain belonging to the tester ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester @@ -62,129 +55,27 @@ type downloadTester struct { ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain - peerHashes map[string][]common.Hash // Hash chain belonging to different test peers - peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers - peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers - peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers - peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains - - peerMissingStates map[string]map[common.Hash]bool // State entries that fast sync should not return - lock sync.RWMutex } // newTester creates a new downloader test mocker. func newTester() *downloadTester { - testdb := ethdb.NewMemDatabase() - genesis := core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) - tester := &downloadTester{ - genesis: genesis, - peerDb: testdb, - ownHashes: []common.Hash{genesis.Hash()}, - ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, - ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, - ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil}, - ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()}, - peerHashes: make(map[string][]common.Hash), - peerHeaders: make(map[string]map[common.Hash]*types.Header), - peerBlocks: make(map[string]map[common.Hash]*types.Block), - peerReceipts: make(map[string]map[common.Hash]types.Receipts), - peerChainTds: make(map[string]map[common.Hash]*big.Int), - peerMissingStates: make(map[string]map[common.Hash]bool), + genesis: testGenesis, + peerDb: testDB, + peers: make(map[string]*downloadTesterPeer), + ownHashes: []common.Hash{testGenesis.Hash()}, + ownHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()}, + ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, + ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, + ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, } tester.stateDb = ethdb.NewMemDatabase() - tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - + tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) - return tester } -// makeChain creates a chain of n blocks starting at and including parent. -// the returned hash chain is ordered head->parent. In addition, every 3rd block -// contains a transaction and every 5th an uncle to allow testing correct block -// reassembly. 
-func (dl *downloadTester) makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts, heavy bool) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) { - // Generate the block chain - blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), dl.peerDb, n, func(i int, block *core.BlockGen) { - block.SetCoinbase(common.Address{seed}) - - // If a heavy chain is requested, delay blocks to raise difficulty - if heavy { - block.OffsetTime(-1) - } - // If the block number is multiple of 3, send a bonus transaction to the miner - if parent == dl.genesis && i%3 == 0 { - signer := types.MakeSigner(params.TestChainConfig, block.Number()) - tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey) - if err != nil { - panic(err) - } - block.AddTx(tx) - } - // If the block number is a multiple of 5, add a bonus uncle to the block - if i > 0 && i%5 == 0 { - block.AddUncle(&types.Header{ - ParentHash: block.PrevBlock(i - 1).Hash(), - Number: big.NewInt(block.Number().Int64() - 1), - }) - } - }) - // Convert the block-chain into a hash-chain and header/block maps - hashes := make([]common.Hash, n+1) - hashes[len(hashes)-1] = parent.Hash() - - headerm := make(map[common.Hash]*types.Header, n+1) - headerm[parent.Hash()] = parent.Header() - - blockm := make(map[common.Hash]*types.Block, n+1) - blockm[parent.Hash()] = parent - - receiptm := make(map[common.Hash]types.Receipts, n+1) - receiptm[parent.Hash()] = parentReceipts - - for i, b := range blocks { - hashes[len(hashes)-i-2] = b.Hash() - headerm[b.Hash()] = b.Header() - blockm[b.Hash()] = b - receiptm[b.Hash()] = receipts[i] - } - return hashes, headerm, blockm, receiptm -} - -// makeChainFork creates two chains of length n, such that h1[:f] and -// h2[:f] are different but have a common suffix of length n-f. -func (dl *downloadTester) makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts, balanced bool) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) { - // Create the common suffix - hashes, headers, blocks, receipts := dl.makeChain(n-f, 0, parent, parentReceipts, false) - - // Create the forks, making the second heavier if non balanced forks were requested - hashes1, headers1, blocks1, receipts1 := dl.makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]], false) - hashes1 = append(hashes1, hashes[1:]...) - - heavy := false - if !balanced { - heavy = true - } - hashes2, headers2, blocks2, receipts2 := dl.makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]], heavy) - hashes2 = append(hashes2, hashes[1:]...) - - for hash, header := range headers { - headers1[hash] = header - headers2[hash] = header - } - for hash, block := range blocks { - blocks1[hash] = block - blocks2[hash] = block - } - for hash, receipt := range receipts { - receipts1[hash] = receipt - receipts2[hash] = receipt - } - return hashes1, hashes2, headers1, headers2, blocks1, blocks2, receipts1, receipts2 -} - // terminate aborts any operations on the embedded downloader and releases all // held resources. func (dl *downloadTester) terminate() { @@ -194,13 +85,10 @@ func (dl *downloadTester) terminate() { // sync starts synchronizing with a remote peer, blocking until it completes. 
func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { dl.lock.RLock() - hash := dl.peerHashes[id][0] + hash := dl.peers[id].chain.headBlock().Hash() // If no particular TD was requested, load from the peer's blockchain if td == nil { - td = big.NewInt(1) - if diff, ok := dl.peerChainTds[id][hash]; ok { - td = diff - } + td = dl.peers[id].chain.td(hash) } dl.lock.RUnlock() @@ -302,7 +190,7 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { } // InsertHeaderChain injects a new batch of headers into the simulated chain. -func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) { +func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -331,7 +219,7 @@ func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq i } // InsertChain injects a new batch of blocks into the simulated chain. -func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) { +func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -353,7 +241,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) { } // InsertReceiptChain injects a new batch of receipts into the simulated chain. -func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (int, error) { +func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts) (i int, err error) { dl.lock.Lock() defer dl.lock.Unlock() @@ -387,60 +275,13 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) { } // newPeer registers a new block download source into the downloader. -func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, receipts map[common.Hash]types.Receipts) error { - return dl.newSlowPeer(id, version, hashes, headers, blocks, receipts, 0) -} - -// newSlowPeer registers a new block download source into the downloader, with a -// specific delay time on processing the network packets sent to it, simulating -// potentially slow network IO. 
-func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, receipts map[common.Hash]types.Receipts, delay time.Duration) error { +func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error { dl.lock.Lock() defer dl.lock.Unlock() - var err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl: dl, id: id, delay: delay}) - if err == nil { - // Assign the owned hashes, headers and blocks to the peer (deep copy) - dl.peerHashes[id] = make([]common.Hash, len(hashes)) - copy(dl.peerHashes[id], hashes) - - dl.peerHeaders[id] = make(map[common.Hash]*types.Header) - dl.peerBlocks[id] = make(map[common.Hash]*types.Block) - dl.peerReceipts[id] = make(map[common.Hash]types.Receipts) - dl.peerChainTds[id] = make(map[common.Hash]*big.Int) - dl.peerMissingStates[id] = make(map[common.Hash]bool) - - genesis := hashes[len(hashes)-1] - if header := headers[genesis]; header != nil { - dl.peerHeaders[id][genesis] = header - dl.peerChainTds[id][genesis] = header.Difficulty - } - if block := blocks[genesis]; block != nil { - dl.peerBlocks[id][genesis] = block - dl.peerChainTds[id][genesis] = block.Difficulty() - } - - for i := len(hashes) - 2; i >= 0; i-- { - hash := hashes[i] - - if header, ok := headers[hash]; ok { - dl.peerHeaders[id][hash] = header - if _, ok := dl.peerHeaders[id][header.ParentHash]; ok { - dl.peerChainTds[id][hash] = new(big.Int).Add(header.Difficulty, dl.peerChainTds[id][header.ParentHash]) - } - } - if block, ok := blocks[hash]; ok { - dl.peerBlocks[id][hash] = block - if _, ok := dl.peerBlocks[id][block.ParentHash()]; ok { - dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][block.ParentHash()]) - } - } - if receipt, ok := receipts[hash]; ok { - dl.peerReceipts[id][hash] = receipt - } - } - } - return err + peer := &downloadTesterPeer{dl: dl, id: id, chain: chain} + dl.peers[id] = peer + return dl.downloader.RegisterPeer(id, version, peer) } // dropPeer simulates a hard peer removal from the connection pool. @@ -448,89 +289,48 @@ func (dl *downloadTester) dropPeer(id string) { dl.lock.Lock() defer dl.lock.Unlock() - delete(dl.peerHashes, id) - delete(dl.peerHeaders, id) - delete(dl.peerBlocks, id) - delete(dl.peerChainTds, id) - + delete(dl.peers, id) dl.downloader.UnregisterPeer(id) } type downloadTesterPeer struct { - dl *downloadTester - id string - delay time.Duration - lock sync.RWMutex -} - -// setDelay is a thread safe setter for the network delay value. -func (dlp *downloadTesterPeer) setDelay(delay time.Duration) { - dlp.lock.Lock() - defer dlp.lock.Unlock() - - dlp.delay = delay -} - -// waitDelay is a thread safe way to sleep for the configured time. -func (dlp *downloadTesterPeer) waitDelay() { - dlp.lock.RLock() - delay := dlp.delay - dlp.lock.RUnlock() - - time.Sleep(delay) + dl *downloadTester + id string + lock sync.RWMutex + chain *testChain + missingStates map[common.Hash]bool // State entries that fast sync should not return } // Head constructs a function to retrieve a peer's current head hash // and total difficulty. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { - dlp.dl.lock.RLock() - defer dlp.dl.lock.RUnlock() - - return dlp.dl.peerHashes[dlp.id][0], nil + b := dlp.chain.headBlock() + return b.Hash(), dlp.chain.td(b.Hash()) } // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed // origin; associated with a particular peer in the download tester. 
The returned // function can be used to retrieve batches of headers from the particular peer. func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { - // Find the canonical number of the hash - dlp.dl.lock.RLock() - number := uint64(0) - for num, hash := range dlp.dl.peerHashes[dlp.id] { - if hash == origin { - number = uint64(len(dlp.dl.peerHashes[dlp.id]) - num - 1) - break - } + if reverse { + panic("reverse header requests not supported") } - dlp.dl.lock.RUnlock() - // Use the absolute header fetcher to satisfy the query - return dlp.RequestHeadersByNumber(number, amount, skip, reverse) + result := dlp.chain.headersByHash(origin, amount, skip) + go dlp.dl.downloader.DeliverHeaders(dlp.id, result) + return nil } // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { - dlp.waitDelay() - - dlp.dl.lock.RLock() - defer dlp.dl.lock.RUnlock() - - // Gather the next batch of headers - hashes := dlp.dl.peerHashes[dlp.id] - headers := dlp.dl.peerHeaders[dlp.id] - result := make([]*types.Header, 0, amount) - for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { - if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { - result = append(result, header) - } + if reverse { + panic("reverse header requests not supported") } - // Delay delivery a bit to allow attacks to unfold - go func() { - time.Sleep(time.Millisecond) - dlp.dl.downloader.DeliverHeaders(dlp.id, result) - }() + + result := dlp.chain.headersByNumber(origin, amount, skip) + go dlp.dl.downloader.DeliverHeaders(dlp.id, result) return nil } @@ -538,24 +338,8 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, // peer in the download tester. The returned function can be used to retrieve // batches of block bodies from the particularly requested peer. func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error { - dlp.waitDelay() - - dlp.dl.lock.RLock() - defer dlp.dl.lock.RUnlock() - - blocks := dlp.dl.peerBlocks[dlp.id] - - transactions := make([][]*types.Transaction, 0, len(hashes)) - uncles := make([][]*types.Header, 0, len(hashes)) - - for _, hash := range hashes { - if block, ok := blocks[hash]; ok { - transactions = append(transactions, block.Transactions()) - uncles = append(uncles, block.Uncles()) - } - } - go dlp.dl.downloader.DeliverBodies(dlp.id, transactions, uncles) - + txs, uncles := dlp.chain.bodies(hashes) + go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles) return nil } @@ -563,21 +347,8 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error { // peer in the download tester. The returned function can be used to retrieve // batches of block receipts from the particularly requested peer. 
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error { - dlp.waitDelay() - - dlp.dl.lock.RLock() - defer dlp.dl.lock.RUnlock() - - receipts := dlp.dl.peerReceipts[dlp.id] - - results := make([][]*types.Receipt, 0, len(hashes)) - for _, hash := range hashes { - if receipt, ok := receipts[hash]; ok { - results = append(results, receipt) - } - } - go dlp.dl.downloader.DeliverReceipts(dlp.id, results) - + receipts := dlp.chain.receipts(hashes) + go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts) return nil } @@ -585,21 +356,18 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error { // peer in the download tester. The returned function can be used to retrieve // batches of node state data from the particularly requested peer. func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error { - dlp.waitDelay() - dlp.dl.lock.RLock() defer dlp.dl.lock.RUnlock() results := make([][]byte, 0, len(hashes)) for _, hash := range hashes { if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil { - if !dlp.dl.peerMissingStates[dlp.id][hash] { + if !dlp.missingStates[hash] { results = append(results, data) } } } go dlp.dl.downloader.DeliverNodeData(dlp.id, results) - return nil } @@ -639,21 +407,6 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng if rs := len(tester.ownReceipts); rs != receipts { t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } - // Verify the state trie too for fast syncs - /*if tester.downloader.mode == FastSync { - pivot := uint64(0) - var index int - if pivot := int(tester.downloader.queue.fastSyncPivot); pivot < common { - index = pivot - } else { - index = len(tester.ownHashes) - lengths[len(lengths)-1] + int(tester.downloader.queue.fastSyncPivot) - } - if index > 0 { - if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(trie.NewDatabase(tester.stateDb))); statedb == nil || err != nil { - t.Fatalf("state reconstruction failed: %v", err) - } - } - }*/ } // Tests that simple synchronization against a canonical chain works correctly. 
@@ -673,16 +426,14 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + chain := testChainBase.shorten(blockCacheItems - 15) + tester.newPeer("peer", protocol, chain) // Synchronise with the peer and make sure all relevant data was retrieved if err := tester.sync("peer", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + assertOwnChain(t, tester, chain.len()) } // Tests that if a large batch of blocks are being downloaded, it is throttled @@ -699,10 +450,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a long block chain to download and the tester - targetBlocks := 8 * blockCacheItems - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + targetBlocks := testChainBase.len() - 1 + tester.newPeer("peer", protocol, testChainBase) // Wrap the importer to allow stepping blocked, proceed := uint32(0), make(chan struct{}) @@ -734,9 +483,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { - //if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { cached = receipts - //} } } frozen = int(atomic.LoadUint32(&blocked)) @@ -786,24 +533,22 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - // Create a long enough forked chain - common, fork := MaxHashFetch, 2*MaxHashFetch - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true) - - tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) - tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) + chainA := testChainForkLightA.shorten(testChainBase.len() + 80) + chainB := testChainForkLightB.shorten(testChainBase.len() + 80) + tester.newPeer("fork A", protocol, chainA) + tester.newPeer("fork B", protocol, chainB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("fork A", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, common+fork+1) + assertOwnChain(t, tester, chainA.len()) // Synchronise with the second peer and make sure that fork is pulled too if err := tester.sync("fork B", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork + 1}) + assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()}) } // Tests that synchronising against a much shorter but much heavyer fork works @@ -821,24 +566,22 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - // Create a long enough forked chain - common, fork := MaxHashFetch, 4*MaxHashFetch - hashesA, hashesB, headersA, headersB, blocksA, blocksB, 
receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false) - - tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA) - tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB) + chainA := testChainForkLightA.shorten(testChainBase.len() + 80) + chainB := testChainForkHeavy.shorten(testChainBase.len() + 80) + tester.newPeer("light", protocol, chainA) + tester.newPeer("heavy", protocol, chainB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("light", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, common+fork+1) + assertOwnChain(t, tester, chainA.len()) // Synchronise with the second peer and make sure that fork is pulled too if err := tester.sync("heavy", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork/2 + 1}) + assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()}) } // Tests that chain forks are contained within a certain interval of the current @@ -857,18 +600,16 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - // Create a long enough forked chain - common, fork := 13, int(MaxForkAncestry+17) - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true) - - tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) - tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB) + chainA := testChainForkLightA + chainB := testChainForkLightB + tester.newPeer("original", protocol, chainA) + tester.newPeer("rewriter", protocol, chainB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("original", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, common+fork+1) + assertOwnChain(t, tester, chainA.len()) // Synchronise with the second peer and ensure that the fork is rejected to being too old if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor { @@ -893,17 +634,16 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a long enough forked chain - common, fork := 13, int(MaxForkAncestry+17) - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, false) - - tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) - tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit + chainA := testChainForkLightA + chainB := testChainForkHeavy + tester.newPeer("original", protocol, chainA) + tester.newPeer("heavy-rewriter", protocol, chainB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("original", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, common+fork+1) + assertOwnChain(t, tester, chainA.len()) // Synchronise with the second peer and ensure that the fork is rejected to being too old if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor { @@ -924,7 +664,7 @@ func TestInactiveDownloader62(t *testing.T) { 
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } } @@ -962,17 +702,8 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheItems - 15 - if targetBlocks >= MaxHashFetch { - targetBlocks = MaxHashFetch - 15 - } - if targetBlocks >= MaxHeaderFetch { - targetBlocks = MaxHeaderFetch - 15 - } - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + chain := testChainBase.shorten(MaxHeaderFetch) + tester.newPeer("peer", protocol, chain) // Make sure canceling works with a pristine downloader tester.downloader.Cancel() @@ -1005,17 +736,16 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Create various peers with various parts of the chain targetPeers := 8 - targetBlocks := targetPeers*blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(targetPeers * 100) for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, protocol, hashes[i*blockCacheItems:], headers, blocks, receipts) + tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1))) } if err := tester.sync("peer #0", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + assertOwnChain(t, tester, chain.len()) } // Tests that synchronisations behave well in multi-version protocol environments @@ -1034,24 +764,23 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(blockCacheItems - 15) // Create peers of every type - tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) - tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) - tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts) + tester.newPeer("peer 62", 62, chain) + tester.newPeer("peer 63", 63, chain) + tester.newPeer("peer 64", 64, chain) // Synchronise with the requested peer and make sure all blocks were retrieved if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + assertOwnChain(t, tester, chain.len()) // Check that no peers have been dropped off for _, version := range []int{62, 63, 64} { peer := fmt.Sprintf("peer %d", version) - if _, ok := tester.peerHashes[peer]; !ok { + if _, ok := tester.peers[peer]; !ok { t.Errorf("%s dropped", peer) } } @@ -1073,10 +802,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a block chain to download - targetBlocks := 2*blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - tester.newPeer("peer", protocol, hashes, headers, blocks, 
receipts) + chain := testChainBase + tester.newPeer("peer", protocol, chain) // Instrument the downloader to signal body requests bodiesHave, receiptsHave := int32(0), int32(0) @@ -1090,16 +817,16 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("peer", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + assertOwnChain(t, tester, chain.len()) // Validate the number of block bodies that should have been requested bodiesNeeded, receiptsNeeded := 0, 0 - for _, block := range blocks { + for _, block := range chain.blockm { if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) { bodiesNeeded++ } } - for _, receipt := range receipts { + for _, receipt := range chain.receiptm { if mode == FastSync && len(receipt) > 0 { receiptsNeeded++ } @@ -1127,24 +854,20 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - // Create a small enough block chain to download - targetBlocks := blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - // Attempt a full sync with an attacker feeding gapped headers - tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) - missing := targetBlocks / 2 - delete(tester.peerHeaders["attack"], hashes[missing]) + chain := testChainBase.shorten(blockCacheItems - 15) + brokenChain := chain.shorten(chain.len()) + delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2]) + tester.newPeer("attack", protocol, brokenChain) if err := tester.sync("attack", nil, mode); err == nil { t.Fatalf("succeeded attacker synchronisation") } // Synchronise with the valid peer and make sure sync succeeds - tester.newPeer("valid", protocol, hashes, headers, blocks, receipts) + tester.newPeer("valid", protocol, chain) if err := tester.sync("valid", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + assertOwnChain(t, tester, chain.len()) } // Tests that if requested headers are shifted (i.e. 
first is missing), the queue @@ -1162,25 +885,24 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - // Create a small enough block chain to download - targetBlocks := blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(blockCacheItems - 15) // Attempt a full sync with an attacker feeding shifted headers - tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) - delete(tester.peerHeaders["attack"], hashes[len(hashes)-2]) - delete(tester.peerBlocks["attack"], hashes[len(hashes)-2]) - delete(tester.peerReceipts["attack"], hashes[len(hashes)-2]) - + brokenChain := chain.shorten(chain.len()) + delete(brokenChain.headerm, brokenChain.chain[1]) + delete(brokenChain.blockm, brokenChain.chain[1]) + delete(brokenChain.receiptm, brokenChain.chain[1]) + tester.newPeer("attack", protocol, brokenChain) if err := tester.sync("attack", nil, mode); err == nil { t.Fatalf("succeeded attacker synchronisation") } + // Synchronise with the valid peer and make sure sync succeeds - tester.newPeer("valid", protocol, hashes, headers, blocks, receipts) + tester.newPeer("valid", protocol, chain) if err := tester.sync("valid", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - assertOwnChain(t, tester, targetBlocks+1) + assertOwnChain(t, tester, chain.len()) } // Tests that upon detecting an invalid header, the recent ones are rolled back @@ -1198,13 +920,14 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { // Create a small enough block chain to download targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(targetBlocks) // Attempt to sync with an attacker that feeds junk during the fast sync phase. // This should result in the last fsHeaderSafetyNet headers being rolled back. - tester.newPeer("fast-attack", protocol, hashes, headers, blocks, receipts) missing := fsHeaderSafetyNet + MaxHeaderFetch + 1 - delete(tester.peerHeaders["fast-attack"], hashes[len(hashes)-missing]) + fastAttackChain := chain.shorten(chain.len()) + delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) + tester.newPeer("fast-attack", protocol, fastAttackChain) if err := tester.sync("fast-attack", nil, mode); err == nil { t.Fatalf("succeeded fast attacker synchronisation") @@ -1212,13 +935,15 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch { t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch) } + // Attempt to sync with an attacker that feeds junk during the block import phase. // This should result in both the last fsHeaderSafetyNet number of headers being // rolled back, and also the pivot point being reverted to a non-block status. 
- tester.newPeer("block-attack", protocol, hashes, headers, blocks, receipts) missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 - delete(tester.peerHeaders["fast-attack"], hashes[len(hashes)-missing]) // Make sure the fast-attacker doesn't fill in - delete(tester.peerHeaders["block-attack"], hashes[len(hashes)-missing]) + blockAttackChain := chain.shorten(chain.len()) + delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in + delete(blockAttackChain.headerm, blockAttackChain.chain[missing]) + tester.newPeer("block-attack", protocol, blockAttackChain) if err := tester.sync("block-attack", nil, mode); err == nil { t.Fatalf("succeeded block attacker synchronisation") @@ -1231,19 +956,18 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Errorf("fast sync pivot block #%d not rolled back", head) } } + // Attempt to sync with an attacker that withholds promised blocks after the // fast sync pivot point. This could be a trial to leave the node with a bad // but already imported pivot block. - tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts) - missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 - + withholdAttackChain := chain.shorten(chain.len()) + tester.newPeer("withhold-attack", protocol, withholdAttackChain) tester.downloader.syncInitHook = func(uint64, uint64) { - for i := missing; i <= len(hashes); i++ { - delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i]) + for i := missing; i < withholdAttackChain.len(); i++ { + delete(withholdAttackChain.headerm, withholdAttackChain.chain[i]) } tester.downloader.syncInitHook = nil } - if err := tester.sync("withhold-attack", nil, mode); err == nil { t.Fatalf("succeeded withholding attacker synchronisation") } @@ -1255,20 +979,21 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Errorf("fast sync pivot block #%d not rolled back", head) } } - // Synchronise with the valid peer and make sure sync succeeds. Since the last - // rollback should also disable fast syncing for this process, verify that we - // did a fresh full sync. Note, we can't assert anything about the receipts - // since we won't purge the database of them, hence we can't use assertOwnChain. - tester.newPeer("valid", protocol, hashes, headers, blocks, receipts) + + // synchronise with the valid peer and make sure sync succeeds. Since the last rollback + // should also disable fast syncing for this process, verify that we did a fresh full + // sync. Note, we can't assert anything about the receipts since we won't purge the + // database of them, hence we can't use assertOwnChain. 
+ tester.newPeer("valid", protocol, chain) if err := tester.sync("valid", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if hs := len(tester.ownHeaders); hs != len(headers) { - t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, len(headers)) + if hs := len(tester.ownHeaders); hs != chain.len() { + t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len()) } if mode != LightSync { - if bs := len(tester.ownBlocks); bs != len(blocks) { - t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, len(blocks)) + if bs := len(tester.ownBlocks); bs != chain.len() { + t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len()) } } } @@ -1288,9 +1013,8 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - hashes, headers, blocks, receipts := tester.makeChain(0, 0, tester.genesis, nil, false) - tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts) - + chain := testChainBase.shorten(1) + tester.newPeer("attack", protocol, chain) if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -1333,21 +1057,22 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { // Run the tests and check disconnection status tester := newTester() defer tester.terminate() + chain := testChainBase.shorten(1) for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil { + if err := tester.newPeer(id, protocol, chain); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } - if _, ok := tester.peerHashes[id]; !ok { + if _, ok := tester.peers[id]; !ok { t.Fatalf("test %d: registered peer not found", i) } // Simulate a synchronisation and check the required result tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync) - if _, ok := tester.peerHashes[id]; !ok != tt.drop { + if _, ok := tester.peers[id]; !ok != tt.drop { t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) } } @@ -1367,10 +1092,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - - // Create a small enough block chain to download - targetBlocks := blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(blockCacheItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1380,12 +1102,10 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { starting <- struct{}{} <-progress } - // Retrieve the sync progress and ensure they are zero (pristine sync) - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != 0 { - t.Fatalf("Pristine progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, 0) - } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + // Synchronise half the blocks and check initial progress - tester.newPeer("peer-half", 
protocol, hashes[targetBlocks/2:], headers, blocks, receipts) + tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2)) pending := new(sync.WaitGroup) pending.Add(1) @@ -1396,16 +1116,15 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != uint64(targetBlocks/2+1) { - t.Fatalf("Initial progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, targetBlocks/2+1) - } + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(chain.len()/2 - 1), + }) progress <- struct{}{} pending.Wait() // Synchronise all the blocks and check continuation progress - tester.newPeer("peer-full", protocol, hashes, headers, blocks, receipts) + tester.newPeer("peer-full", protocol, chain) pending.Add(1) - go func() { defer pending.Done() if err := tester.sync("peer-full", nil, mode); err != nil { @@ -1413,15 +1132,29 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != uint64(targetBlocks/2+1) || progress.CurrentBlock != uint64(targetBlocks/2+1) || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Completing progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, targetBlocks/2+1, targetBlocks/2+1, targetBlocks) - } + checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{ + StartingBlock: uint64(chain.len()/2 - 1), + CurrentBlock: uint64(chain.len()/2 - 1), + HighestBlock: uint64(chain.len() - 1), + }) + + // Check final progress after successful sync progress <- struct{}{} pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + StartingBlock: uint64(chain.len()/2 - 1), + CurrentBlock: uint64(chain.len() - 1), + HighestBlock: uint64(chain.len() - 1), + }) +} - // Check final progress after successful sync - if progress := tester.downloader.Progress(); progress.StartingBlock != uint64(targetBlocks/2+1) || progress.CurrentBlock != uint64(targetBlocks) || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Final progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, targetBlocks/2+1, targetBlocks, targetBlocks) +func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) { + t.Helper() + p := d.Progress() + p.KnownStates, p.PulledStates = 0, 0 + want.KnownStates, want.PulledStates = 0, 0 + if p != want { + t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want) } } @@ -1440,10 +1173,8 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - - // Create a forked chain to simulate origin revertal - common, fork := MaxHashFetch, 2*MaxHashFetch - hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := tester.makeChainFork(common+fork, fork, tester.genesis, nil, true) + chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch) + chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1453,15 +1184,12 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { starting <- 
struct{}{} <-progress } - // Retrieve the sync progress and ensure they are zero (pristine sync) - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != 0 { - t.Fatalf("Pristine progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, 0) - } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + // Synchronise with one of the forks and check progress - tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) + tester.newPeer("fork A", protocol, chainA) pending := new(sync.WaitGroup) pending.Add(1) - go func() { defer pending.Done() if err := tester.sync("fork A", nil, mode); err != nil { @@ -1469,9 +1197,10 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != uint64(len(hashesA)-1) { - t.Fatalf("Initial progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, len(hashesA)-1) - } + + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(chainA.len() - 1), + }) progress <- struct{}{} pending.Wait() @@ -1479,9 +1208,8 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight // Synchronise with the second fork and check progress resets - tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) + tester.newPeer("fork B", protocol, chainB) pending.Add(1) - go func() { defer pending.Done() if err := tester.sync("fork B", nil, mode); err != nil { @@ -1489,16 +1217,20 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != uint64(common) || progress.CurrentBlock != uint64(len(hashesA)-1) || progress.HighestBlock != uint64(len(hashesB)-1) { - t.Fatalf("Forking progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, common, len(hashesA)-1, len(hashesB)-1) - } - progress <- struct{}{} - pending.Wait() + checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{ + StartingBlock: uint64(testChainBase.len()) - 1, + CurrentBlock: uint64(chainA.len() - 1), + HighestBlock: uint64(chainB.len() - 1), + }) // Check final progress after successful sync - if progress := tester.downloader.Progress(); progress.StartingBlock != uint64(common) || progress.CurrentBlock != uint64(len(hashesB)-1) || progress.HighestBlock != uint64(len(hashesB)-1) { - t.Fatalf("Final progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, common, len(hashesB)-1, len(hashesB)-1) - } + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + StartingBlock: uint64(testChainBase.len()) - 1, + CurrentBlock: uint64(chainB.len() - 1), + HighestBlock: uint64(chainB.len() - 1), + }) } // Tests that if synchronisation is aborted due to some failure, then the progress @@ -1516,10 +1248,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - - // Create a small enough block chain to download - targetBlocks := 
blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(blockCacheItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1529,20 +1258,18 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { starting <- struct{}{} <-progress } - // Retrieve the sync progress and ensure they are zero (pristine sync) - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != 0 { - t.Fatalf("Pristine progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, 0) - } + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + // Attempt a full sync with a faulty peer - tester.newPeer("faulty", protocol, hashes, headers, blocks, receipts) - missing := targetBlocks / 2 - delete(tester.peerHeaders["faulty"], hashes[missing]) - delete(tester.peerBlocks["faulty"], hashes[missing]) - delete(tester.peerReceipts["faulty"], hashes[missing]) + brokenChain := chain.shorten(chain.len()) + missing := brokenChain.len() / 2 + delete(brokenChain.headerm, brokenChain.chain[missing]) + delete(brokenChain.blockm, brokenChain.chain[missing]) + delete(brokenChain.receiptm, brokenChain.chain[missing]) + tester.newPeer("faulty", protocol, brokenChain) pending := new(sync.WaitGroup) pending.Add(1) - go func() { defer pending.Done() if err := tester.sync("faulty", nil, mode); err == nil { @@ -1550,16 +1277,17 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Initial progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, targetBlocks) - } + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(brokenChain.len() - 1), + }) progress <- struct{}{} pending.Wait() + afterFailedSync := tester.downloader.Progress() - // Synchronise with a good peer and check that the progress origin remind the same after a failure - tester.newPeer("valid", protocol, hashes, headers, blocks, receipts) + // Synchronise with a good peer and check that the progress origin remind the same + // after a failure + tester.newPeer("valid", protocol, chain) pending.Add(1) - go func() { defer pending.Done() if err := tester.sync("valid", nil, mode); err != nil { @@ -1567,16 +1295,15 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock > uint64(targetBlocks/2) || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Completing progress mismatch: have %v/%v/%v, want %v/0-%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, targetBlocks/2, targetBlocks) - } - progress <- struct{}{} - pending.Wait() + checkProgress(t, tester.downloader, "completing", afterFailedSync) // Check final progress after successful sync - if progress := tester.downloader.Progress(); progress.StartingBlock > uint64(targetBlocks/2) || progress.CurrentBlock != uint64(targetBlocks) || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", 
progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, targetBlocks/2, targetBlocks, targetBlocks) - } + progress <- struct{}{} + pending.Wait() + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + CurrentBlock: uint64(chain.len() - 1), + HighestBlock: uint64(chain.len() - 1), + }) } // Tests that if an attacker fakes a chain height, after the attack is detected, @@ -1593,34 +1320,27 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - - // Create a small block chain - targetBlocks := blockCacheItems - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false) + chain := testChainBase.shorten(blockCacheItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) progress := make(chan struct{}) - tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress } - // Retrieve the sync progress and ensure they are zero (pristine sync) - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != 0 { - t.Fatalf("Pristine progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, 0) - } - // Create and sync with an attacker that promises a higher chain than available - tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) - for i := 1; i < 3; i++ { - delete(tester.peerHeaders["attack"], hashes[i]) - delete(tester.peerBlocks["attack"], hashes[i]) - delete(tester.peerReceipts["attack"], hashes[i]) + checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{}) + + // Create and sync with an attacker that promises a higher chain than available. + brokenChain := chain.shorten(chain.len()) + numMissing := 5 + for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- { + delete(brokenChain.headerm, brokenChain.chain[i]) } + tester.newPeer("attack", protocol, brokenChain) pending := new(sync.WaitGroup) pending.Add(1) - go func() { defer pending.Done() if err := tester.sync("attack", nil, mode); err == nil { @@ -1628,14 +1348,17 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock != 0 || progress.HighestBlock != uint64(targetBlocks+3) { - t.Fatalf("Initial progress mismatch: have %v/%v/%v, want %v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, 0, targetBlocks+3) - } + checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{ + HighestBlock: uint64(brokenChain.len() - 1), + }) progress <- struct{}{} pending.Wait() + afterFailedSync := tester.downloader.Progress() - // Synchronise with a good peer and check that the progress height has been reduced to the true value - tester.newPeer("valid", protocol, hashes[3:], headers, blocks, receipts) + // Synchronise with a good peer and check that the progress height has been reduced to + // the true value. 
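The faulty- and fake-peer tests above all follow the same shape: take a full-length copy of the test chain, delete a few entries from its maps so the peer cannot serve those blocks, register that broken copy, and then assert progress through the new checkProgress helper against an ethereum.SyncProgress value. A hedged sketch of that pattern; the helper name is invented for illustration, while the field and method names come from this diff:

```go
// brokenCopy returns an independent copy of chain with block `missing` removed
// from the header, block and receipt maps, so a peer backed by it behaves like
// the "faulty"/"attack" peers in these tests.
func brokenCopy(chain *testChain, missing int) *testChain {
	broken := chain.shorten(chain.len()) // full-length copy; the shared chain stays intact
	hash := broken.chain[missing]
	delete(broken.headerm, hash)
	delete(broken.blockm, hash)
	delete(broken.receiptm, hash)
	return broken
}
```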
+ validChain := chain.shorten(chain.len() - numMissing) + tester.newPeer("valid", protocol, validChain) pending.Add(1) go func() { @@ -1645,23 +1368,25 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { } }() <-starting - if progress := tester.downloader.Progress(); progress.StartingBlock != 0 || progress.CurrentBlock > uint64(targetBlocks) || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Completing progress mismatch: have %v/%v/%v, want %v/0-%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, 0, targetBlocks, targetBlocks) - } + checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{ + CurrentBlock: afterFailedSync.CurrentBlock, + HighestBlock: uint64(validChain.len() - 1), + }) + + // Check final progress after successful sync. progress <- struct{}{} pending.Wait() - - // Check final progress after successful sync - if progress := tester.downloader.Progress(); progress.StartingBlock > uint64(targetBlocks) || progress.CurrentBlock != uint64(targetBlocks) || progress.HighestBlock != uint64(targetBlocks) { - t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", progress.StartingBlock, progress.CurrentBlock, progress.HighestBlock, targetBlocks, targetBlocks, targetBlocks) - } + checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{ + CurrentBlock: uint64(validChain.len() - 1), + HighestBlock: uint64(validChain.len() - 1), + }) } // This test reproduces an issue where unexpected deliveries would // block indefinitely if they arrived at the right time. -// We use data driven subtests to manage this so that it will be parallel on its own -// and not with the other tests, avoiding intermittent failures. func TestDeliverHeadersHang(t *testing.T) { + t.Parallel() + testCases := []struct { protocol int syncMode SyncMode @@ -1675,15 +1400,38 @@ func TestDeliverHeadersHang(t *testing.T) { } for _, tc := range testCases { t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) { + t.Parallel() testDeliverHeadersHang(t, tc.protocol, tc.syncMode) }) } } +func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { + master := newTester() + defer master.terminate() + chain := testChainBase.shorten(15) + + for i := 0; i < 200; i++ { + tester := newTester() + tester.peerDb = master.peerDb + tester.newPeer("peer", protocol, chain) + + // Whenever the downloader requests headers, flood it with + // a lot of unrequested header deliveries. + tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ + peer: tester.downloader.peers.peers["peer"].peer, + tester: tester, + } + if err := tester.sync("peer", nil, mode); err != nil { + t.Errorf("test %d: sync failed: %v", i, err) + } + tester.terminate() + } +} + type floodingTestPeer struct { peer Peer tester *downloadTester - pend sync.WaitGroup } func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } @@ -1702,54 +1450,32 @@ func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error { func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error { deliveriesDone := make(chan struct{}, 500) - for i := 0; i < cap(deliveriesDone); i++ { + for i := 0; i < cap(deliveriesDone)-1; i++ { peer := fmt.Sprintf("fake-peer%d", i) - ftp.pend.Add(1) - go func() { ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}}) deliveriesDone <- struct{}{} - ftp.pend.Done() }() } - // Deliver the actual requested headers. 
- go ftp.peer.RequestHeadersByNumber(from, count, skip, reverse) + // None of the extra deliveries should block. timeout := time.After(60 * time.Second) + launched := false for i := 0; i < cap(deliveriesDone); i++ { select { case <-deliveriesDone: + if !launched { + // Start delivering the requested headers + // after one of the flooding responses has arrived. + go func() { + ftp.peer.RequestHeadersByNumber(from, count, skip, reverse) + deliveriesDone <- struct{}{} + }() + launched = true + } case <-timeout: panic("blocked") } } return nil } - -func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { - t.Parallel() - - master := newTester() - defer master.terminate() - - hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false) - for i := 0; i < 200; i++ { - tester := newTester() - tester.peerDb = master.peerDb - - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) - // Whenever the downloader requests headers, flood it with - // a lot of unrequested header deliveries. - tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ - peer: tester.downloader.peers.peers["peer"].peer, - tester: tester, - } - if err := tester.sync("peer", nil, mode); err != nil { - t.Errorf("test %d: sync failed: %v", i, err) - } - tester.terminate() - - // Flush all goroutines to prevent messing with subsequent tests - tester.downloader.peers.peers["peer"].peer.(*floodingTestPeer).pend.Wait() - } -} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c6b635aff..863cc8de1 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -664,12 +664,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, } // Add the peer to the expiry report along the number of failed requests expiries[id] = len(request.Headers) + + // Remove the expired requests from the pending pool directly + delete(pendPool, id) } } - // Remove the expired requests from the pending pool - for id := range expiries { - delete(pendPool, id) - } return expiries } diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 8d33dfec7..29d5ee4dd 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -313,11 +313,12 @@ func (s *stateSync) loop() (err error) { s.d.dropPeer(req.peer.id) } // Process all the received blobs and check for stale delivery - if err = s.process(req); err != nil { + delivered, err := s.process(req) + if err != nil { log.Warn("Node data write error", "err", err) return err } - req.peer.SetNodeDataIdle(len(req.response)) + req.peer.SetNodeDataIdle(delivered) } } return nil @@ -398,9 +399,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { // process iterates over a batch of delivered state data, injecting each item // into a running state sync, re-queuing any items that were requested but not // delivered. 
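The queue.expire change above drops the second cleanup loop and deletes expired entries from pendPool while ranging over it. That is well-defined in Go: the spec allows delete() on the map being iterated, and removed keys simply are not produced by later iterations. A small self-contained sketch of that property (illustrative values, not code from the patch):

```go
package main

import "fmt"

func main() {
	// Deleting the entry currently being visited is safe during a range loop.
	pendPool := map[string]int{"peer-a": 1, "peer-b": 3, "peer-c": 5}
	for id, pending := range pendPool {
		if pending > 2 {
			delete(pendPool, id) // removed keys won't be revisited
		}
	}
	fmt.Println(len(pendPool)) // 1: only "peer-a" survives
}
```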
-func (s *stateSync) process(req *stateReq) error { +// Returns whether the peer actually managed to deliver anything of value, +// and any error that occurred +func (s *stateSync) process(req *stateReq) (int, error) { // Collect processing stats and update progress if valid data was received - duplicate, unexpected := 0, 0 + duplicate, unexpected, successful := 0, 0, 0 defer func(start time.Time) { if duplicate > 0 || unexpected > 0 { @@ -410,7 +413,6 @@ func (s *stateSync) process(req *stateReq) error { // Iterate over all the delivered data and inject one-by-one into the trie progress := false - for _, blob := range req.response { prog, hash, err := s.processNodeData(blob) switch err { @@ -418,12 +420,13 @@ func (s *stateSync) process(req *stateReq) error { s.numUncommitted++ s.bytesUncommitted += len(blob) progress = progress || prog + successful++ case trie.ErrNotRequested: unexpected++ case trie.ErrAlreadyProcessed: duplicate++ default: - return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) } if _, ok := req.tasks[hash]; ok { delete(req.tasks, hash) @@ -441,12 +444,12 @@ func (s *stateSync) process(req *stateReq) error { // If we've requested the node too many times already, it may be a malicious // sync where nobody has the right data. Abort. if len(task.attempts) >= npeers { - return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) } // Missing item, place into the retry queue. s.tasks[hash] = task } - return nil + return successful, nil } // processNodeData tries to inject a trie node data blob delivered from a remote diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go new file mode 100644 index 000000000..0b5a21425 --- /dev/null +++ b/eth/downloader/testchain_test.go @@ -0,0 +1,221 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package downloader + +import ( + "fmt" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/params" +) + +// Test chain parameters. 
+var ( + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddress = crypto.PubkeyToAddress(testKey.PublicKey) + testDB = ethdb.NewMemDatabase() + testGenesis = core.GenesisBlockForTesting(testDB, testAddress, big.NewInt(1000000000)) +) + +// The common prefix of all test chains: +var testChainBase = newTestChain(blockCacheItems+200, testGenesis) + +// Different forks on top of the base chain: +var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain + +func init() { + var forkLen = int(MaxForkAncestry + 50) + var wg sync.WaitGroup + wg.Add(3) + go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() + go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }() + go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }() + wg.Wait() +} + +type testChain struct { + genesis *types.Block + chain []common.Hash + headerm map[common.Hash]*types.Header + blockm map[common.Hash]*types.Block + receiptm map[common.Hash][]*types.Receipt + tdm map[common.Hash]*big.Int +} + +// newTestChain creates a blockchain of the given length. +func newTestChain(length int, genesis *types.Block) *testChain { + tc := new(testChain).copy(length) + tc.genesis = genesis + tc.chain = append(tc.chain, genesis.Hash()) + tc.headerm[tc.genesis.Hash()] = tc.genesis.Header() + tc.tdm[tc.genesis.Hash()] = tc.genesis.Difficulty() + tc.blockm[tc.genesis.Hash()] = tc.genesis + tc.generate(length-1, 0, genesis, false) + return tc +} + +// makeFork creates a fork on top of the test chain. +func (tc *testChain) makeFork(length int, heavy bool, seed byte) *testChain { + fork := tc.copy(tc.len() + length) + fork.generate(length, seed, tc.headBlock(), heavy) + return fork +} + +// shorten creates a copy of the chain with the given length. It panics if the +// length is longer than the number of available blocks. +func (tc *testChain) shorten(length int) *testChain { + if length > tc.len() { + panic(fmt.Errorf("can't shorten test chain to %d blocks, it's only %d blocks long", length, tc.len())) + } + return tc.copy(length) +} + +func (tc *testChain) copy(newlen int) *testChain { + cpy := &testChain{ + genesis: tc.genesis, + headerm: make(map[common.Hash]*types.Header, newlen), + blockm: make(map[common.Hash]*types.Block, newlen), + receiptm: make(map[common.Hash][]*types.Receipt, newlen), + tdm: make(map[common.Hash]*big.Int, newlen), + } + for i := 0; i < len(tc.chain) && i < newlen; i++ { + hash := tc.chain[i] + cpy.chain = append(cpy.chain, tc.chain[i]) + cpy.tdm[hash] = tc.tdm[hash] + cpy.blockm[hash] = tc.blockm[hash] + cpy.headerm[hash] = tc.headerm[hash] + cpy.receiptm[hash] = tc.receiptm[hash] + } + return cpy +} + +// generate creates a chain of n blocks starting at and including parent. +// the returned hash chain is ordered head->parent. In addition, every 22th block +// contains a transaction and every 5th an uncle to allow testing correct block +// reassembly. 
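The copy and shorten helpers above allocate fresh maps and a fresh hash slice, which is what lets individual tests mutate their peer's view of the chain (as in the broken-chain patterns earlier in this diff) without corrupting the shared testChainBase. A tiny sketch of that property, illustrative rather than part of the patch:

```go
short := testChainBase.shorten(10)    // independent prefix copy (genesis + 9 blocks)
delete(short.headerm, short.chain[5]) // only this copy loses the header
if _, ok := testChainBase.headerm[testChainBase.chain[5]]; !ok {
	panic("unreachable: mutating a copy never touches the shared base chain")
}
```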
+func (tc *testChain) generate(n int, seed byte, parent *types.Block, heavy bool) { + // start := time.Now() + // defer func() { fmt.Printf("test chain generated in %v\n", time.Since(start)) }() + + blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{seed}) + // If a heavy chain is requested, delay blocks to raise difficulty + if heavy { + block.OffsetTime(-1) + } + // Include transactions to the miner to make blocks more interesting. + if parent == tc.genesis && i%22 == 0 { + signer := types.MakeSigner(params.TestChainConfig, block.Number()) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey) + if err != nil { + panic(err) + } + block.AddTx(tx) + } + // if the block number is a multiple of 5, add a bonus uncle to the block + if i > 0 && i%5 == 0 { + block.AddUncle(&types.Header{ + ParentHash: block.PrevBlock(i - 1).Hash(), + Number: big.NewInt(block.Number().Int64() - 1), + }) + } + }) + + // Convert the block-chain into a hash-chain and header/block maps + td := new(big.Int).Set(tc.td(parent.Hash())) + for i, b := range blocks { + td := td.Add(td, b.Difficulty()) + hash := b.Hash() + tc.chain = append(tc.chain, hash) + tc.blockm[hash] = b + tc.headerm[hash] = b.Header() + tc.receiptm[hash] = receipts[i] + tc.tdm[hash] = new(big.Int).Set(td) + } +} + +// len returns the total number of blocks in the chain. +func (tc *testChain) len() int { + return len(tc.chain) +} + +// headBlock returns the head of the chain. +func (tc *testChain) headBlock() *types.Block { + return tc.blockm[tc.chain[len(tc.chain)-1]] +} + +// td returns the total difficulty of the given block. +func (tc *testChain) td(hash common.Hash) *big.Int { + return tc.tdm[hash] +} + +// headersByHash returns headers in ascending order from the given hash. +func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.Header { + num, _ := tc.hashToNumber(origin) + return tc.headersByNumber(num, amount, skip) +} + +// headersByNumber returns headers in ascending order from the given number. +func (tc *testChain) headersByNumber(origin uint64, amount int, skip int) []*types.Header { + result := make([]*types.Header, 0, amount) + for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 { + if header, ok := tc.headerm[tc.chain[int(num)]]; ok { + result = append(result, header) + } + } + return result +} + +// receipts returns the receipts of the given block hashes. +func (tc *testChain) receipts(hashes []common.Hash) [][]*types.Receipt { + results := make([][]*types.Receipt, 0, len(hashes)) + for _, hash := range hashes { + if receipt, ok := tc.receiptm[hash]; ok { + results = append(results, receipt) + } + } + return results +} + +// bodies returns the block bodies of the given block hashes. 
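headersByNumber above walks the chain in ascending order and advances by skip+1 per step, so a skip of N returns every (N+1)-th header starting at the origin. A quick worked fragment under the assumption that the shared base chain is long enough (it is: blockCacheItems+200 blocks); fmt is used only for illustration:

```go
hdrs := testChainBase.headersByNumber(2, 3, 1) // origin 2, 3 headers, skip 1
for _, h := range hdrs {
	fmt.Println(h.Number) // prints 2, 4 and 6: the loop advances by skip+1
}
```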
+func (tc *testChain) bodies(hashes []common.Hash) ([][]*types.Transaction, [][]*types.Header) { + transactions := make([][]*types.Transaction, 0, len(hashes)) + uncles := make([][]*types.Header, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := tc.blockm[hash]; ok { + transactions = append(transactions, block.Transactions()) + uncles = append(uncles, block.Uncles()) + } + } + return transactions, uncles +} + +func (tc *testChain) hashToNumber(target common.Hash) (uint64, bool) { + for num, hash := range tc.chain { + if hash == target { + return uint64(num), true + } + } + return 0, false +} diff --git a/eth/tracers/tracer_test.go b/eth/tracers/tracer_test.go index 52f29c83f..a45a12115 100644 --- a/eth/tracers/tracer_test.go +++ b/eth/tracers/tracer_test.go @@ -48,7 +48,7 @@ type dummyStatedb struct { state.StateDB } -func (dummyStatedb) GetRefund() uint64 { return 1337 } +func (*dummyStatedb) GetRefund() uint64 { return 1337 } func runTrace(tracer *Tracer) (json.RawMessage, error) { env := vm.NewEVM(vm.Context{BlockNumber: big.NewInt(1)}, &dummyStatedb{}, params.TestChainConfig, vm.Config{Debug: true, Tracer: tracer}) diff --git a/event/event_test.go b/event/event_test.go index a12945a47..2be357ba2 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -141,7 +141,7 @@ func TestMuxConcurrent(t *testing.T) { } } -func emptySubscriber(mux *TypeMux, types ...interface{}) { +func emptySubscriber(mux *TypeMux) { s := mux.Subscribe(testEvent(0)) go func() { for range s.Chan() { @@ -182,9 +182,9 @@ func BenchmarkPost1000(b *testing.B) { func BenchmarkPostConcurrent(b *testing.B) { var mux = new(TypeMux) defer mux.Stop() - emptySubscriber(mux, testEvent(0)) - emptySubscriber(mux, testEvent(0)) - emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux) + emptySubscriber(mux) + emptySubscriber(mux) var wg sync.WaitGroup poster := func() { diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go index d679b8bfa..467bf01be 100644 --- a/event/filter/generic_filter.go +++ b/event/filter/generic_filter.go @@ -25,7 +25,7 @@ type Generic struct { // self = registered, f = incoming func (self Generic) Compare(f Filter) bool { - var strMatch, dataMatch = true, true + var strMatch = true filter := f.(Generic) if (len(self.Str1) > 0 && filter.Str1 != self.Str1) || @@ -40,7 +40,7 @@ func (self Generic) Compare(f Filter) bool { } } - return strMatch && dataMatch + return strMatch } func (self Generic) Trigger(data interface{}) { diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index addf3c766..a5f319653 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -481,6 +481,12 @@ web3._extend({ params: 2, inputFormatter: [web3._extend.formatters.inputBlockNumberFormatter, web3._extend.utils.toHex] }), + new web3._extend.Method({ + name: 'getProof', + call: 'eth_getProof', + params: 3, + inputFormatter: [web3._extend.formatters.inputAddressFormatter, null, web3._extend.formatters.inputBlockNumberFormatter] + }), ], properties: [ new web3._extend.Property({ diff --git a/miner/stress_clique.go b/miner/stress_clique.go index 8961091d5..7e19975ae 100644 --- a/miner/stress_clique.go +++ b/miner/stress_clique.go @@ -22,7 +22,6 @@ package main import ( "bytes" "crypto/ecdsa" - "fmt" "io/ioutil" "math/big" "math/rand" @@ -40,7 +39,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" + 
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" ) @@ -62,11 +61,11 @@ func main() { var ( nodes []*node.Node - enodes []string + enodes []*enode.Node ) for _, sealer := range sealers { // Start the node and wait until it's up - node, err := makeSealer(genesis, enodes) + node, err := makeSealer(genesis) if err != nil { panic(err) } @@ -76,18 +75,12 @@ func main() { time.Sleep(250 * time.Millisecond) } // Connect the node to al the previous ones - for _, enode := range enodes { - enode, err := discover.ParseNode(enode) - if err != nil { - panic(err) - } - node.Server().AddPeer(enode) + for _, n := range enodes { + node.Server().AddPeer(n) } - // Start tracking the node and it's enode url + // Start tracking the node and it's enode nodes = append(nodes, node) - - enode := fmt.Sprintf("enode://%s@127.0.0.1:%d", node.Server().NodeInfo().ID, node.Server().NodeInfo().Ports.Listener) - enodes = append(enodes, enode) + enodes = append(enodes, node.Server().Self()) // Inject the signer key and start sealing with it store := node.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore) @@ -177,7 +170,7 @@ func makeGenesis(faucets []*ecdsa.PrivateKey, sealers []*ecdsa.PrivateKey) *core return genesis } -func makeSealer(genesis *core.Genesis, nodes []string) (*node.Node, error) { +func makeSealer(genesis *core.Genesis) (*node.Node, error) { // Define the basic configurations for the Ethereum node datadir, _ := ioutil.TempDir("", "") diff --git a/miner/stress_ethash.go b/miner/stress_ethash.go index 5ed11d73a..044ca9a21 100644 --- a/miner/stress_ethash.go +++ b/miner/stress_ethash.go @@ -21,7 +21,6 @@ package main import ( "crypto/ecdsa" - "fmt" "io/ioutil" "math/big" "math/rand" @@ -41,7 +40,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" ) @@ -62,11 +61,11 @@ func main() { var ( nodes []*node.Node - enodes []string + enodes []*enode.Node ) for i := 0; i < 4; i++ { // Start the node and wait until it's up - node, err := makeMiner(genesis, enodes) + node, err := makeMiner(genesis) if err != nil { panic(err) } @@ -76,18 +75,12 @@ func main() { time.Sleep(250 * time.Millisecond) } // Connect the node to al the previous ones - for _, enode := range enodes { - enode, err := discover.ParseNode(enode) - if err != nil { - panic(err) - } - node.Server().AddPeer(enode) + for _, n := range enodes { + node.Server().AddPeer(n) } - // Start tracking the node and it's enode url + // Start tracking the node and it's enode nodes = append(nodes, node) - - enode := fmt.Sprintf("enode://%s@127.0.0.1:%d", node.Server().NodeInfo().ID, node.Server().NodeInfo().Ports.Listener) - enodes = append(enodes, enode) + enodes = append(enodes, node.Server().Self()) // Inject the signer key and start sealing with it store := node.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore) @@ -155,7 +148,7 @@ func makeGenesis(faucets []*ecdsa.PrivateKey) *core.Genesis { return genesis } -func makeMiner(genesis *core.Genesis, nodes []string) (*node.Node, error) { +func makeMiner(genesis *core.Genesis) (*node.Node, error) { // Define the basic configurations for the Ethereum node datadir, _ := ioutil.TempDir("", "") diff --git a/p2p/metrics.go b/p2p/metrics.go index 6a7c0bad3..d7873f39a 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -74,7 +74,7 @@ const ( 
type MeteredPeerEvent struct { Type MeteredPeerEventType // Type of peer event IP net.IP // IP address of the peer - ID string // NodeID of the peer + ID enode.ID // NodeID of the peer Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection Ingress uint64 // Ingress count at the moment of the event Egress uint64 // Egress count at the moment of the event @@ -93,7 +93,7 @@ type meteredConn struct { connected time.Time // Connection time of the peer ip net.IP // IP address of the peer - id string // NodeID of the peer + id enode.ID // NodeID of the peer // trafficMetered denotes if the peer is registered in the traffic registries. // Its value is true if the metered peer count doesn't reach the limit in the @@ -160,8 +160,7 @@ func (c *meteredConn) Write(b []byte) (n int, err error) { // handshakeDone is called when a peer handshake is done. Registers the peer to // the ingress and the egress traffic registries using the peer's IP and node ID, // also emits connect event. -func (c *meteredConn) handshakeDone(nodeID enode.ID) { - id := nodeID.String() +func (c *meteredConn) handshakeDone(id enode.ID) { if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit { // Don't register the peer in the traffic registries. atomic.AddInt32(&meteredPeerCount, -1) @@ -170,7 +169,7 @@ func (c *meteredConn) handshakeDone(nodeID enode.ID) { c.lock.Unlock() log.Warn("Metered peer count reached the limit") } else { - key := fmt.Sprintf("%s/%s", c.ip, id) + key := fmt.Sprintf("%s/%s", c.ip, id.String()) c.lock.Lock() c.id, c.trafficMetered = id, true c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry) @@ -190,7 +189,7 @@ func (c *meteredConn) handshakeDone(nodeID enode.ID) { func (c *meteredConn) Close() error { err := c.Conn.Close() c.lock.RLock() - if c.id == "" { + if c.id == (enode.ID{}) { // If the peer disconnects before the handshake. c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go index 2874af48d..a26222cd8 100644 --- a/p2p/protocols/protocol_test.go +++ b/p2p/protocols/protocol_test.go @@ -318,7 +318,7 @@ func TestProtocolHook(t *testing.T) { <-testHook.waitC time.Sleep(100 * time.Millisecond) - err = tester.TestDisconnected(&p2ptest.Disconnect{tester.Nodes[1].ID(), testHook.err}) + err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[1].ID(), Error: testHook.err}) if err != nil { t.Fatalf("Expected a specific disconnect error, but got different one: %v", err) } diff --git a/p2p/rlpx.go b/p2p/rlpx.go index a105720a4..22a27dd96 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -151,7 +151,7 @@ func readProtocolHandshake(rw MsgReader, our *protoHandshake) (*protoHandshake, } if msg.Code == discMsg { // Disconnect before protocol handshake is valid according to the - // spec and we send it ourself if the posthanshake checks fail. + // spec and we send it ourself if the post-handshake checks fail. // We can't return the reason directly, though, because it is echoed // back otherwise. Wrap it in a string instead. var reason [1]DiscReason diff --git a/p2p/server.go b/p2p/server.go index 38a881f7b..667860863 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -228,7 +228,7 @@ type transport interface { MsgReadWriter // transports must provide Close because we use MsgPipe in some of // the tests. Closing the actual network connection doesn't do - // anything in those tests because NsgPipe doesn't use it. 
+ // anything in those tests because MsgPipe doesn't use it. close(err error) } diff --git a/signer/core/abihelper_test.go b/signer/core/abihelper_test.go index 2afeec73e..878210be1 100644 --- a/signer/core/abihelper_test.go +++ b/signer/core/abihelper_test.go @@ -38,13 +38,13 @@ func verify(t *testing.T, jsondata, calldata string, exp []interface{}) { cd := common.Hex2Bytes(calldata) sigdata, argdata := cd[:4], cd[4:] method, err := abispec.MethodById(sigdata) - if err != nil { t.Fatal(err) } - data, err := method.Inputs.UnpackValues(argdata) - + if err != nil { + t.Fatal(err) + } if len(data) != len(exp) { t.Fatalf("Mismatched length, expected %d, got %d", len(exp), len(data)) } diff --git a/signer/rules/rules_test.go b/signer/rules/rules_test.go index 55614077c..0b520a15b 100644 --- a/signer/rules/rules_test.go +++ b/signer/rules/rules_test.go @@ -151,7 +151,7 @@ func TestListRequest(t *testing.T) { t.Errorf("Couldn't create evaluator %v", err) return } - resp, err := r.ApproveListing(&core.ListRequest{ + resp, _ := r.ApproveListing(&core.ListRequest{ Accounts: accs, Meta: core.Metadata{Remote: "remoteip", Local: "localip", Scheme: "inproc"}, }) @@ -515,7 +515,7 @@ func TestLimitWindow(t *testing.T) { r.OnApprovedTx(response) } // Fourth should fail - resp, err := r.ApproveTx(dummyTx(h)) + resp, _ := r.ApproveTx(dummyTx(h)) if resp.Approved { t.Errorf("Expected check to resolve to 'Reject'") } @@ -609,8 +609,8 @@ func TestContextIsCleared(t *testing.T) { t.Fatalf("Failed to load bootstrap js: %v", err) } tx := dummyTxWithV(0) - r1, err := r.ApproveTx(tx) - r2, err := r.ApproveTx(tx) + r1, _ := r.ApproveTx(tx) + r2, _ := r.ApproveTx(tx) if r1.Approved != r2.Approved { t.Errorf("Expected execution context to be cleared between executions") } diff --git a/swarm/api/act.go b/swarm/api/act.go index 52d909827..e54369f9a 100644 --- a/swarm/api/act.go +++ b/swarm/api/act.go @@ -458,6 +458,9 @@ func DoACT(ctx *cli.Context, privateKey *ecdsa.PrivateKey, salt []byte, grantees return nil, nil, nil, err } sessionKey, err := NewSessionKeyPK(privateKey, granteePub, salt) + if err != nil { + return nil, nil, nil, err + } hasher := sha3.NewKeccak256() hasher.Write(append(sessionKey, 0)) diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go index 03c6cbb28..c30d69911 100644 --- a/swarm/api/client/client_test.go +++ b/swarm/api/client/client_test.go @@ -457,6 +457,9 @@ func TestClientCreateUpdateFeed(t *testing.T) { } feedManifestHash, err := client.CreateFeedWithManifest(createRequest) + if err != nil { + t.Fatal(err) + } correctManifestAddrHex := "0e9b645ebc3da167b1d56399adc3276f7a08229301b72a03336be0e7d4b71882" if feedManifestHash != correctManifestAddrHex { diff --git a/swarm/api/filesystem.go b/swarm/api/filesystem.go index 43695efc1..266ef71be 100644 --- a/swarm/api/filesystem.go +++ b/swarm/api/filesystem.go @@ -122,6 +122,10 @@ func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error var wait func(context.Context) error ctx := context.TODO() hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt) + if err != nil { + errors[i] = err + return + } if hash != nil { list[i].Hash = hash.Hex() } diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 3b2dcc7d5..ccc040c54 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -50,7 +50,7 @@ func ParseURI(h http.Handler) http.Handler { uri, err := api.Parse(strings.TrimLeft(r.URL.Path, "/")) if err != nil { w.WriteHeader(http.StatusBadRequest) 
- RespondError(w, r, fmt.Sprintf("invalid URI %q", r.URL.Path), http.StatusBadRequest) + respondError(w, r, fmt.Sprintf("invalid URI %q", r.URL.Path), http.StatusBadRequest) return } if uri.Addr != "" && strings.HasPrefix(uri.Addr, "0x") { diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go index c9fb9d285..d4e81d7f6 100644 --- a/swarm/api/http/response.go +++ b/swarm/api/http/response.go @@ -53,23 +53,23 @@ func ShowMultipleChoices(w http.ResponseWriter, r *http.Request, list api.Manife log.Debug("ShowMultipleChoices", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context())) msg := "" if list.Entries == nil { - RespondError(w, r, "Could not resolve", http.StatusInternalServerError) + respondError(w, r, "Could not resolve", http.StatusInternalServerError) return } requestUri := strings.TrimPrefix(r.RequestURI, "/") uri, err := api.Parse(requestUri) if err != nil { - RespondError(w, r, "Bad Request", http.StatusBadRequest) + respondError(w, r, "Bad Request", http.StatusBadRequest) } uri.Scheme = "bzz-list" msg += fmt.Sprintf("Disambiguation:<br/>Your request may refer to multiple choices.<br/>Click <a class=\"orange\" href='"+"/"+uri.String()+"'>here</a> if your browser does not redirect you within 5 seconds.<script>setTimeout(\"location.href='%s';\",5000);</script><br/>", "/"+uri.String()) - RespondTemplate(w, r, "error", msg, http.StatusMultipleChoices) + respondTemplate(w, r, "error", msg, http.StatusMultipleChoices) } -func RespondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg string, code int) { - log.Debug("RespondTemplate", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context())) +func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg string, code int) { + log.Debug("respondTemplate", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context())) respond(w, r, &ResponseParams{ Code: code, Msg: template.HTML(msg), @@ -78,13 +78,12 @@ func RespondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s }) } -func RespondError(w http.ResponseWriter, r *http.Request, msg string, code int) { - log.Debug("RespondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) - RespondTemplate(w, r, "error", msg, code) +func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) { + log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) + respondTemplate(w, r, "error", msg, code) } func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) { - w.WriteHeader(params.Code) if params.Code >= 400 { @@ -96,7 +95,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) { // this cannot be in a switch since an Accept header can have multiple values: "Accept: */*, text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8" if strings.Contains(acceptHeader, "application/json") { if err := respondJSON(w, r, params); err != nil { - RespondError(w, r, "Internal server error", http.StatusInternalServerError) + respondError(w, r, "Internal server error", http.StatusInternalServerError) } } else if strings.Contains(acceptHeader, "text/html") { respondHTML(w, r, params) @@ -107,7 +106,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) { func respondHTML(w http.ResponseWriter, r *http.Request, params *ResponseParams) { htmlCounter.Inc(1) - log.Debug("respondHTML", "ruid", GetRUID(r.Context())) + log.Info("respondHTML", "ruid", GetRUID(r.Context()), "code", params.Code) err := 
params.template.Execute(w, params) if err != nil { log.Error(err.Error()) @@ -116,14 +115,14 @@ func respondHTML(w http.ResponseWriter, r *http.Request, params *ResponseParams) func respondJSON(w http.ResponseWriter, r *http.Request, params *ResponseParams) error { jsonCounter.Inc(1) - log.Debug("respondJSON", "ruid", GetRUID(r.Context())) + log.Info("respondJSON", "ruid", GetRUID(r.Context()), "code", params.Code) w.Header().Set("Content-Type", "application/json") return json.NewEncoder(w).Encode(params) } func respondPlaintext(w http.ResponseWriter, r *http.Request, params *ResponseParams) error { plaintextCounter.Inc(1) - log.Debug("respondPlaintext", "ruid", GetRUID(r.Context())) + log.Info("respondPlaintext", "ruid", GetRUID(r.Context()), "code", params.Code) w.Header().Set("Content-Type", "text/plain") strToWrite := "Code: " + fmt.Sprintf("%d", params.Code) + "\n" strToWrite += "Message: " + string(params.Msg) + "\n" diff --git a/swarm/api/http/sctx.go b/swarm/api/http/sctx.go index 431e11735..b8dafab0b 100644 --- a/swarm/api/http/sctx.go +++ b/swarm/api/http/sctx.go @@ -7,14 +7,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/sctx" ) -type contextKey int - -const ( - uriKey contextKey = iota -) +type uriKey struct{} func GetRUID(ctx context.Context) string { - v, ok := ctx.Value(sctx.HTTPRequestIDKey).(string) + v, ok := ctx.Value(sctx.HTTPRequestIDKey{}).(string) if ok { return v } @@ -22,11 +18,11 @@ func GetRUID(ctx context.Context) string { } func SetRUID(ctx context.Context, ruid string) context.Context { - return context.WithValue(ctx, sctx.HTTPRequestIDKey, ruid) + return context.WithValue(ctx, sctx.HTTPRequestIDKey{}, ruid) } func GetURI(ctx context.Context) *api.URI { - v, ok := ctx.Value(uriKey).(*api.URI) + v, ok := ctx.Value(uriKey{}).(*api.URI) if ok { return v } @@ -34,5 +30,5 @@ func GetURI(ctx context.Context) *api.URI { } func SetURI(ctx context.Context, uri *api.URI) context.Context { - return context.WithValue(ctx, uriKey, uri) + return context.WithValue(ctx, uriKey{}, uri) } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index b4294b058..3c6735a73 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -41,16 +41,9 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" - "github.com/rs/cors" ) -type resourceResponse struct { - Manifest storage.Address `json:"manifest"` - Resource string `json:"resource"` - Update storage.Address `json:"update"` -} - var ( postRawCount = metrics.NewRegisteredCounter("api.http.post.raw.count", nil) postRawFail = metrics.NewRegisteredCounter("api.http.post.raw.fail", nil) @@ -191,10 +184,10 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) { if err != nil { if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", uri.Address().String())) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } - RespondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError) return } defer reader.Close() @@ -218,7 +211,7 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) { func (s *Server) HandleRootPaths(w http.ResponseWriter, r *http.Request) { switch r.RequestURI { case "/": - RespondTemplate(w, r, 
"landing-page", "Swarm: Please request a valid ENS or swarm hash with the appropriate bzz scheme", 200) + respondTemplate(w, r, "landing-page", "Swarm: Please request a valid ENS or swarm hash with the appropriate bzz scheme", 200) return case "/robots.txt": w.Header().Set("Last-Modified", time.Now().Format(http.TimeFormat)) @@ -227,7 +220,7 @@ func (s *Server) HandleRootPaths(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write(faviconBytes) default: - RespondError(w, r, "Not Found", http.StatusNotFound) + respondError(w, r, "Not Found", http.StatusNotFound) } } @@ -247,26 +240,26 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { if uri.Path != "" { postRawFail.Inc(1) - RespondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest) + respondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest) return } if uri.Addr != "" && uri.Addr != "encrypt" { postRawFail.Inc(1) - RespondError(w, r, "raw POST request addr can only be empty or \"encrypt\"", http.StatusBadRequest) + respondError(w, r, "raw POST request addr can only be empty or \"encrypt\"", http.StatusBadRequest) return } if r.Header.Get("Content-Length") == "" { postRawFail.Inc(1) - RespondError(w, r, "missing Content-Length header in request", http.StatusBadRequest) + respondError(w, r, "missing Content-Length header in request", http.StatusBadRequest) return } addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -290,7 +283,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusBadRequest) + respondError(w, r, err.Error(), http.StatusBadRequest) return } @@ -305,7 +298,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { addr, err = s.api.Resolve(r.Context(), uri.Addr) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError) return } log.Debug("resolved key", "ruid", ruid, "key", addr) @@ -313,7 +306,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { addr, err = s.api.NewManifest(r.Context(), toEncrypt) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } log.Debug("new manifest", "ruid", ruid, "key", addr) @@ -324,7 +317,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { case "application/x-tar": _, err := s.handleTarUpload(r, mw) if err != nil { - RespondError(w, r, fmt.Sprintf("error uploading tarball: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("error uploading tarball: %v", err), http.StatusInternalServerError) return err } return nil @@ -337,7 +330,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { }) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot create manifest: %s", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("cannot create manifest: %s", 
err), http.StatusInternalServerError) return } @@ -373,7 +366,7 @@ func (s *Server) handleMultipartUpload(r *http.Request, boundary string, mw *api } var size int64 - var reader io.Reader = part + var reader io.Reader if contentLength := part.Header.Get("Content-Length"); contentLength != "" { size, err = strconv.ParseInt(contentLength, 10, 64) if err != nil { @@ -446,7 +439,7 @@ func (s *Server) HandleDelete(w http.ResponseWriter, r *http.Request) { newKey, err := s.api.Delete(r.Context(), uri.Addr, uri.Path) if err != nil { deleteFail.Inc(1) - RespondError(w, r, fmt.Sprintf("could not delete from manifest: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("could not delete from manifest: %v", err), http.StatusInternalServerError) return } @@ -467,7 +460,7 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { // Creation and update must send feed.updateRequestJSON JSON structure body, err := ioutil.ReadAll(r.Body) if err != nil { - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -478,7 +471,7 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { if err == api.ErrCannotLoadFeedManifest || err == api.ErrCannotResolveFeedURI { httpStatus = http.StatusNotFound } - RespondError(w, r, fmt.Sprintf("cannot retrieve feed from manifest: %s", err), httpStatus) + respondError(w, r, fmt.Sprintf("cannot retrieve feed from manifest: %s", err), httpStatus) return } @@ -487,32 +480,32 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() if err := updateRequest.FromValues(query, body); err != nil { // decodes request from query parameters - RespondError(w, r, err.Error(), http.StatusBadRequest) + respondError(w, r, err.Error(), http.StatusBadRequest) return } - if updateRequest.IsUpdate() { + switch { + case updateRequest.IsUpdate(): // Verify that the signature is intact and that the signer is authorized // to update this feed // Check this early, to avoid creating a feed and then not being able to set its first update. 
if err = updateRequest.Verify(); err != nil { - RespondError(w, r, err.Error(), http.StatusForbidden) + respondError(w, r, err.Error(), http.StatusForbidden) return } _, err = s.api.FeedsUpdate(r.Context(), &updateRequest) if err != nil { - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } - } - - if query.Get("manifest") == "1" { + fallthrough + case query.Get("manifest") == "1": // we create a manifest so we can retrieve feed updates with bzz:// later // this manifest has a special "feed type" manifest, and saves the // feed identification used to retrieve feed updates later m, err := s.api.NewFeedManifest(r.Context(), &updateRequest.Feed) if err != nil { - RespondError(w, r, fmt.Sprintf("failed to create feed manifest: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("failed to create feed manifest: %v", err), http.StatusInternalServerError) return } // the key to the manifest will be passed back to the client @@ -520,12 +513,14 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { // the manifest key can be set as content in the resolver of the ENS name outdata, err := json.Marshal(m) if err != nil { - RespondError(w, r, fmt.Sprintf("failed to create json response: %s", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("failed to create json response: %s", err), http.StatusInternalServerError) return } fmt.Fprint(w, string(outdata)) w.Header().Add("Content-type", "application/json") + default: + respondError(w, r, "Missing signature in feed update request", http.StatusBadRequest) } } @@ -557,7 +552,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { if err == api.ErrCannotLoadFeedManifest || err == api.ErrCannotResolveFeedURI { httpStatus = http.StatusNotFound } - RespondError(w, r, fmt.Sprintf("cannot retrieve feed information from manifest: %s", err), httpStatus) + respondError(w, r, fmt.Sprintf("cannot retrieve feed information from manifest: %s", err), httpStatus) return } @@ -566,12 +561,12 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { unsignedUpdateRequest, err := s.api.FeedsNewRequest(r.Context(), fd) if err != nil { getFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot retrieve feed metadata for feed=%s: %s", fd.Hex(), err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot retrieve feed metadata for feed=%s: %s", fd.Hex(), err), http.StatusNotFound) return } rawResponse, err := unsignedUpdateRequest.MarshalJSON() if err != nil { - RespondError(w, r, fmt.Sprintf("cannot encode unsigned feed update request: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("cannot encode unsigned feed update request: %v", err), http.StatusInternalServerError) return } w.Header().Add("Content-type", "application/json") @@ -582,7 +577,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { lookupParams := &feed.Query{Feed: *fd} if err = lookupParams.FromValues(r.URL.Query()); err != nil { // parse period, version - RespondError(w, r, fmt.Sprintf("invalid feed update request:%s", err), http.StatusBadRequest) + respondError(w, r, fmt.Sprintf("invalid feed update request:%s", err), http.StatusBadRequest) return } @@ -591,7 +586,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { // any error from the switch statement will end up here if err != nil { code, err2 := s.translateFeedError(w, r, "feed lookup 
fail", err) - RespondError(w, r, err2.Error(), code) + respondError(w, r, err2.Error(), code) return } @@ -637,7 +632,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) { addr, err := s.api.ResolveURI(r.Context(), uri, pass) if err != nil { getFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) return } w.Header().Set("Cache-Control", "max-age=2147483648, immutable") // url was of type bzz://<hex key>/path, so we are sure it is immutable. @@ -661,7 +656,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) { reader, isEncrypted := s.api.Retrieve(r.Context(), addr) if _, err := reader.Size(r.Context(), nil); err != nil { getFail.Inc(1) - RespondError(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound) return } @@ -701,7 +696,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) { addr, err := s.api.Resolve(r.Context(), uri.Addr) if err != nil { getListFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) return } log.Debug("handle.get.list: resolved", "ruid", ruid, "key", addr) @@ -711,10 +706,10 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) { getListFail.Inc(1) if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", addr.String())) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -762,7 +757,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { manifestAddr, err = s.api.Resolve(r.Context(), uri.Addr) if err != nil { getFileFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) return } } else { @@ -786,17 +781,17 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { if err != nil { if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr)) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } switch status { case http.StatusNotFound: getFileNotFound.Inc(1) - RespondError(w, r, err.Error(), http.StatusNotFound) + respondError(w, r, err.Error(), http.StatusNotFound) default: getFileFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) } return } @@ -809,10 +804,10 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { getFileFail.Inc(1) if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr)) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -825,7 +820,7 @@ 
func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { // check the root chunk exists by retrieving the file's size if _, err := reader.Size(r.Context(), nil); err != nil { getFileNotFound.Inc(1) - RespondError(w, r, fmt.Sprintf("file not found %s: %s", uri, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("file not found %s: %s", uri, err), http.StatusNotFound) return } diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index 1cf7ff577..04d0e045a 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -263,7 +263,7 @@ func TestBzzFeed(t *testing.T) { if resp.StatusCode == http.StatusOK { t.Fatal("Expected error status since feed update does not contain multihash. Received 200 OK") } - b, err = ioutil.ReadAll(resp.Body) + _, err = ioutil.ReadAll(resp.Body) if err != nil { t.Fatal(err) } @@ -333,15 +333,45 @@ func TestBzzFeed(t *testing.T) { } urlQuery = testUrl.Query() body = updateRequest.AppendValues(urlQuery) // this adds all query parameters + goodQueryParameters := urlQuery.Encode() // save the query parameters for a second attempt + + // create bad query parameters in which the signature is missing + urlQuery.Del("signature") testUrl.RawQuery = urlQuery.Encode() + // 1st attempt with bad query parameters in which the signature is missing resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) if err != nil { t.Fatal(err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Fatalf("Update returned %s", resp.Status) + expectedCode := http.StatusBadRequest + if resp.StatusCode != expectedCode { + t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode) + } + + // 2nd attempt with bad query parameters in which the signature is of incorrect length + urlQuery.Set("signature", "0xabcd") // should be 130 hex chars + resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + expectedCode = http.StatusBadRequest + if resp.StatusCode != expectedCode { + t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode) + } + + // 3rd attempt, with good query parameters: + testUrl.RawQuery = goodQueryParameters + resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + expectedCode = http.StatusOK + if resp.StatusCode != expectedCode { + t.Fatalf("Update returned %s. 
Expected %d", resp.Status, expectedCode) } // get latest update through bzz-feed directly @@ -461,6 +491,9 @@ func testBzzGetPath(encrypted bool, t *testing.T) { } defer resp.Body.Close() respbody, err = ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error while reading response body: %v", err) + } if string(respbody) != testmanifest[v] { isexpectedfailrequest := false diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go index 7c4cc88e4..890ed88bd 100644 --- a/swarm/api/manifest.go +++ b/swarm/api/manifest.go @@ -557,7 +557,6 @@ func (mt *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *manif if path != entry.Path { return nil, 0 } - pos = epl } } return nil, 0 diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go index 059c3dc96..56adc5a8e 100644 --- a/swarm/network/hive_test.go +++ b/swarm/network/hive_test.go @@ -70,6 +70,9 @@ func TestHiveStatePersistance(t *testing.T) { defer os.RemoveAll(dir) store, err := state.NewDBStore(dir) //start the hive with an empty dbstore + if err != nil { + t.Fatal(err) + } params := NewHiveParams() s, pp := newHiveTester(t, params, 5, store) @@ -90,6 +93,9 @@ func TestHiveStatePersistance(t *testing.T) { store.Close() persistedStore, err := state.NewDBStore(dir) //start the hive with an empty dbstore + if err != nil { + t.Fatal(err) + } s1, pp := newHiveTester(t, params, 1, persistedStore) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 55a0c6f13..cd94741be 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -261,7 +261,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { // found among live peers, do nothing return v }) - if ins { + if ins && !p.BzzPeer.LightNode { a := newEntry(p.BzzAddr) a.conn = p // insert new online peer into addrs @@ -329,14 +329,18 @@ func (k *Kademlia) Off(p *Peer) { k.lock.Lock() defer k.lock.Unlock() var del bool - k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { - // v cannot be nil, must check otherwise we overwrite entry - if v == nil { - panic(fmt.Sprintf("connected peer not found %v", p)) - } + if !p.BzzPeer.LightNode { + k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { + // v cannot be nil, must check otherwise we overwrite entry + if v == nil { + panic(fmt.Sprintf("connected peer not found %v", p)) + } + del = true + return newEntry(p.BzzAddr) + }) + } else { del = true - return newEntry(p.BzzAddr) - }) + } if del { k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(_ pot.Val) pot.Val { diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index 903c8dbda..d2e051f45 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -46,19 +46,19 @@ func newTestKademlia(b string) *Kademlia { return NewKademlia(base, params) } -func newTestKadPeer(k *Kademlia, s string) *Peer { - return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k) +func newTestKadPeer(k *Kademlia, s string, lightNode bool) *Peer { + return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s), LightNode: lightNode}, k) } func On(k *Kademlia, ons ...string) { for _, s := range ons { - k.On(newTestKadPeer(k, s)) + k.On(newTestKadPeer(k, s, false)) } } func Off(k *Kademlia, offs ...string) { for _, s := range offs { - k.Off(newTestKadPeer(k, s)) + k.Off(newTestKadPeer(k, s, false)) } } @@ -254,6 +254,56 @@ func TestSuggestPeerFindPeers(t *testing.T) { } +// a node should stay in the address book if it's removed from the kademlia +func TestOffEffectingAddressBookNormalNode(t *testing.T) { + 
k := newTestKademlia("00000000") + // peer added to kademlia + k.On(newTestKadPeer(k, "01000000", false)) + // peer should be in the address book + if k.addrs.Size() != 1 { + t.Fatal("known peer addresses should contain 1 entry") + } + // peer should be among live connections + if k.conns.Size() != 1 { + t.Fatal("live peers should contain 1 entry") + } + // remove peer from kademlia + k.Off(newTestKadPeer(k, "01000000", false)) + // peer should be in the address book + if k.addrs.Size() != 1 { + t.Fatal("known peer addresses should contain 1 entry") + } + // peer should not be among live connections + if k.conns.Size() != 0 { + t.Fatal("live peers should contain 0 entry") + } +} + +// a light node should not be in the address book +func TestOffEffectingAddressBookLightNode(t *testing.T) { + k := newTestKademlia("00000000") + // light node peer added to kademlia + k.On(newTestKadPeer(k, "01000000", true)) + // peer should not be in the address book + if k.addrs.Size() != 0 { + t.Fatal("known peer addresses should contain 0 entry") + } + // peer should be among live connections + if k.conns.Size() != 1 { + t.Fatal("live peers should contain 1 entry") + } + // remove peer from kademlia + k.Off(newTestKadPeer(k, "01000000", true)) + // peer should not be in the address book + if k.addrs.Size() != 0 { + t.Fatal("known peer addresses should contain 0 entry") + } + // peer should not be among live connections + if k.conns.Size() != 0 { + t.Fatal("live peers should contain 0 entry") + } +} + func TestSuggestPeerRetries(t *testing.T) { k := newTestKademlia("00000000") k.RetryInterval = int64(300 * time.Millisecond) // cycle diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go index 4b83c7a27..f0d266628 100644 --- a/swarm/network/protocol_test.go +++ b/swarm/network/protocol_test.go @@ -50,10 +50,6 @@ type testStore struct { values map[string][]byte } -func newTestStore() *testStore { - return &testStore{values: make(map[string][]byte)} -} - func (t *testStore) Load(key string) ([]byte, error) { t.Lock() defer t.Unlock() @@ -157,17 +153,7 @@ func newBzzHandshakeTester(t *testing.T, n int, addr *BzzAddr, lightNode bool) * // should test handshakes in one exchange? 
parallelisation func (s *bzzTester) testHandshake(lhs, rhs *HandshakeMsg, disconnects ...*p2ptest.Disconnect) error { - var peers []enode.ID - id := rhs.Addr.ID() - if len(disconnects) > 0 { - for _, d := range disconnects { - peers = append(peers, d.Peer) - } - } else { - peers = []enode.ID{id} - } - - if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, id)...); err != nil { + if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, rhs.Addr.ID())...); err != nil { return err } diff --git a/swarm/network/simulation/bucket_test.go b/swarm/network/simulation/bucket_test.go index 461d99825..69df19bfe 100644 --- a/swarm/network/simulation/bucket_test.go +++ b/swarm/network/simulation/bucket_test.go @@ -94,7 +94,7 @@ func TestServiceBucket(t *testing.T) { t.Fatalf("expected %q, got %q", customValue, s) } - v, ok = sim.NodeItem(id2, customKey) + _, ok = sim.NodeItem(id2, customKey) if ok { t.Fatal("bucket item should not be found") } @@ -119,7 +119,7 @@ func TestServiceBucket(t *testing.T) { t.Fatalf("expected %q, got %q", testValue+id1.String(), s) } - v, ok = items[id2] + _, ok = items[id2] if ok { t.Errorf("node 2 item should not be found") } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 9092ffe3e..0109fbdef 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -245,7 +245,10 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( } else { d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { id := p.ID() - // TODO: skip light nodes that do not accept retrieve requests + if p.LightNode { + // skip light nodes + return true + } if req.SkipPeer(id.String()) { log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 949645558..c77682e0e 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -29,17 +29,25 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" + pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) +//Tests initializing a retrieve request func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) + regOpts := &RegistryOptions{ + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, + } + tester, streamer, _, teardown, err := newStreamerTester(t, regOpts) defer teardown() if err != nil { t.Fatal(err) @@ -55,10 +63,21 @@ func TestStreamerRetrieveRequest(t *testing.T) { ) streamer.delivery.RequestFromPeers(ctx, req) + stream := NewStream(swarmChunkServerStreamName, "", true) + err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", Expects: []p2ptest.Expect{ - { + { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + { //expect a retrieve request message for the given hash Code: 5, 
Msg: &RetrieveRequestMsg{ Addr: hash0[:], @@ -74,9 +93,12 @@ func TestStreamerRetrieveRequest(t *testing.T) { } } +//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) +//Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, //do no syncing }) defer teardown() if err != nil { @@ -89,16 +111,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(node.ID()) + stream := NewStream(swarmChunkServerStreamName, "", true) + //simulate pre-subscription to RETRIEVE_REQUEST stream on peer peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: NewStream(swarmChunkServerStreamName, "", true), + Stream: stream, History: nil, Priority: Top, }) + //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { //first expect a subscription to the RETRIEVE_REQUEST stream + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ - { + { //then the actual RETRIEVE_REQUEST.... Code: 5, Msg: &RetrieveRequestMsg{ Addr: chunk.Address()[:], @@ -107,7 +144,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }, Expects: []p2ptest.Expect{ - { + { //to which the peer responds with offered hashes Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: nil, @@ -120,7 +157,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }) - expectedError := `exchange #0 "RetrieveRequestMsg": timed out` + //should fail with a timeout as the peer we are requesting + //the chunk from does not have the chunk + expectedError := `exchange #1 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -130,7 +169,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -138,6 +178,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } node := tester.Nodes[0] + peer := streamer.getPeer(node.ID()) stream := NewStream(swarmChunkServerStreamName, "", true) @@ -156,6 +197,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -224,9 +277,90 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } } +// if there is one peer in the Kademlia, RequestFromPeers should return it +func TestRequestFromPeers(t *testing.T) { + dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") + + addr := network.RandomAddr() + to := 
network.NewKademlia(addr.OAddr, network.NewKadParams()) + delivery := NewDelivery(to, nil) + protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) + peer := network.NewPeer(&network.BzzPeer{ + BzzAddr: network.RandomAddr(), + LightNode: false, + Peer: protocolsPeer, + }, to) + to.On(peer) + r := NewRegistry(addr.ID(), delivery, nil, nil, nil) + + // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished + sp := &Peer{ + Peer: protocolsPeer, + pq: pq.New(int(PriorityQueue), PriorityQueueCap), + streamer: r, + } + r.setPeer(sp) + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + ctx := context.Background() + id, _, err := delivery.RequestFromPeers(ctx, req) + + if err != nil { + t.Fatal(err) + } + if *id != dummyPeerID { + t.Fatalf("Expected an id, got %v", id) + } +} + +// RequestFromPeers should not return light nodes +func TestRequestFromPeersWithLightNode(t *testing.T) { + dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") + + addr := network.RandomAddr() + to := network.NewKademlia(addr.OAddr, network.NewKadParams()) + delivery := NewDelivery(to, nil) + + protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) + // setting up a lightnode + peer := network.NewPeer(&network.BzzPeer{ + BzzAddr: network.RandomAddr(), + LightNode: true, + Peer: protocolsPeer, + }, to) + to.On(peer) + r := NewRegistry(addr.ID(), delivery, nil, nil, nil) + // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished + sp := &Peer{ + Peer: protocolsPeer, + pq: pq.New(int(PriorityQueue), PriorityQueueCap), + streamer: r, + } + r.setPeer(sp) + + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + + ctx := context.Background() + // making a request which should return with "no peer found" + _, _, err := delivery.RequestFromPeers(ctx, req) + + expectedError := "no peer found" + if err.Error() != expectedError { + t.Fatalf("expected '%v', got %v", expectedError, err) + } +} + func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -241,6 +375,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { node := tester.Nodes[0] + //subscribe to custom stream stream := NewStream("foo", "", true) err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { @@ -253,7 +388,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ - { + { //first expect subscription to the custom stream... 
Code: 4, Msg: &SubscribeMsg{ Stream: stream, @@ -267,7 +402,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { p2ptest.Exchange{ Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ - { + { //...then trigger a chunk delivery for the given chunk from peer in order for + //local node to get the chunk delivered Code: 6, Msg: &ChunkDeliveryMsg{ Addr: chunkKey, @@ -342,8 +478,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - DoServeRetrieve: true, + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + Retrieval: RetrievalEnabled, }) bucket.Store(bucketKeyRegistry, r) @@ -408,20 +545,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck return err } - //each of the nodes (except pivot node) subscribes to the stream of the next node - for j, node := range nodeIDs[0 : nodes-1] { - sid := nodeIDs[j+1] - item, ok := sim.NodeItem(node, bucketKeyRegistry) - if !ok { - return fmt.Errorf("No registry") - } - registry := item.(*Registry) - err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - //get the pivot node's filestore item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) if !ok { @@ -530,7 +653,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, - DoSync: true, + Syncing: SyncingDisabled, + Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 3164193b3..0c95fabb7 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 0d3bc7f54..65cde2411 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -25,7 +25,8 @@ import ( // when it is serving Retrieve requests. func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { registryOptions := &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { // requests are disabled func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { registryOptions := &RegistryOptions{ - DoServeRetrieve: false, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { // when syncing is enabled. 
func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { registryOptions := &RegistryOptions{ - DoSync: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { // when syncing is disabled. func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { registryOptions := &RegistryOptions{ - DoSync: false, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index b81cfc0ca..ad1519341 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, - DoRetrieve: true, - DoServeRetrieve: true, }) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 8d89f28cb..2ddbed936 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, }) @@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoServeRetrieve: true, - DoSync: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, }) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0ac374def..695ff0c50 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -47,6 +47,31 @@ const ( HashSize = 32 ) +//Enumerate options for syncing and retrieval +type SyncingOption int +type RetrievalOption int + +//Syncing options +const ( + //Syncing disabled + SyncingDisabled SyncingOption = iota + //Register the client and the server but not subscribe + SyncingRegisterOnly + //Both client and server funcs are registered, subscribe sent automatically + SyncingAutoSubscribe +) + +const ( + //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) + RetrievalDisabled RetrievalOption = iota + //Only the client side of the retrieve request is registered. 
+ //(light nodes do not serve retrieve requests) + //once the client is registered, subscription to retrieve request stream is always sent + RetrievalClientOnly + //Both client and server funcs are registered, subscribe sent automatically + RetrievalEnabled +) + // Registry registry for outgoing and incoming streamer constructors type Registry struct { addr enode.ID @@ -60,16 +85,15 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - doRetrieve bool + autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags. - DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true. - DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag. + Syncing SyncingOption //Defines syncing behavior + Retrieval RetrievalOption //Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } @@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } + //check if retriaval has been disabled + retrieval := options.Retrieval != RetrievalDisabled + streamer := &Registry{ addr: localID, skipCheck: options.SkipCheck, @@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, - doRetrieve: options.DoRetrieve, + autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - if options.DoServeRetrieve { + //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) + if options.Retrieval == RetrievalEnabled { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { if !live { return nil, errors.New("only live retrieval requests supported") @@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy }) } - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) + //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) + if options.Retrieval != RetrievalDisabled { + streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) + }) + } - if options.DoSync { + //If syncing is not disabled, the syncing functions are registered (both client and server) + if options.Syncing != SyncingDisabled { RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore) } - if options.DoSync { + //if syncing is set to automatically subscribe to the syncing stream, start the subscription process + if options.Syncing == SyncingAutoSubscribe { // latestIntC function ensures that // - receiving from the in chan is not blocked by 
processing inside the for loop // - the latest int value is delivered to the loop after the processing is done @@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer close(sp.quit) defer sp.close() - if r.doRetrieve { + if r.autoRetrieval && !p.LightNode { err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index e7f79e7a1..16c74d3b3 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) defer teardown() diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f2be3bef9..b0e35b0db 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -62,6 +62,9 @@ func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network params.Init(datadir) params.BaseKey = addr.Over() lstore, err = storage.NewLocalStore(params, mockStore) + if err != nil { + return nil, "", err + } return lstore, datadir, nil } @@ -114,6 +117,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }) diff --git a/swarm/pot/address.go b/swarm/pot/address.go index 3974ebcaa..728dac14e 100644 --- a/swarm/pot/address.go +++ b/swarm/pot/address.go @@ -79,46 +79,6 @@ func (a Address) Bytes() []byte { return a[:] } -/* -Proximity(x, y) returns the proximity order of the MSB distance between x and y - -The distance metric MSB(x, y) of two equal length byte sequences x an y is the -value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. -the binary cast is big endian: most significant bit first (=MSB). - -Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. -It is defined as the reverse rank of the integer part of the base 2 -logarithm of the distance. -It is calculated by counting the number of common leading zeros in the (MSB) -binary representation of the x^y. - -(0 farthest, 255 closest, 256 self) -*/ -func proximity(one, other Address) (ret int, eq bool) { - return posProximity(one, other, 0) -} - -// posProximity(a, b, pos) returns proximity order of b wrt a (symmetric) pretending -// the first pos bits match, checking only bits index >= pos -func posProximity(one, other Address, pos int) (ret int, eq bool) { - for i := pos / 8; i < len(one); i++ { - if one[i] == other[i] { - continue - } - oxo := one[i] ^ other[i] - start := 0 - if i == pos/8 { - start = pos % 8 - } - for j := start; j < 8; j++ { - if (oxo>>uint8(7-j))&0x01 != 0 { - return i*8 + j, false - } - } - } - return len(one) * 8, true -} - // ProxCmp compares the distances a->target and b->target. // Returns -1 if a is closer to target, 1 if b is closer to target // and 0 if they are equal. 
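The doc comment removed above (and re-added in swarm/storage/types.go further down) describes the proximity order as the number of leading bits shared by two addresses, i.e. the big-endian position of the first set bit in their XOR. The following is an illustrative, self-contained sketch of that metric added by the editor (it is not part of the patch; the sample values are made up):

package main

import "fmt"

// proximity counts the common leading bits of two equal-length byte
// sequences: the big-endian bit index of the first difference in one^other
// (0 = farthest apart, len(one)*8 = identical).
func proximity(one, other []byte) int {
	for i := 0; i < len(one); i++ {
		oxo := one[i] ^ other[i]
		if oxo == 0 {
			continue
		}
		for j := 0; j < 8; j++ {
			if (oxo>>uint(7-j))&0x01 != 0 {
				return i*8 + j
			}
		}
	}
	return len(one) * 8
}

func main() {
	a := []byte{0xF0, 0x00} // 11110000 00000000
	b := []byte{0xD0, 0x00} // 11010000 00000000
	fmt.Println(proximity(a, b)) // 2: the first differing bit is at index 2
	fmt.Println(proximity(a, a)) // 16: identical addresses
}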
diff --git a/swarm/pss/client/client_test.go b/swarm/pss/client/client_test.go index 48edc6cce..8f2f0e805 100644 --- a/swarm/pss/client/client_test.go +++ b/swarm/pss/client/client_test.go @@ -252,7 +252,13 @@ func newServices() adapters.Services { ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) + if err != nil { + return nil, err + } privkey, err := w.GetPrivateKey(keys) + if err != nil { + return nil, err + } psparams := pss.NewPssParams().WithPrivateKey(privkey) pskad := kademlia(ctx.Config.ID) ps, err := pss.NewPss(pskad, psparams) @@ -288,10 +294,6 @@ type testStore struct { values map[string][]byte } -func newTestStore() *testStore { - return &testStore{values: make(map[string][]byte)} -} - func (t *testStore) Load(key string) ([]byte, error) { return nil, nil } diff --git a/swarm/pss/notify/notify_test.go b/swarm/pss/notify/notify_test.go index 675b41ada..d4d383a6b 100644 --- a/swarm/pss/notify/notify_test.go +++ b/swarm/pss/notify/notify_test.go @@ -223,7 +223,13 @@ func newServices(allowRaw bool) adapters.Services { ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) + if err != nil { + return nil, err + } privkey, err := w.GetPrivateKey(keys) + if err != nil { + return nil, err + } pssp := pss.NewPssParams().WithPrivateKey(privkey) pssp.MsgTTL = time.Second * 30 pssp.AllowRaw = allowRaw diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index f4209fea5..4ef3e90a0 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -93,11 +93,17 @@ func testProtocol(t *testing.T) { lctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + if err != nil { + t.Fatal(err) + } defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + if err != nil { + t.Fatal(err) + } defer rsub.Unsubscribe() // set reciprocal public keys diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 574714114..66a90be62 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -976,11 +976,6 @@ func TestNetwork10000(t *testing.T) { } func testNetwork(t *testing.T) { - type msgnotifyC struct { - id enode.ID - msgIdx int - } - paramstring := strings.Split(t.Name(), "/") nodecount, _ := strconv.ParseInt(paramstring[1], 10, 0) msgcount, _ := strconv.ParseInt(paramstring[2], 10, 0) diff --git a/swarm/pss/types.go b/swarm/pss/types.go index 1e33ecdca..56c2c51dc 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -169,10 +169,6 @@ type stateStore struct { values map[string][]byte } -func newStateStore() *stateStore { - return &stateStore{values: make(map[string][]byte)} -} - func (store *stateStore) Load(key string) ([]byte, error) { return nil, nil } diff --git a/swarm/sctx/sctx.go b/swarm/sctx/sctx.go index bed2b1145..fb7d35b00 100644 --- a/swarm/sctx/sctx.go +++ b/swarm/sctx/sctx.go @@ -2,19 +2,17 @@ package sctx import "context" -type ContextKey int - -const ( - HTTPRequestIDKey ContextKey = iota - requestHostKey +type ( + HTTPRequestIDKey struct{} + requestHostKey struct{} ) func SetHost(ctx context.Context, domain string) context.Context { - return context.WithValue(ctx, requestHostKey, domain) + return context.WithValue(ctx, requestHostKey{}, 
domain) } func GetHost(ctx context.Context) string { - v, ok := ctx.Value(requestHostKey).(string) + v, ok := ctx.Value(requestHostKey{}).(string) if ok { return v } diff --git a/swarm/state/dbstore.go b/swarm/state/dbstore.go index 5e5c172b2..b0aa92e27 100644 --- a/swarm/state/dbstore.go +++ b/swarm/state/dbstore.go @@ -69,7 +69,7 @@ func (s *DBStore) Get(key string, i interface{}) (err error) { // Put stores an object that implements Binary for a specific key. func (s *DBStore) Put(key string, i interface{}) (err error) { - bytes := []byte{} + var bytes []byte marshaler, ok := i.(encoding.BinaryMarshaler) if !ok { diff --git a/swarm/state/dbstore_test.go b/swarm/state/dbstore_test.go index 6683e788f..f7098956d 100644 --- a/swarm/state/dbstore_test.go +++ b/swarm/state/dbstore_test.go @@ -112,6 +112,9 @@ func testPersistedStore(t *testing.T, store Store) { as := []string{} err = store.Get("key2", &as) + if err != nil { + t.Fatal(err) + } if len(as) != 3 { t.Fatalf("serialized array did not match expectation") diff --git a/swarm/state/inmemorystore.go b/swarm/state/inmemorystore.go index 1ca25404a..3ba48592b 100644 --- a/swarm/state/inmemorystore.go +++ b/swarm/state/inmemorystore.go @@ -59,7 +59,7 @@ func (s *InmemoryStore) Get(key string, i interface{}) (err error) { func (s *InmemoryStore) Put(key string, i interface{}) (err error) { s.mu.Lock() defer s.mu.Unlock() - bytes := []byte{} + var bytes []byte marshaler, ok := i.(encoding.BinaryMarshaler) if !ok { diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index 33133edd7..600be164a 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -88,17 +88,6 @@ func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) return mput(store, n, GenerateRandomChunk) } -func mputChunks(store ChunkStore, chunks ...Chunk) error { - i := 0 - f := func(n int64) Chunk { - chunk := chunks[i] - i++ - return chunk - } - _, err := mput(store, len(chunks), f) - return err -} - func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) { // put to localstore and wait for stored channel // does not check delivery error state diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go index cf95bc1f5..fb2ef3a6b 100644 --- a/swarm/storage/feed/handler_test.go +++ b/swarm/storage/feed/handler_test.go @@ -27,7 +27,6 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" @@ -506,15 +505,3 @@ func newCharlieSigner() *GenericSigner { privKey, _ := crypto.HexToECDSA("facadefacadefacadefacadefacadefacadefacadefacadefacadefacadefaca") return NewGenericSigner(privKey) } - -func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { - chunk, err := rh.chunkStore.Get(context.TODO(), addr) - if err != nil { - return nil, err - } - var r Request - if err := r.fromChunk(addr, chunk.Data()); err != nil { - return nil, err - } - return r.data, nil -} diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 49508911f..9feb68741 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -39,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" ) const ( @@ -72,13 +71,6 @@ var ( ErrDBClosed = errors.New("LDBStore closed") ) -type gcItem struct { 
- idx *dpaDBIndex - value uint64 - idxKey []byte - po uint8 -} - type LDBStoreParams struct { *StoreParams Path string @@ -961,15 +953,3 @@ func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Add } return it.Error() } - -func databaseExists(path string) bool { - o := &opt.Options{ - ErrorIfMissing: true, - } - tdb, err := leveldb.OpenFile(path, o) - if err != nil { - return false - } - defer tdb.Close() - return true -} diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 8c70f4584..092843db0 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -80,6 +80,19 @@ func (a Address) bits(i, j uint) uint { return res } +// Proximity(x, y) returns the proximity order of the MSB distance between x and y +// +// The distance metric MSB(x, y) of two equal length byte sequences x an y is the +// value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. +// the binary cast is big endian: most significant bit first (=MSB). +// +// Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. +// It is defined as the reverse rank of the integer part of the base 2 +// logarithm of the distance. +// It is calculated by counting the number of common leading zeros in the (MSB) +// binary representation of the x^y. +// +// (0 farthest, 255 closest, 256 self) func Proximity(one, other []byte) (ret int) { b := (MaxPO-1)/8 + 1 if b > len(one) { @@ -231,11 +244,8 @@ func GenerateRandomChunk(dataSize int64) Chunk { } func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) { - if dataSize > ch.DefaultSize { - dataSize = ch.DefaultSize - } for i := 0; i < count; i++ { - ch := GenerateRandomChunk(ch.DefaultSize) + ch := GenerateRandomChunk(dataSize) chunks = append(chunks, ch) } return chunks diff --git a/swarm/swarm.go b/swarm/swarm.go index 7214abbda..1fb5443fd 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -175,18 +175,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e if err := nodeID.UnmarshalText([]byte(config.NodeID)); err != nil { return nil, err } + + syncing := stream.SyncingAutoSubscribe + if !config.SyncEnabled || config.LightNodeEnabled { + syncing = stream.SyncingDisabled + } + + retrieval := stream.RetrievalEnabled + if config.LightNodeEnabled { + retrieval = stream.RetrievalClientOnly + } + registryOptions := &stream.RegistryOptions{ SkipCheck: config.DeliverySkipCheck, - DoSync: config.SyncEnabled, - DoRetrieve: true, - DoServeRetrieve: true, + Syncing: syncing, + Retrieval: retrieval, SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, } - if config.LightNodeEnabled { - registryOptions.DoSync = false - registryOptions.DoRetrieve = false - } self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage |
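The swarm.go hunk above shows how the old DoSync/DoRetrieve/DoServeRetrieve booleans are replaced by the new SyncingOption and RetrievalOption enums from swarm/network/stream. The following is an illustrative sketch of that mapping added by the editor (not part of the patch; the helper name, package name, and boolean parameters are hypothetical, only the stream types and constants come from this diff):

package nodeconfig // hypothetical package for illustration

import (
	"time"

	"github.com/ethereum/go-ethereum/swarm/network/stream"
)

// newRegistryOptions mirrors the logic in the swarm.go hunk: full nodes
// auto-subscribe to syncing and both serve and issue retrieve requests;
// light nodes neither sync nor serve retrievals, but may still issue
// retrieve requests (RetrievalClientOnly).
func newRegistryOptions(lightNode, syncEnabled bool) *stream.RegistryOptions {
	opts := &stream.RegistryOptions{
		Syncing:         stream.SyncingAutoSubscribe,
		Retrieval:       stream.RetrievalEnabled,
		SyncUpdateDelay: 15 * time.Second, // assumed default; the real value comes from the node config
	}
	if !syncEnabled || lightNode {
		opts.Syncing = stream.SyncingDisabled
	}
	if lightNode {
		opts.Retrieval = stream.RetrievalClientOnly
	}
	return opts
}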