From 9184249b393e4e332ae6a2f5d774880a88a9bfd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 2 Mar 2017 15:06:16 +0200 Subject: Logger updates 3 (#3730) * accounts, cmd, eth, ethdb: port logs over to new system * ethdb: drop concept of cache distribution between dbs * eth: fix some log nitpicks to make them nicer --- accounts/url.go | 9 +++ cmd/geth/chaincmd.go | 2 +- cmd/geth/main.go | 14 ++-- cmd/swarm/cleandb.go | 7 +- cmd/swarm/hash.go | 9 ++- cmd/swarm/main.go | 6 +- cmd/swarm/manifest.go | 30 ++++----- cmd/swarm/upload.go | 23 +++---- cmd/utils/cmd.go | 19 +++--- cmd/utils/flags.go | 6 +- eth/api.go | 13 ++-- eth/backend.go | 34 +++++----- eth/bad_block.go | 4 +- eth/db_upgrade.go | 20 +++--- eth/downloader/downloader.go | 64 +++++++++--------- eth/downloader/peer.go | 6 +- eth/fetcher/fetcher.go | 58 ++++++----------- eth/gasprice/gasprice.go | 3 +- eth/handler.go | 50 +++++++------- eth/peer.go | 13 ++-- eth/sync.go | 7 +- ethdb/database.go | 151 ++++++++++++++++++++----------------------- 22 files changed, 256 insertions(+), 292 deletions(-) diff --git a/accounts/url.go b/accounts/url.go index a2d00c1c6..47f9d8ee4 100644 --- a/accounts/url.go +++ b/accounts/url.go @@ -60,6 +60,15 @@ func (u URL) String() string { return u.Path } +// TerminalString implements the log.TerminalStringer interface. +func (u URL) TerminalString() string { + url := u.String() + if len(url) > 32 { + return url[:31] + "…" + } + return url +} + // MarshalJSON implements the json.Marshaller interface. func (u URL) MarshalJSON() ([]byte, error) { return json.Marshal(u.String()) diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 1c21c4ded..784692261 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -117,7 +117,7 @@ func initGenesis(ctx *cli.Context) error { if err != nil { utils.Fatalf("failed to write genesis block: %v", err) } - log.Info(fmt.Sprintf("successfully wrote genesis block and/or chain rule set: %x", block.Hash())) + log.Info("Successfully wrote genesis state", "hash", block.Hash()) return nil } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fa61f7386..79893cc04 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/console" "github.com/ethereum/go-ethereum/contracts/release" "github.com/ethereum/go-ethereum/eth" @@ -202,11 +203,10 @@ func makeFullNode(ctx *cli.Context) *node.Node { }{uint(params.VersionMajor<<16 | params.VersionMinor<<8 | params.VersionPatch), clientIdentifier, runtime.Version(), runtime.GOOS} extra, err := rlp.EncodeToBytes(clientInfo) if err != nil { - log.Warn(fmt.Sprint("error setting canonical miner information:", err)) + log.Warn("Failed to set canonical miner information", "err", err) } if uint64(len(extra)) > params.MaximumExtraDataSize { - log.Warn(fmt.Sprint("error setting canonical miner information: extra exceeds", params.MaximumExtraDataSize)) - log.Debug(fmt.Sprintf("extra: %x\n", extra)) + log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize) extra = nil } stack := utils.MakeNode(ctx, clientIdentifier, gitCommit) @@ -271,7 +271,7 @@ func startNode(ctx *cli.Context, stack *node.Node) { // Open and self derive any wallets already attached for _, wallet := range stack.AccountManager().Wallets() { if err := wallet.Open(""); err != nil { - log.Warn(fmt.Sprintf("Failed to open wallet %s: %v", wallet.URL(), err)) + log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err) } else { wallet.SelfDerive(accounts.DefaultBaseDerivationPath, stateReader) } @@ -280,13 +280,13 @@ func startNode(ctx *cli.Context, stack *node.Node) { for event := range events { if event.Arrive { if err := event.Wallet.Open(""); err != nil { - log.Info(fmt.Sprintf("New wallet appeared: %s, failed to open: %s", event.Wallet.URL(), err)) + log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err) } else { - log.Info(fmt.Sprintf("New wallet appeared: %s, %s", event.Wallet.URL(), event.Wallet.Status())) + log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", event.Wallet.Status()) event.Wallet.SelfDerive(accounts.DefaultBaseDerivationPath, stateReader) } } else { - log.Info(fmt.Sprintf("Old wallet dropped: %s", event.Wallet.URL())) + log.Info("Old wallet dropped", "url", event.Wallet.URL()) event.Wallet.Close() } } diff --git a/cmd/swarm/cleandb.go b/cmd/swarm/cleandb.go index 81636ada5..29d01ba0f 100644 --- a/cmd/swarm/cleandb.go +++ b/cmd/swarm/cleandb.go @@ -17,8 +17,7 @@ package main import ( - "log" - + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/swarm/storage" "gopkg.in/urfave/cli.v1" ) @@ -26,14 +25,14 @@ import ( func cleandb(ctx *cli.Context) { args := ctx.Args() if len(args) != 1 { - log.Fatal("need path to chunks database as the first and only argument") + utils.Fatalf("Need path to chunks database as the first and only argument") } chunkDbPath := args[0] hash := storage.MakeHashFunc("SHA3") dbStore, err := storage.NewDbStore(chunkDbPath, hash, 10000000, 0) if err != nil { - log.Fatalf("cannot initialise dbstore: %v", err) + utils.Fatalf("Cannot initialise dbstore: %v", err) } dbStore.Cleanup() } diff --git a/cmd/swarm/hash.go b/cmd/swarm/hash.go index bcba77a2a..792e8d0d7 100644 --- a/cmd/swarm/hash.go +++ b/cmd/swarm/hash.go @@ -19,9 +19,9 @@ package main import ( "fmt" - "log" "os" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/swarm/storage" "gopkg.in/urfave/cli.v1" ) @@ -29,12 +29,11 @@ import ( func hash(ctx *cli.Context) { args := ctx.Args() if len(args) < 1 { - log.Fatal("Usage: swarm hash ") + utils.Fatalf("Usage: swarm hash ") } f, err := os.Open(args[0]) if err != nil { - fmt.Println("Error opening file " + args[1]) - os.Exit(1) + utils.Fatalf("Error opening file " + args[1]) } defer f.Close() @@ -42,7 +41,7 @@ func hash(ctx *cli.Context) { chunker := storage.NewTreeChunker(storage.NewChunkerParams()) key, err := chunker.Split(f, stat.Size(), nil, nil, nil) if err != nil { - log.Fatalf("%v\n", err) + utils.Fatalf("%v\n", err) } else { fmt.Printf("%v\n", key) } diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go index 12cad328f..171677146 100644 --- a/cmd/swarm/main.go +++ b/cmd/swarm/main.go @@ -277,7 +277,7 @@ func bzzd(ctx *cli.Context) error { signal.Notify(sigc, syscall.SIGTERM) defer signal.Stop(sigc) <-sigc - log.Info(fmt.Sprint("Got sigterm, shutting down...")) + log.Info("Got sigterm, shutting swarm down...") stack.Stop() }() networkId := ctx.GlobalUint64(SwarmNetworkIdFlag.Name) @@ -342,7 +342,7 @@ func getAccount(ctx *cli.Context, stack *node.Node) *ecdsa.PrivateKey { } // Try to load the arg as a hex key file. if key, err := crypto.LoadECDSA(keyid); err == nil { - log.Info(fmt.Sprintf("swarm account key loaded: %#x", crypto.PubkeyToAddress(key.PublicKey))) + log.Info("Swarm account key loaded", "address", crypto.PubkeyToAddress(key.PublicKey)) return key } // Otherwise try getting it from the keystore. @@ -399,7 +399,7 @@ func injectBootnodes(srv *p2p.Server, nodes []string) { for _, url := range nodes { n, err := discover.ParseNode(url) if err != nil { - log.Error(fmt.Sprintf("invalid bootnode %q", err)) + log.Error("Invalid swarm bootnode", "err", err) continue } srv.AddPeer(n) diff --git a/cmd/swarm/manifest.go b/cmd/swarm/manifest.go index f64792689..2b6b02313 100644 --- a/cmd/swarm/manifest.go +++ b/cmd/swarm/manifest.go @@ -20,19 +20,18 @@ package main import ( "encoding/json" "fmt" - "log" "mime" "path/filepath" "strings" + "github.com/ethereum/go-ethereum/cmd/utils" "gopkg.in/urfave/cli.v1" ) func add(ctx *cli.Context) { - args := ctx.Args() if len(args) < 3 { - log.Fatal("need atleast three arguments []") + utils.Fatalf("Need atleast three arguments []") } var ( @@ -66,7 +65,7 @@ func update(ctx *cli.Context) { args := ctx.Args() if len(args) < 3 { - log.Fatal("need atleast three arguments ") + utils.Fatalf("Need atleast three arguments ") } var ( @@ -98,7 +97,7 @@ func update(ctx *cli.Context) { func remove(ctx *cli.Context) { args := ctx.Args() if len(args) < 2 { - log.Fatal("need atleast two arguments ") + utils.Fatalf("Need atleast two arguments ") } var ( @@ -134,19 +133,19 @@ func addEntryToManifest(ctx *cli.Context, mhash, path, hash, ctype string) strin mroot, err := client.downloadManifest(mhash) if err != nil { - log.Fatalln("manifest download failed:", err) + utils.Fatalf("Manifest download failed: %v", err) } //TODO: check if the "hash" to add is valid and present in swarm _, err = client.downloadManifest(hash) if err != nil { - log.Fatalln("hash to add is not present:", err) + utils.Fatalf("Hash to add is not present: %v", err) } // See if we path is in this Manifest or do we have to dig deeper for _, entry := range mroot.Entries { if path == entry.Path { - log.Fatal(path, "Already present, not adding anything") + utils.Fatalf("Path %s already present, not adding anything", path) } else { if entry.ContentType == "application/bzz-manifest+json" { prfxlen := strings.HasPrefix(path, entry.Path) @@ -183,7 +182,7 @@ func addEntryToManifest(ctx *cli.Context, mhash, path, hash, ctype string) strin newManifestHash, err := client.uploadManifest(mroot) if err != nil { - log.Fatalln("manifest upload failed:", err) + utils.Fatalf("Manifest upload failed: %v", err) } return newManifestHash @@ -208,7 +207,7 @@ func updateEntryInManifest(ctx *cli.Context, mhash, path, hash, ctype string) st mroot, err := client.downloadManifest(mhash) if err != nil { - log.Fatalln("manifest download failed:", err) + utils.Fatalf("Manifest download failed: %v", err) } //TODO: check if the "hash" with which to update is valid and present in swarm @@ -228,7 +227,7 @@ func updateEntryInManifest(ctx *cli.Context, mhash, path, hash, ctype string) st } if longestPathEntry.Path == "" && newEntry.Path == "" { - log.Fatal(path, " Path not present in the Manifest, not setting anything") + utils.Fatalf("Path %s not present in the Manifest, not setting anything", path) } if longestPathEntry.Path != "" { @@ -268,7 +267,7 @@ func updateEntryInManifest(ctx *cli.Context, mhash, path, hash, ctype string) st newManifestHash, err := client.uploadManifest(mroot) if err != nil { - log.Fatalln("manifest upload failed:", err) + utils.Fatalf("Manifest upload failed: %v", err) } return newManifestHash } @@ -292,7 +291,7 @@ func removeEntryFromManifest(ctx *cli.Context, mhash, path string) string { mroot, err := client.downloadManifest(mhash) if err != nil { - log.Fatalln("manifest download failed:", err) + utils.Fatalf("Manifest download failed: %v", err) } // See if we path is in this Manifest or do we have to dig deeper @@ -310,7 +309,7 @@ func removeEntryFromManifest(ctx *cli.Context, mhash, path string) string { } if longestPathEntry.Path == "" && entryToRemove.Path == "" { - log.Fatal(path, "Path not present in the Manifest, not removing anything") + utils.Fatalf("Path %s not present in the Manifest, not removing anything", path) } if longestPathEntry.Path != "" { @@ -342,8 +341,7 @@ func removeEntryFromManifest(ctx *cli.Context, mhash, path string) string { newManifestHash, err := client.uploadManifest(mroot) if err != nil { - log.Fatalln("manifest upload failed:", err) + utils.Fatalf("Manifest upload failed: %v", err) } return newManifestHash - } diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go index 9f3a2abe0..7b4961778 100644 --- a/cmd/swarm/upload.go +++ b/cmd/swarm/upload.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "mime" "net/http" "os" @@ -32,6 +31,8 @@ import ( "path/filepath" "strings" + "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/log" "gopkg.in/urfave/cli.v1" ) @@ -44,7 +45,7 @@ func upload(ctx *cli.Context) { defaultPath = ctx.GlobalString(SwarmUploadDefaultPath.Name) ) if len(args) != 1 { - log.Fatal("need filename as the first and only argument") + utils.Fatalf("Need filename as the first and only argument") } var ( @@ -53,25 +54,25 @@ func upload(ctx *cli.Context) { ) fi, err := os.Stat(expandPath(file)) if err != nil { - log.Fatal(err) + utils.Fatalf("Failed to stat file: %v", err) } if fi.IsDir() { if !recursive { - log.Fatal("argument is a directory and recursive upload is disabled") + utils.Fatalf("Argument is a directory and recursive upload is disabled") } if !wantManifest { - log.Fatal("manifest is required for directory uploads") + utils.Fatalf("Manifest is required for directory uploads") } mhash, err := client.uploadDirectory(file, defaultPath) if err != nil { - log.Fatal(err) + utils.Fatalf("Failed to upload directory: %v", err) } fmt.Println(mhash) return } entry, err := client.uploadFile(file, fi) if err != nil { - log.Fatalln("upload failed:", err) + utils.Fatalf("Upload failed: %v", err) } mroot := manifest{[]manifestEntry{entry}} if !wantManifest { @@ -82,7 +83,7 @@ func upload(ctx *cli.Context) { } hash, err := client.uploadManifest(mroot) if err != nil { - log.Fatalln("manifest upload failed:", err) + utils.Fatalf("Manifest upload failed: %v", err) } fmt.Println(hash) } @@ -173,7 +174,7 @@ func (c *client) uploadFileContent(file string, fi os.FileInfo) (string, error) return "", err } defer fd.Close() - log.Printf("uploading file %s (%d bytes)", file, fi.Size()) + log.Info("Uploading swarm content", "file", file, "bytes", fi.Size()) return c.postRaw("application/octet-stream", fi.Size(), fd) } @@ -182,7 +183,7 @@ func (c *client) uploadManifest(m manifest) (string, error) { if err != nil { panic(err) } - log.Println("uploading manifest") + log.Info("Uploading swarm manifest") return c.postRaw("application/json", int64(len(jsm)), ioutil.NopCloser(bytes.NewReader(jsm))) } @@ -192,7 +193,7 @@ func (c *client) uploadToManifest(mhash string, path string, fpath string, fi os return "", err } defer fd.Close() - log.Printf("uploading file %s (%d bytes) and adding path %v", fpath, fi.Size(), path) + log.Info("Uploading swarm content and path", "file", fpath, "bytes", fi.Size(), "path", path) req, err := http.NewRequest("PUT", c.api+"/bzz:/"+mhash+"/"+path, fd) if err != nil { return "", err diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index 843cb5b4e..17c258c6c 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -67,12 +67,12 @@ func StartNode(stack *node.Node) { signal.Notify(sigc, os.Interrupt) defer signal.Stop(sigc) <-sigc - log.Info(fmt.Sprint("Got interrupt, shutting down...")) + log.Info("Got interrupt, shutting down...") go stack.Stop() for i := 10; i > 0; i-- { <-sigc if i > 1 { - log.Info(fmt.Sprintf("Already shutting down, interrupt %d more times for panic.", i-1)) + log.Warn("Already shutting down, interrupt more to panic.", "times", i-1) } } debug.Exit() // ensure trace and CPU profile data is flushed. @@ -90,7 +90,7 @@ func ImportChain(chain *core.BlockChain, fn string) error { defer close(interrupt) go func() { if _, ok := <-interrupt; ok { - log.Info(fmt.Sprint("caught interrupt during import, will stop at next batch")) + log.Info("Interrupted during import, stopping at next batch") } close(stop) }() @@ -103,7 +103,7 @@ func ImportChain(chain *core.BlockChain, fn string) error { } } - log.Info(fmt.Sprint("Importing blockchain ", fn)) + log.Info("Importing blockchain", "file", fn) fh, err := os.Open(fn) if err != nil { return err @@ -151,8 +151,7 @@ func ImportChain(chain *core.BlockChain, fn string) error { return fmt.Errorf("interrupted") } if hasAllBlocks(chain, blocks[:i]) { - log.Info(fmt.Sprintf("skipping batch %d, all blocks present [%x / %x]", - batch, blocks[0].Hash().Bytes()[:4], blocks[i-1].Hash().Bytes()[:4])) + log.Info("Skipping batch as all blocks present", "batch", batch, "first", blocks[0].Hash(), "last", blocks[i-1].Hash()) continue } @@ -173,7 +172,7 @@ func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool { } func ExportChain(blockchain *core.BlockChain, fn string) error { - log.Info(fmt.Sprint("Exporting blockchain to ", fn)) + log.Info("Exporting blockchain", "file", fn) fh, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm) if err != nil { return err @@ -189,13 +188,13 @@ func ExportChain(blockchain *core.BlockChain, fn string) error { if err := blockchain.Export(writer); err != nil { return err } - log.Info(fmt.Sprint("Exported blockchain to ", fn)) + log.Info("Exported blockchain", "file", fn) return nil } func ExportAppendChain(blockchain *core.BlockChain, fn string, first uint64, last uint64) error { - log.Info(fmt.Sprint("Exporting blockchain to ", fn)) + log.Info("Exporting blockchain", "file", fn) // TODO verify mode perms fh, err := os.OpenFile(fn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, os.ModePerm) if err != nil { @@ -212,6 +211,6 @@ func ExportAppendChain(blockchain *core.BlockChain, fn string, first uint64, las if err := blockchain.ExportN(writer, first, last); err != nil { return err } - log.Info(fmt.Sprint("Exported blockchain to ", fn)) + log.Info("Exported blockchain to", "file", fn) return nil } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index a586ffeb3..adc18a203 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -492,7 +492,7 @@ func MakeBootstrapNodes(ctx *cli.Context) []*discover.Node { for _, url := range urls { node, err := discover.ParseNode(url) if err != nil { - log.Error(fmt.Sprintf("Bootstrap URL %s: %v\n", url, err)) + log.Error("Bootstrap URL invalid", "enode", url, "err", err) continue } bootnodes = append(bootnodes, node) @@ -512,7 +512,7 @@ func MakeBootstrapNodesV5(ctx *cli.Context) []*discv5.Node { for _, url := range urls { node, err := discv5.ParseNode(url) if err != nil { - log.Error(fmt.Sprintf("Bootstrap URL %s: %v\n", url, err)) + log.Error("Bootstrap URL invalid", "enode", url, "err", err) continue } bootnodes = append(bootnodes, node) @@ -609,7 +609,7 @@ func MakeAddress(ks *keystore.KeyStore, account string) (accounts.Account, error func MakeEtherbase(ks *keystore.KeyStore, ctx *cli.Context) common.Address { accounts := ks.Accounts() if !ctx.GlobalIsSet(EtherbaseFlag.Name) && len(accounts) == 0 { - log.Error(fmt.Sprint("WARNING: No etherbase set and no accounts found as default")) + log.Warn("No etherbase set and no accounts found as default") return common.Address{} } etherbase := ctx.GlobalString(EtherbaseFlag.Name) diff --git a/eth/api.go b/eth/api.go index 3cec749df..3aac34ee0 100644 --- a/eth/api.go +++ b/eth/api.go @@ -37,7 +37,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/internal/ethapi" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -103,17 +102,17 @@ func (s *PublicMinerAPI) SubmitWork(nonce types.BlockNonce, solution, digest com // result[0], 32 bytes hex encoded current block header pow-hash // result[1], 32 bytes hex encoded seed hash used for DAG // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty -func (s *PublicMinerAPI) GetWork() (work [3]string, err error) { +func (s *PublicMinerAPI) GetWork() ([3]string, error) { if !s.e.IsMining() { if err := s.e.StartMining(0); err != nil { - return work, err + return [3]string{}, err } } - if work, err = s.agent.GetWork(); err == nil { - return + work, err := s.agent.GetWork() + if err != nil { + return work, fmt.Errorf("mining not ready: %v", err) } - log.Debug(fmt.Sprintf("%v", err)) - return work, fmt.Errorf("mining not ready") + return work, nil } // SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined diff --git a/eth/backend.go b/eth/backend.go index c288b2557..5a8b41f5d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -179,8 +179,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if err := addMipmapBloomBins(chainDb); err != nil { return nil, err } - - log.Info(fmt.Sprintf("Protocol Versions: %v, Network Id: %v", ProtocolVersions, config.NetworkId)) + log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) if !config.SkipBcVersionCheck { bcVersion := core.GetBlockChainVersion(chainDb) @@ -198,7 +197,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if err != nil { return nil, err } - log.Info(fmt.Sprint("WARNING: Wrote default ethereum genesis block")) + log.Warn("Wrote default Ethereum genesis block") } if config.ChainConfig == nil { @@ -208,7 +207,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { eth.chainConfig = config.ChainConfig - log.Info(fmt.Sprint("Chain config:", eth.chainConfig)) + log.Info("Initialised chain configuration", "config", eth.chainConfig) eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.pow, eth.EventMux(), vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}) if err != nil { @@ -269,7 +268,7 @@ func SetupGenesisBlock(chainDb *ethdb.Database, config *Config) error { if err != nil { return err } - log.Info(fmt.Sprintf("Successfully wrote custom genesis block: %x", block.Hash())) + log.Info("Successfully wrote custom genesis block", "hash", block.Hash()) } // Load up a test setup if directly injected if config.TestGenesisState != nil { @@ -288,13 +287,13 @@ func SetupGenesisBlock(chainDb *ethdb.Database, config *Config) error { func CreatePoW(config *Config) (pow.PoW, error) { switch { case config.PowFake: - log.Info(fmt.Sprintf("ethash used in fake mode")) + log.Warn("Ethash used in fake mode") return pow.PoW(core.FakePow{}), nil case config.PowTest: - log.Info(fmt.Sprintf("ethash used in test mode")) + log.Warn("Ethash used in test mode") return ethash.NewForTesting() case config.PowShared: - log.Info(fmt.Sprintf("ethash used in shared mode")) + log.Warn("Ethash used in shared mode") return ethash.NewShared(), nil default: return ethash.New(), nil @@ -377,9 +376,8 @@ func (self *Ethereum) SetEtherbase(etherbase common.Address) { func (s *Ethereum) StartMining(threads int) error { eb, err := s.Etherbase() if err != nil { - err = fmt.Errorf("Cannot start mining without etherbase address: %v", err) - log.Error(fmt.Sprint(err)) - return err + log.Error("Cannot start mining without etherbase", "err", err) + return fmt.Errorf("etherbase missing: %v", err) } go s.miner.Start(eb, threads) return nil @@ -466,14 +464,14 @@ func (self *Ethereum) StartAutoDAG() { return // already started } go func() { - log.Info(fmt.Sprintf("Automatic pregeneration of ethash DAG ON (ethash dir: %s)", ethash.DefaultDir)) + log.Info("Pre-generation of ethash DAG on", "dir", ethash.DefaultDir) var nextEpoch uint64 timer := time.After(0) self.autodagquit = make(chan bool) for { select { case <-timer: - log.Info(fmt.Sprintf("checking DAG (ethash dir: %s)", ethash.DefaultDir)) + log.Info("Checking DAG availability", "dir", ethash.DefaultDir) currentBlock := self.BlockChain().CurrentBlock().NumberU64() thisEpoch := currentBlock / epochLength if nextEpoch <= thisEpoch { @@ -482,19 +480,19 @@ func (self *Ethereum) StartAutoDAG() { previousDag, previousDagFull := dagFiles(thisEpoch - 1) os.Remove(filepath.Join(ethash.DefaultDir, previousDag)) os.Remove(filepath.Join(ethash.DefaultDir, previousDagFull)) - log.Info(fmt.Sprintf("removed DAG for epoch %d (%s)", thisEpoch-1, previousDag)) + log.Info("Removed previous DAG", "epoch", thisEpoch-1, "dag", previousDag) } nextEpoch = thisEpoch + 1 dag, _ := dagFiles(nextEpoch) if _, err := os.Stat(dag); os.IsNotExist(err) { - log.Info(fmt.Sprintf("Pregenerating DAG for epoch %d (%s)", nextEpoch, dag)) + log.Info("Pre-generating next DAG", "epoch", nextEpoch, "dag", dag) err := ethash.MakeDAG(nextEpoch*epochLength, "") // "" -> ethash.DefaultDir if err != nil { - log.Error(fmt.Sprintf("Error generating DAG for epoch %d (%s)", nextEpoch, dag)) + log.Error("Error generating DAG", "epoch", nextEpoch, "dag", dag, "err", err) return } } else { - log.Error(fmt.Sprintf("DAG for epoch %d (%s)", nextEpoch, dag)) + log.Warn("DAG already exists", "epoch", nextEpoch, "dag", dag) } } } @@ -512,7 +510,7 @@ func (self *Ethereum) StopAutoDAG() { close(self.autodagquit) self.autodagquit = nil } - log.Info(fmt.Sprintf("Automatic pregeneration of ethash DAG OFF (ethash dir: %s)", ethash.DefaultDir)) + log.Info("Pre-generation of ethash DAG off", "dir", ethash.DefaultDir) } // dagFiles(epoch) returns the two alternative DAG filenames (not a path) diff --git a/eth/bad_block.go b/eth/bad_block.go index 0812af7c0..dd1ced804 100644 --- a/eth/bad_block.go +++ b/eth/bad_block.go @@ -65,9 +65,9 @@ func sendBadBlockReport(block *types.Block, err error) { client := http.Client{Timeout: 8 * time.Second} resp, err := client.Post(badBlocksURL, "application/json", bytes.NewReader(jsonStr)) if err != nil { - log.Debug(fmt.Sprint(err)) + log.Debug("Failed to report bad block", "err", err) return } - log.Debug(fmt.Sprintf("Bad Block Report posted (%d)", resp.StatusCode)) + log.Debug("Bad block report posted", "status", resp.StatusCode) resp.Body.Close() } diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go index 2a61af364..82cdd7e55 100644 --- a/eth/db_upgrade.go +++ b/eth/db_upgrade.go @@ -49,7 +49,7 @@ func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { return nil // empty database, nothing to do } - log.Info(fmt.Sprintf("Upgrading chain database to use sequential keys")) + log.Warn("Upgrading chain database to use sequential keys") stopChn := make(chan struct{}) stoppedChn := make(chan struct{}) @@ -72,11 +72,11 @@ func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn) } if err == nil && !stopped { - log.Info(fmt.Sprintf("Database conversion successful")) + log.Info("Database conversion successful") db.Put(useSequentialKeys, []byte{42}) } if err != nil { - log.Error(fmt.Sprintf("Database conversion failed: %v", err)) + log.Error("Database conversion failed", "err", err) } close(stoppedChn) }() @@ -105,7 +105,7 @@ func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (e it.Release() it = db.(*ethdb.LDBDatabase).NewIterator() it.Seek(keyPtr) - log.Info(fmt.Sprintf("converting %d canonical numbers...", cnt)) + log.Info("Converting canonical numbers", "count", cnt) } number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() newKey := []byte("h12345678n") @@ -124,7 +124,7 @@ func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (e it.Next() } if cnt > 0 { - log.Info(fmt.Sprintf("converted %d canonical numbers...", cnt)) + log.Info("converted canonical numbers", "count", cnt) } return nil, false } @@ -148,7 +148,7 @@ func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool it.Release() it = db.(*ethdb.LDBDatabase).NewIterator() it.Seek(keyPtr) - log.Info(fmt.Sprintf("converting %d blocks...", cnt)) + log.Info("Converting blocks", "count", cnt) } // convert header, body, td and block receipts var keyPrefix [38]byte @@ -176,7 +176,7 @@ func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool } } if cnt > 0 { - log.Info(fmt.Sprintf("converted %d blocks...", cnt)) + log.Info("Converted blocks", "count", cnt) } return nil, false } @@ -203,7 +203,7 @@ func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (e it.Next() } if cnt > 0 { - log.Info(fmt.Sprintf("removed %d orphaned block receipts...", cnt)) + log.Info("Removed orphaned block receipts", "count", cnt) } return nil, false } @@ -283,7 +283,7 @@ func addMipmapBloomBins(db ethdb.Database) (err error) { } tstart := time.Now() - log.Info(fmt.Sprint("upgrading db log bloom bins")) + log.Warn("Upgrading db log bloom bins") for i := uint64(0); i <= latestBlock.NumberU64(); i++ { hash := core.GetCanonicalHash(db, i) if (hash == common.Hash{}) { @@ -291,6 +291,6 @@ func addMipmapBloomBins(db ethdb.Database) (err error) { } core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i)) } - log.Info(fmt.Sprint("upgrade completed in", time.Since(tstart))) + log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart))) return nil } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index be3d8d177..f7aca031a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -518,7 +518,7 @@ func (d *Downloader) Terminate() { // fetchHeight retrieves the head header of the remote peer to aid in estimating // the total time a pending synchronisation would take. func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { - p.logger.Debug("Retrieving remote chain height") + p.log.Debug("Retrieving remote chain height") // Request the advertised remote head block and wait for the response head, _ := p.currentHead() @@ -540,15 +540,15 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) != 1 { - p.logger.Debug("Multiple headers for single request", "headers", len(headers)) + p.log.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } head := headers[0] - p.logger.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) + p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil case <-timeout: - p.logger.Debug("Waiting for head header timed out", "elapsed", ttl) + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) return nil, errTimeout case <-d.bodyCh: @@ -568,7 +568,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks floor, ceil := int64(-1), d.headHeader().Number.Uint64() - p.logger.Debug("Looking for common ancestor", "local", ceil, "remote", height) + p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) if d.mode == FullSync { ceil = d.headBlock().NumberU64() } else if d.mode == FastSync { @@ -614,13 +614,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) == 0 { - p.logger.Warn("Empty head header set") + p.log.Warn("Empty head header set") return 0, errEmptyHeaderSet } // Make sure the peer's reply conforms to the request for i := 0; i < len(headers); i++ { if number := headers[i].Number.Int64(); number != from+int64(i)*16 { - p.logger.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) + p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) return 0, errInvalidChain } } @@ -637,7 +637,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // If every header is known, even future ones, the peer straight out lied about its head if number > height && i == limit-1 { - p.logger.Warn("Lied about chain head", "reported", height, "found", number) + p.log.Warn("Lied about chain head", "reported", height, "found", number) return 0, errStallingPeer } break @@ -645,7 +645,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { } case <-timeout: - p.logger.Debug("Waiting for head header timed out", "elapsed", ttl) + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) return 0, errTimeout case <-d.bodyCh: @@ -657,10 +657,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // If the head fetch already found an ancestor, return if !common.EmptyHash(hash) { if int64(number) <= floor { - p.logger.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor) + p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor) return 0, errInvalidAncestor } - p.logger.Debug("Found common ancestor", "number", number, "hash", hash) + p.log.Debug("Found common ancestor", "number", number, "hash", hash) return number, nil } // Ancestor not found, we need to binary search over our chain @@ -692,7 +692,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Make sure the peer actually gave something valid headers := packer.(*headerPack).headers if len(headers) != 1 { - p.logger.Debug("Multiple headers for single request", "headers", len(headers)) + p.log.Debug("Multiple headers for single request", "headers", len(headers)) return 0, errBadPeer } arrived = true @@ -704,13 +704,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { } header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { - p.logger.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) + p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) return 0, errBadPeer } start = check case <-timeout: - p.logger.Debug("Waiting for search header timed out", "elapsed", ttl) + p.log.Debug("Waiting for search header timed out", "elapsed", ttl) return 0, errTimeout case <-d.bodyCh: @@ -722,10 +722,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { } // Ensure valid ancestry and return if int64(start) <= floor { - p.logger.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor) + p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor) return 0, errInvalidAncestor } - p.logger.Debug("Found common ancestor", "number", start, "hash", hash) + p.log.Debug("Found common ancestor", "number", start, "hash", hash) return start, nil } @@ -738,8 +738,8 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. func (d *Downloader) fetchHeaders(p *peer, from uint64) error { - p.logger.Debug("Directing header downloads", "origin", from) - defer p.logger.Debug("Header download terminated") + p.log.Debug("Directing header downloads", "origin", from) + defer p.log.Debug("Header download terminated") // Create a timeout timer, and the associated header fetcher skeleton := true // Skeleton assembly phase or finishing up @@ -756,10 +756,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { timeout.Reset(ttl) if skeleton { - p.logger.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) + p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) } else { - p.logger.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) + p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) } } @@ -788,7 +788,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { - p.logger.Debug("No more headers available") + p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: return nil @@ -802,7 +802,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { if skeleton { filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { - p.logger.Debug("Skeleton chain invalid", "err", err) + p.log.Debug("Skeleton chain invalid", "err", err) return errInvalidChain } headers = filled[proced:] @@ -810,7 +810,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { } // Insert all the new headers and fetch the next batch if len(headers) > 0 { - p.logger.Trace("Scheduling new headers", "count", len(headers), "from", from) + p.log.Trace("Scheduling new headers", "count", len(headers), "from", from) select { case d.headerProcCh <- headers: case <-d.cancelCh: @@ -822,7 +822,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { case <-timeout.C: // Header retrieval timed out, consider the peer bad and drop - p.logger.Debug("Header request timed out", "elapsed", ttl) + p.log.Debug("Header request timed out", "elapsed", ttl) headerTimeoutMeter.Mark(1) d.dropPeer(p.id) @@ -1050,11 +1050,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // Issue a log to the user to see what's going on switch { case err == nil && packet.Items() == 0: - peer.logger.Trace("Requested data not delivered", "type", kind) + peer.log.Trace("Requested data not delivered", "type", kind) case err == nil: - peer.logger.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats()) + peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats()) default: - peer.logger.Trace("Failed to deliver retrieved data", "type", kind, "err", err) + peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err) } } // Blocks assembled, try to update the progress @@ -1097,10 +1097,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing // how response times reacts, to it always requests one more than the minimum (i.e. min 2). if fails > 2 { - peer.logger.Trace("Data delivery timed out", "type", kind) + peer.log.Trace("Data delivery timed out", "type", kind) setIdle(peer, 0) } else { - peer.logger.Debug("Stalling delivery, dropping", "type", kind) + peer.log.Debug("Stalling delivery, dropping", "type", kind) d.dropPeer(pid) } } @@ -1137,11 +1137,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv continue } if request.From > 0 { - peer.logger.Trace("Requesting new batch of data", "type", kind, "from", request.From) + peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From) } else if len(request.Headers) > 0 { - peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) + peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) } else { - peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes)) + peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes)) } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c1a9b859d..15a912f1f 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -87,7 +87,7 @@ type peer struct { getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies - logger log.Logger // Contextual logger to add extra infos to peer logs + log log.Logger // Contextual logger to add extra infos to peer logs lock sync.RWMutex } @@ -110,7 +110,7 @@ func newPeer(id string, version int, currentHead currentHeadRetrievalFn, getNodeData: getNodeData, version: version, - logger: logger, + log: logger, } } @@ -272,7 +272,7 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) - p.logger.Trace("Peer throughput measurements updated", + p.log.Trace("Peer throughput measurements updated", "hps", p.headerThroughput, "bps", p.blockThroughput, "rps", p.receiptThroughput, "sps", p.stateThroughput, "miss", len(p.lacking), "rtt", p.rtt) diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 33f9dbe93..d82f4f3e6 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -19,7 +19,6 @@ package fetcher import ( "errors" - "fmt" "math/rand" "time" @@ -78,8 +77,8 @@ type announce struct { origin string // Identifier of the peer originating the notification - fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block - fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block + fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block + fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block } // headerFilterTask represents a batch of headers needing fetcher filtering. @@ -220,7 +219,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { // FilterHeaders extracts all the headers that were explicitly requested by the fetcher, // returning those that should be handled differently. func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header { - log.Trace(fmt.Sprintf("[eth/62] filtering %d headers", len(headers))) + log.Trace("Filtering headers", "headers", len(headers)) // Send the filter channel to the fetcher filter := make(chan *headerFilterTask) @@ -248,7 +247,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type // FilterBodies extracts all the block bodies that were explicitly requested by // the fetcher, returning those that should be handled differently. func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { - log.Trace(fmt.Sprintf("[eth/62] filtering %d:%d bodies", len(transactions), len(uncles))) + log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles)) // Send the filter channel to the fetcher filter := make(chan *bodyFilterTask) @@ -323,14 +322,14 @@ func (f *Fetcher) loop() { count := f.announces[notification.origin] + 1 if count > hashLimit { - log.Debug(fmt.Sprintf("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)) + log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit) propAnnounceDOSMeter.Mark(1) break } // If we have a valid block number, check that it's potentially useful if notification.number > 0 { if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { - log.Debug(fmt.Sprintf("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)) + log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) propAnnounceDropMeter.Mark(1) break } @@ -380,16 +379,8 @@ func (f *Fetcher) loop() { } // Send out all block header requests for peer, hashes := range request { - if len(hashes) > 0 { - log.Trace("", "msg", log.Lazy{Fn: func() string { - list := "[" - for _, hash := range hashes { - list += fmt.Sprintf("%x…, ", hash[:4]) - } - list = list[:len(list)-2] + "]" - return fmt.Sprintf("[eth/62] Peer %s: fetching headers %s", peer, list) - }}) - } + log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) + // Create a closure of the fetch and schedule in on a new thread fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes go func() { @@ -422,17 +413,8 @@ func (f *Fetcher) loop() { } // Send out all block body requests for peer, hashes := range request { - if len(hashes) > 0 { - log.Trace("", "msg", log.Lazy{Fn: func() string { - list := "[" - for _, hash := range hashes { - list += fmt.Sprintf("%x…, ", hash[:4]) - } - list = list[:len(list)-2] + "]" + log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes) - return fmt.Sprintf("[eth/62] Peer %s: fetching bodies %s", peer, list) - }}) - } // Create a closure of the fetch and schedule in on a new thread if f.completingHook != nil { f.completingHook(hashes) @@ -465,7 +447,7 @@ func (f *Fetcher) loop() { if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { // If the delivered header does not match the promised number, drop the announcer if header.Number.Uint64() != announce.number { - log.Trace(fmt.Sprintf("[eth/62] Peer %s: invalid block number for [%x…]: announced %d, provided %d", announce.origin, header.Hash().Bytes()[:4], announce.number, header.Number.Uint64())) + log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) f.dropPeer(announce.origin) f.forgetHash(hash) continue @@ -477,7 +459,7 @@ func (f *Fetcher) loop() { // If the block is empty (header only), short circuit into the final import queue if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { - log.Trace(fmt.Sprintf("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])) + log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) block := types.NewBlockWithHeader(header) block.ReceivedAt = task.time @@ -489,7 +471,7 @@ func (f *Fetcher) loop() { // Otherwise add to the list of blocks needing completion incomplete = append(incomplete, announce) } else { - log.Trace(fmt.Sprintf("[eth/62] Peer %s: block #%d [%x…] already imported, discarding header", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])) + log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) f.forgetHash(hash) } } else { @@ -620,14 +602,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { // Ensure the peer isn't DOSing us count := f.queues[peer] + 1 if count > blockLimit { - log.Debug(fmt.Sprintf("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)) + log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) propBroadcastDOSMeter.Mark(1) f.forgetHash(hash) return } // Discard any past or too distant blocks if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { - log.Debug(fmt.Sprintf("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)) + log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist) propBroadcastDropMeter.Mark(1) f.forgetHash(hash) return @@ -644,9 +626,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { if f.queueChangeHook != nil { f.queueChangeHook(op.block.Hash(), true) } - log.Debug("", "msg", log.Lazy{Fn: func() string { - return fmt.Sprintf("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size()) - }}) + log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size()) } } @@ -657,14 +637,14 @@ func (f *Fetcher) insert(peer string, block *types.Block) { hash := block.Hash() // Run the import on a new thread - log.Debug(fmt.Sprintf("Peer %s: importing block #%d [%x…]", peer, block.NumberU64(), hash[:4])) + log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { defer func() { f.done <- hash }() // If the parent's unknown, abort insertion parent := f.getBlock(block.ParentHash()) if parent == nil { - log.Debug(fmt.Sprintf("Peer %s: parent [%x…] of block #%d [%x…] unknown", peer, block.ParentHash().Bytes()[:4], block.NumberU64(), hash[:4])) + log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) return } // Quickly validate the header and propagate the block if it passes @@ -679,13 +659,13 @@ func (f *Fetcher) insert(peer string, block *types.Block) { default: // Something went very wrong, drop the peer - log.Debug(fmt.Sprintf("Peer %s: block #%d [%x…] verification failed: %v", peer, block.NumberU64(), hash[:4], err)) + log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) f.dropPeer(peer) return } // Run the actual import and log any issues if _, err := f.insertChain(types.Blocks{block}); err != nil { - log.Warn(fmt.Sprintf("Peer %s: block #%d [%x…] import failed: %v", peer, block.NumberU64(), hash[:4], err)) + log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } // If import succeeded, broadcast the block diff --git a/eth/gasprice/gasprice.go b/eth/gasprice/gasprice.go index 0e0b1b66a..73951bce9 100644 --- a/eth/gasprice/gasprice.go +++ b/eth/gasprice/gasprice.go @@ -17,7 +17,6 @@ package gasprice import ( - "fmt" "math/big" "math/rand" "sync" @@ -176,7 +175,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) { self.lastBase = newBase self.lastBaseMutex.Unlock() - log.Trace(fmt.Sprintf("Processed block #%v, base price is %v\n", i, newBase.Int64())) + log.Trace("Processed block, base price updated", "number", i, "base", newBase) } // returns the lowers possible price with which a tx was or could have been included diff --git a/eth/handler.go b/eth/handler.go index bcb83ed90..a7f4be0b6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -115,7 +115,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int } // Figure out whether to allow fast sync or not if fastSync && blockchain.CurrentBlock().NumberU64() > 0 { - log.Info(fmt.Sprintf("blockchain not empty, fast sync disabled")) + log.Warn("Blockchain not empty, fast sync disabled") fastSync = false } if fastSync { @@ -178,7 +178,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) if blockchain.Genesis().Hash().Hex() == defaultGenesisHash && networkId == 1 { - log.Debug(fmt.Sprint("Bad Block Reporting is enabled")) + log.Debug("Bad block reporting is enabled") manager.badBlockReportingEnabled = true } @@ -199,12 +199,12 @@ func (pm *ProtocolManager) removePeer(id string) { if peer == nil { return } - log.Debug(fmt.Sprint("Removing peer", id)) + log.Debug("Removing Ethereum peer", "peer", id) // Unregister the peer from the downloader and Ethereum peer set pm.downloader.UnregisterPeer(id) if err := pm.peers.Unregister(id); err != nil { - log.Error(fmt.Sprint("Removal failed:", err)) + log.Error("Peer removal failed", "peer", id, "err", err) } // Hard disconnect at the networking layer if peer != nil { @@ -226,7 +226,7 @@ func (pm *ProtocolManager) Start() { } func (pm *ProtocolManager) Stop() { - log.Info(fmt.Sprint("Stopping ethereum protocol handler...")) + log.Info("Stopping Ethereum protocol") pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop @@ -247,7 +247,7 @@ func (pm *ProtocolManager) Stop() { // Wait for all peer handler goroutines and the loops to come down. pm.wg.Wait() - log.Info(fmt.Sprint("Ethereum protocol handler stopped")) + log.Info("Ethereum protocol stopped") } func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -260,22 +260,20 @@ func (pm *ProtocolManager) handle(p *peer) error { if pm.peers.Len() >= pm.maxPeers { return p2p.DiscTooManyPeers } - - log.Debug(fmt.Sprintf("%v: peer connected [%s]", p, p.Name())) + p.Log().Debug("Ethereum peer connected", "name", p.Name()) // Execute the Ethereum handshake td, head, genesis := pm.blockchain.Status() if err := p.Handshake(pm.networkId, td, head, genesis); err != nil { - log.Debug(fmt.Sprintf("%v: handshake failed: %v", p, err)) + p.Log().Debug("Ethereum handshake failed", "err", err) return err } if rw, ok := p.rw.(*meteredMsgReadWriter); ok { rw.Init(p.version) } // Register the peer locally - log.Trace(fmt.Sprintf("%v: adding peer", p)) if err := pm.peers.Register(p); err != nil { - log.Error(fmt.Sprintf("%v: addition failed: %v", p, err)) + p.Log().Error("Ethereum peer registration failed", "err", err) return err } defer pm.removePeer(p.id) @@ -296,7 +294,7 @@ func (pm *ProtocolManager) handle(p *peer) error { } // Start a timer to disconnect if the peer doesn't reply in time p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { - log.Debug(fmt.Sprintf("%v: timed out DAO fork-check, dropping", p)) + p.Log().Debug("Timed out DAO fork-check, dropping") pm.removePeer(p.id) }) // Make sure it's cleaned up if the peer dies off @@ -310,7 +308,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { - log.Debug(fmt.Sprintf("%v: message handling failed: %v", p, err)) + p.Log().Debug("Message handling failed", "err", err) return err } } @@ -386,7 +384,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ) if next <= current { infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") - log.Warn(fmt.Sprintf("%v: GetBlockHeaders skip overflow attack (current %v, skip %v, next %v)\nMalicious peer infos: %s", p, current, query.Skip, next, infos)) + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) unknown = true } else { if header := pm.blockchain.GetHeaderByNumber(next); header != nil { @@ -434,7 +432,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // If we're seemingly on the same chain, disable the drop timer if verifyDAO { - log.Debug(fmt.Sprintf("%v: seems to be on the same side of the DAO fork", p)) + p.Log().Debug("Seems to be on the same side of the DAO fork") p.forkDrop.Stop() p.forkDrop = nil return nil @@ -451,10 +449,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Validate the header and either drop the peer or continue if err := core.ValidateDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil { - log.Debug(fmt.Sprintf("%v: verified to be on the other side of the DAO fork, dropping", p)) + p.Log().Debug("Verified to be on the other side of the DAO fork, dropping") return err } - log.Debug(fmt.Sprintf("%v: verified to be on the same side of the DAO fork", p)) + p.Log().Debug("Verified to be on the same side of the DAO fork") return nil } // Irrelevant of the fork checks, send the header to the fetcher just in case @@ -463,7 +461,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if len(headers) > 0 || !filter { err := pm.downloader.DeliverHeaders(p.id, headers) if err != nil { - log.Debug(fmt.Sprint(err)) + log.Debug("Failed to deliver headers", "err", err) } } @@ -516,7 +514,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if len(trasactions) > 0 || len(uncles) > 0 || !filter { err := pm.downloader.DeliverBodies(p.id, trasactions, uncles) if err != nil { - log.Debug(fmt.Sprint(err)) + log.Debug("Failed to deliver bodies", "err", err) } } @@ -555,7 +553,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Deliver all to the downloader if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { - log.Debug(fmt.Sprintf("failed to deliver node state data: %v", err)) + log.Debug("Failed to deliver node state data", "err", err) } case p.version >= eth63 && msg.Code == GetReceiptsMsg: @@ -586,7 +584,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // If known, encode and queue for response packet if encoded, err := rlp.EncodeToBytes(results); err != nil { - log.Error(fmt.Sprintf("failed to encode receipt: %v", err)) + log.Error("Failed to encode receipt", "err", err) } else { receipts = append(receipts, encoded) bytes += len(encoded) @@ -602,7 +600,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Deliver all to the downloader if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { - log.Debug(fmt.Sprintf("failed to deliver receipts: %v", err)) + log.Debug("Failed to deliver receipts", "err", err) } case msg.Code == NewBlockHashesMsg: @@ -695,7 +693,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) } else { - log.Error(fmt.Sprintf("propagating dangling block #%d [%x]", block.NumberU64(), hash[:4])) + log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) return } // Send the block to a subset of our peers @@ -703,14 +701,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { for _, peer := range transfer { peer.SendNewBlock(block, td) } - log.Trace(fmt.Sprintf("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))) + log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } // Otherwise if the block is indeed in out own chain, announce it if pm.blockchain.HasBlock(hash) { for _, peer := range peers { peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) } - log.Trace(fmt.Sprintf("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))) + log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } } @@ -723,7 +721,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) for _, peer := range peers { peer.SendTransactions(types.Transactions{tx}) } - log.Trace(fmt.Sprint("broadcast tx to", len(peers), "peers")) + log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } // Mined broadcast loop diff --git a/eth/peer.go b/eth/peer.go index e87438953..443463b8c 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" @@ -191,41 +190,41 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { // RequestHeaders is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *peer) RequestOneHeader(hash common.Hash) error { - log.Debug(fmt.Sprintf("%v fetching a single header: %x", p, hash)) + p.Log().Debug("Fetching single header", "hash", hash) return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { - log.Debug(fmt.Sprintf("%v fetching %d headers from %x, skipping %d (reverse = %v)", p, amount, origin[:4], skip, reverse)) + p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) } // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { - log.Debug(fmt.Sprintf("%v fetching %d headers from #%d, skipping %d (reverse = %v)", p, amount, origin, skip, reverse)) + p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) } // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes // specified. func (p *peer) RequestBodies(hashes []common.Hash) error { - log.Debug(fmt.Sprintf("%v fetching %d block bodies", p, len(hashes))) + p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) return p2p.Send(p.rw, GetBlockBodiesMsg, hashes) } // RequestNodeData fetches a batch of arbitrary data from a node's known state // data, corresponding to the specified hashes. func (p *peer) RequestNodeData(hashes []common.Hash) error { - log.Debug(fmt.Sprintf("%v fetching %v state data", p, len(hashes))) + p.Log().Debug("Fetching batch of state data", "count", len(hashes)) return p2p.Send(p.rw, GetNodeDataMsg, hashes) } // RequestReceipts fetches a batch of transaction receipts from a remote node. func (p *peer) RequestReceipts(hashes []common.Hash) error { - log.Debug(fmt.Sprintf("%v fetching %v receipts", p, len(hashes))) + p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) return p2p.Send(p.rw, GetReceiptsMsg, hashes) } diff --git a/eth/sync.go b/eth/sync.go index 1075578b9..6e2c7c432 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -17,7 +17,6 @@ package eth import ( - "fmt" "math/rand" "sync/atomic" "time" @@ -87,7 +86,7 @@ func (pm *ProtocolManager) txsyncLoop() { delete(pending, s.p.ID()) } // Send the pack in the background. - log.Trace(fmt.Sprintf("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size)) + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) sending = true go func() { done <- pack.p.SendTransactions(pack.txs) }() } @@ -117,7 +116,7 @@ func (pm *ProtocolManager) txsyncLoop() { sending = false // Stop tracking peers that cause send failures. if err != nil { - log.Debug(fmt.Sprintf("%v: tx send failed: %v", pack.p.Peer, err)) + pack.p.Log().Debug("Transaction send failed", "err", err) delete(pending, pack.p.ID()) } // Schedule the next send. @@ -187,7 +186,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if atomic.LoadUint32(&pm.fastSync) == 1 { // Disable fast sync if we indeed have something in our chain if pm.blockchain.CurrentBlock().NumberU64() > 0 { - log.Info(fmt.Sprintf("fast sync complete, auto disabling")) + log.Info("Fast sync complete, auto disabling") atomic.StoreUint32(&pm.fastSync, 0) } } diff --git a/ethdb/database.go b/ethdb/database.go index e82528f25..7d5fb0b9e 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -17,8 +17,6 @@ package ethdb import ( - "fmt" - "path/filepath" "strconv" "strings" "sync" @@ -37,20 +35,6 @@ import ( var OpenFileLimit = 64 -// cacheRatio specifies how the total allotted cache is distributed between the -// various system databases. -var cacheRatio = map[string]float64{ - "chaindata": 1.0, - "lightchaindata": 1.0, -} - -// handleRatio specifies how the total allotted file descriptors is distributed -// between the various system databases. -var handleRatio = map[string]float64{ - "chaindata": 1.0, - "lightchaindata": 1.0, -} - type LDBDatabase struct { fn string // filename for reporting db *leveldb.DB // LevelDB instance @@ -67,20 +51,22 @@ type LDBDatabase struct { quitLock sync.Mutex // Mutex protecting the quit channel access quitChan chan chan error // Quit channel to stop the metrics collection before closing the database + + log log.Logger // Contextual logger tracking the database path } // NewLDBDatabase returns a LevelDB wrapped object. func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) { - // Calculate the cache and file descriptor allowance for this particular database - cache = int(float64(cache) * cacheRatio[filepath.Base(file)]) + logger := log.New("database", file) + + // Ensure we have some minimal caching and file guarantees if cache < 16 { cache = 16 } - handles = int(float64(handles) * handleRatio[filepath.Base(file)]) if handles < 16 { handles = 16 } - log.Info(fmt.Sprintf("Allotted %dMB cache and %d file handles to %s", cache, handles, file)) + logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles) // Open the db and recover any potential corruptions db, err := leveldb.OpenFile(file, &opt.Options{ @@ -97,8 +83,9 @@ func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) { return nil, err } return &LDBDatabase{ - fn: file, - db: db, + fn: file, + db: db, + log: logger, }, nil } @@ -108,103 +95,103 @@ func (db *LDBDatabase) Path() string { } // Put puts the given key / value to the queue -func (self *LDBDatabase) Put(key []byte, value []byte) error { +func (db *LDBDatabase) Put(key []byte, value []byte) error { // Measure the database put latency, if requested - if self.putTimer != nil { - defer self.putTimer.UpdateSince(time.Now()) + if db.putTimer != nil { + defer db.putTimer.UpdateSince(time.Now()) } // Generate the data to write to disk, update the meter and write //value = rle.Compress(value) - if self.writeMeter != nil { - self.writeMeter.Mark(int64(len(value))) + if db.writeMeter != nil { + db.writeMeter.Mark(int64(len(value))) } - return self.db.Put(key, value, nil) + return db.db.Put(key, value, nil) } // Get returns the given key if it's present. -func (self *LDBDatabase) Get(key []byte) ([]byte, error) { +func (db *LDBDatabase) Get(key []byte) ([]byte, error) { // Measure the database get latency, if requested - if self.getTimer != nil { - defer self.getTimer.UpdateSince(time.Now()) + if db.getTimer != nil { + defer db.getTimer.UpdateSince(time.Now()) } // Retrieve the key and increment the miss counter if not found - dat, err := self.db.Get(key, nil) + dat, err := db.db.Get(key, nil) if err != nil { - if self.missMeter != nil { - self.missMeter.Mark(1) + if db.missMeter != nil { + db.missMeter.Mark(1) } return nil, err } // Otherwise update the actually retrieved amount of data - if self.readMeter != nil { - self.readMeter.Mark(int64(len(dat))) + if db.readMeter != nil { + db.readMeter.Mark(int64(len(dat))) } return dat, nil //return rle.Decompress(dat) } // Delete deletes the key from the queue and database -func (self *LDBDatabase) Delete(key []byte) error { +func (db *LDBDatabase) Delete(key []byte) error { // Measure the database delete latency, if requested - if self.delTimer != nil { - defer self.delTimer.UpdateSince(time.Now()) + if db.delTimer != nil { + defer db.delTimer.UpdateSince(time.Now()) } // Execute the actual operation - return self.db.Delete(key, nil) + return db.db.Delete(key, nil) } -func (self *LDBDatabase) NewIterator() iterator.Iterator { - return self.db.NewIterator(nil, nil) +func (db *LDBDatabase) NewIterator() iterator.Iterator { + return db.db.NewIterator(nil, nil) } -func (self *LDBDatabase) Close() { +func (db *LDBDatabase) Close() { // Stop the metrics collection to avoid internal database races - self.quitLock.Lock() - defer self.quitLock.Unlock() + db.quitLock.Lock() + defer db.quitLock.Unlock() - if self.quitChan != nil { + if db.quitChan != nil { errc := make(chan error) - self.quitChan <- errc + db.quitChan <- errc if err := <-errc; err != nil { - log.Error(fmt.Sprintf("metrics failure in '%s': %v\n", self.fn, err)) + db.log.Error("Metrics collection failed", "err", err) } } - err := self.db.Close() + err := db.db.Close() if err == nil { - log.Info(fmt.Sprint("closed db:", self.fn)) + db.log.Info("Database closed") } else { - log.Error(fmt.Sprintf("error closing db %s: %v", self.fn, err)) + db.log.Error("Failed to close database", "err", err) } } -func (self *LDBDatabase) LDB() *leveldb.DB { - return self.db +func (db *LDBDatabase) LDB() *leveldb.DB { + return db.db } // Meter configures the database metrics collectors and -func (self *LDBDatabase) Meter(prefix string) { +func (db *LDBDatabase) Meter(prefix string) { // Short circuit metering if the metrics system is disabled if !metrics.Enabled { return } // Initialize all the metrics collector at the requested prefix - self.getTimer = metrics.NewTimer(prefix + "user/gets") - self.putTimer = metrics.NewTimer(prefix + "user/puts") - self.delTimer = metrics.NewTimer(prefix + "user/dels") - self.missMeter = metrics.NewMeter(prefix + "user/misses") - self.readMeter = metrics.NewMeter(prefix + "user/reads") - self.writeMeter = metrics.NewMeter(prefix + "user/writes") - self.compTimeMeter = metrics.NewMeter(prefix + "compact/time") - self.compReadMeter = metrics.NewMeter(prefix + "compact/input") - self.compWriteMeter = metrics.NewMeter(prefix + "compact/output") + db.getTimer = metrics.NewTimer(prefix + "user/gets") + db.putTimer = metrics.NewTimer(prefix + "user/puts") + db.delTimer = metrics.NewTimer(prefix + "user/dels") + db.missMeter = metrics.NewMeter(prefix + "user/misses") + db.readMeter = metrics.NewMeter(prefix + "user/reads") + db.writeMeter = metrics.NewMeter(prefix + "user/writes") + db.compTimeMeter = metrics.NewMeter(prefix + "compact/time") + db.compReadMeter = metrics.NewMeter(prefix + "compact/input") + db.compWriteMeter = metrics.NewMeter(prefix + "compact/output") // Create a quit channel for the periodic collector and run it - self.quitLock.Lock() - self.quitChan = make(chan chan error) - self.quitLock.Unlock() + db.quitLock.Lock() + db.quitChan = make(chan chan error) + db.quitLock.Unlock() - go self.meter(3 * time.Second) + go db.meter(3 * time.Second) } // meter periodically retrieves internal leveldb counters and reports them to @@ -218,7 +205,7 @@ func (self *LDBDatabase) Meter(prefix string) { // 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294 // 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884 // 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000 -func (self *LDBDatabase) meter(refresh time.Duration) { +func (db *LDBDatabase) meter(refresh time.Duration) { // Create the counters to store current and previous values counters := make([][]float64, 2) for i := 0; i < 2; i++ { @@ -227,9 +214,9 @@ func (self *LDBDatabase) meter(refresh time.Duration) { // Iterate ad infinitum and collect the stats for i := 1; ; i++ { // Retrieve the database stats - stats, err := self.db.GetProperty("leveldb.stats") + stats, err := db.db.GetProperty("leveldb.stats") if err != nil { - log.Error(fmt.Sprintf("failed to read database stats: %v", err)) + db.log.Error("Failed to read database stats", "err", err) return } // Find the compaction table, skip the header @@ -238,7 +225,7 @@ func (self *LDBDatabase) meter(refresh time.Duration) { lines = lines[1:] } if len(lines) <= 3 { - log.Error(fmt.Sprintf("compaction table not found")) + db.log.Error("Compaction table not found") return } lines = lines[3:] @@ -253,27 +240,27 @@ func (self *LDBDatabase) meter(refresh time.Duration) { break } for idx, counter := range parts[3:] { - if value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64); err != nil { - log.Error(fmt.Sprintf("compaction entry parsing failed: %v", err)) + value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) + if err != nil { + db.log.Error("Compaction entry parsing failed", "err", err) return - } else { - counters[i%2][idx] += value } + counters[i%2][idx] += value } } // Update all the requested meters - if self.compTimeMeter != nil { - self.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000)) + if db.compTimeMeter != nil { + db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000)) } - if self.compReadMeter != nil { - self.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024)) + if db.compReadMeter != nil { + db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024)) } - if self.compWriteMeter != nil { - self.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024)) + if db.compWriteMeter != nil { + db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024)) } // Sleep a bit, then repeat the stats collection select { - case errc := <-self.quitChan: + case errc := <-db.quitChan: // Quit requesting, stop hammering the database errc <- nil return -- cgit v1.2.3