diff options
33 files changed, 1660 insertions, 127 deletions
diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 0288b3380..69802a48a 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -89,6 +89,7 @@ var ( utils.LightKDFFlag, utils.CacheFlag, utils.CacheDatabaseFlag, + utils.CacheTrieFlag, utils.CacheGCFlag, utils.TrieCacheGenFlag, utils.ListenPortFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 8b0491ce3..82f17e0ee 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -132,6 +132,7 @@ var AppHelpFlagGroups = []flagGroup{ Flags: []cli.Flag{ utils.CacheFlag, utils.CacheDatabaseFlag, + utils.CacheTrieFlag, utils.CacheGCFlag, utils.TrieCacheGenFlag, }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 429c2bbb9..d7b698c7e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -295,7 +295,12 @@ var ( CacheDatabaseFlag = cli.IntFlag{ Name: "cache.database", Usage: "Percentage of cache memory allowance to use for database io", - Value: 75, + Value: 50, + } + CacheTrieFlag = cli.IntFlag{ + Name: "cache.trie", + Usage: "Percentage of cache memory allowance to use for trie caching", + Value: 25, } CacheGCFlag = cli.IntFlag{ Name: "cache.gc", @@ -1157,8 +1162,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { } cfg.NoPruning = ctx.GlobalString(GCModeFlag.Name) == "archive" + if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) { + cfg.TrieCleanCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100 + } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) { - cfg.TrieCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100 + cfg.TrieDirtyCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100 } if ctx.GlobalIsSet(MinerNotifyFlag.Name) { cfg.MinerNotify = strings.Split(ctx.GlobalString(MinerNotifyFlag.Name), ",") @@ -1393,12 +1401,16 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name) } cache := &core.CacheConfig{ - Disabled: ctx.GlobalString(GCModeFlag.Name) == "archive", - TrieNodeLimit: eth.DefaultConfig.TrieCache, - TrieTimeLimit: eth.DefaultConfig.TrieTimeout, + Disabled: ctx.GlobalString(GCModeFlag.Name) == "archive", + TrieCleanLimit: eth.DefaultConfig.TrieCleanCache, + TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache, + TrieTimeLimit: eth.DefaultConfig.TrieTimeout, + } + if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) { + cache.TrieCleanLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100 } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) { - cache.TrieNodeLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100 + cache.TrieDirtyLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100 } vmcfg := vm.Config{EnablePreimageRecording: ctx.GlobalBool(VMEnableDebugFlag.Name)} chain, err = core.NewBlockChain(chainDb, cache, config, engine, vmcfg, nil) diff --git a/core/blockchain.go b/core/blockchain.go index 26ac75b8c..22f130ce6 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -68,9 +68,10 @@ const ( // CacheConfig contains the configuration values for the trie caching/pruning // that's resident in a blockchain. type CacheConfig struct { - Disabled bool // Whether to disable trie write caching (archive node) - TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk - TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + Disabled bool // Whether to disable trie write caching (archive node) + TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk } // BlockChain represents the canonical chain given a database with a genesis @@ -140,8 +141,9 @@ type BlockChain struct { func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) { if cacheConfig == nil { cacheConfig = &CacheConfig{ - TrieNodeLimit: 256, - TrieTimeLimit: 5 * time.Minute, + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, } } bodyCache, _ := lru.New(bodyCacheLimit) @@ -156,7 +158,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par cacheConfig: cacheConfig, db: db, triegc: prque.New(nil), - stateCache: state.NewDatabase(db), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), quit: make(chan struct{}), shouldPreserve: shouldPreserve, bodyCache: bodyCache, @@ -393,6 +395,11 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { return state.New(root, bc.stateCache) } +// StateCache returns the caching database underpinning the blockchain instance. +func (bc *BlockChain) StateCache() state.Database { + return bc.stateCache +} + // Reset purges the entire blockchain, restoring it to its genesis state. func (bc *BlockChain) Reset() error { return bc.ResetWithGenesisBlock(bc.genesisBlock) @@ -938,7 +945,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // If we exceeded our memory allowance, flush matured singleton nodes to disk var ( nodes, imgs = triedb.Size() - limit = common.StorageSize(bc.cacheConfig.TrieNodeLimit) * 1024 * 1024 + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 ) if nodes > limit || imgs > 4*1024*1024 { triedb.Cap(limit - ethdb.IdealBatchSize) diff --git a/core/state/database.go b/core/state/database.go index c1b630991..f6ea144b9 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -72,13 +72,19 @@ type Trie interface { } // NewDatabase creates a backing store for state. The returned database is safe for -// concurrent use and retains cached trie nodes in memory. The pool is an optional -// intermediate trie-node memory pool between the low level storage layer and the -// high level trie abstraction. +// concurrent use and retains a few recent expanded trie nodes in memory. To keep +// more historical state in memory, use the NewDatabaseWithCache constructor. func NewDatabase(db ethdb.Database) Database { + return NewDatabaseWithCache(db, 0) +} + +// NewDatabase creates a backing store for state. The returned database is safe for +// concurrent use and retains both a few recent expanded trie nodes in memory, as +// well as a lot of collapsed RLP trie nodes in a large memory cache. +func NewDatabaseWithCache(db ethdb.Database, cache int) Database { csc, _ := lru.New(codeSizeCacheSize) return &cachingDB{ - db: trie.NewDatabase(db), + db: trie.NewDatabaseWithCache(db, cache), codeSizeCache: csc, } } diff --git a/eth/api.go b/eth/api.go index 3ec3afb81..816b9cd33 100644 --- a/eth/api.go +++ b/eth/api.go @@ -444,16 +444,16 @@ func (api *PrivateDebugAPI) getModifiedAccounts(startBlock, endBlock *types.Bloc if startBlock.Number().Uint64() >= endBlock.Number().Uint64() { return nil, fmt.Errorf("start block height (%d) must be less than end block height (%d)", startBlock.Number().Uint64(), endBlock.Number().Uint64()) } + triedb := api.eth.BlockChain().StateCache().TrieDB() - oldTrie, err := trie.NewSecure(startBlock.Root(), trie.NewDatabase(api.eth.chainDb), 0) + oldTrie, err := trie.NewSecure(startBlock.Root(), triedb, 0) if err != nil { return nil, err } - newTrie, err := trie.NewSecure(endBlock.Root(), trie.NewDatabase(api.eth.chainDb), 0) + newTrie, err := trie.NewSecure(endBlock.Root(), triedb, 0) if err != nil { return nil, err } - diff, _ := trie.NewDifferenceIterator(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{})) iter := trie.NewIterator(diff) diff --git a/eth/api_tracer.go b/eth/api_tracer.go index 80552ada8..2ebbcc5fd 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -138,7 +138,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl // Ensure we have a valid starting state before doing any work origin := start.NumberU64() - database := state.NewDatabase(api.eth.ChainDb()) + database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) // Chain tracing will probably start at genesis if number := start.NumberU64(); number > 0 { start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1) @@ -492,7 +492,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* } // Otherwise try to reexec blocks until we find a state or reach our limit origin := block.NumberU64() - database := state.NewDatabase(api.eth.ChainDb()) + database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) for i := uint64(0); i < reexec; i++ { block = api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) diff --git a/eth/backend.go b/eth/backend.go index b555b064a..472140842 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -154,7 +154,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { EWASMInterpreter: config.EWASMInterpreter, EVMInterpreter: config.EVMInterpreter, } - cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout} + cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieCleanLimit: config.TrieCleanCache, TrieDirtyLimit: config.TrieDirtyCache, TrieTimeLimit: config.TrieTimeout} ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig, eth.shouldPreserve) if err != nil { diff --git a/eth/config.go b/eth/config.go index e32c01a73..601f4735e 100644 --- a/eth/config.go +++ b/eth/config.go @@ -43,15 +43,16 @@ var DefaultConfig = Config{ DatasetsInMem: 1, DatasetsOnDisk: 2, }, - NetworkId: 1, - LightPeers: 100, - DatabaseCache: 768, - TrieCache: 256, - TrieTimeout: 60 * time.Minute, - MinerGasFloor: 8000000, - MinerGasCeil: 8000000, - MinerGasPrice: big.NewInt(params.GWei), - MinerRecommit: 3 * time.Second, + NetworkId: 1, + LightPeers: 100, + DatabaseCache: 512, + TrieCleanCache: 256, + TrieDirtyCache: 256, + TrieTimeout: 60 * time.Minute, + MinerGasFloor: 8000000, + MinerGasCeil: 8000000, + MinerGasPrice: big.NewInt(params.GWei), + MinerRecommit: 3 * time.Second, TxPool: core.DefaultTxPoolConfig, GPO: gasprice.Config{ @@ -94,7 +95,8 @@ type Config struct { SkipBcVersionCheck bool `toml:"-"` DatabaseHandles int `toml:"-"` DatabaseCache int - TrieCache int + TrieCleanCache int + TrieDirtyCache int TrieTimeout time.Duration // Mining-related options diff --git a/eth/gen_config.go b/eth/gen_config.go index d401a917d..2777aa9e8 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -28,7 +28,8 @@ func (c Config) MarshalTOML() (interface{}, error) { SkipBcVersionCheck bool `toml:"-"` DatabaseHandles int `toml:"-"` DatabaseCache int - TrieCache int + TrieCleanCache int + TrieDirtyCache int TrieTimeout time.Duration Etherbase common.Address `toml:",omitempty"` MinerNotify []string `toml:",omitempty"` @@ -43,6 +44,8 @@ func (c Config) MarshalTOML() (interface{}, error) { GPO gasprice.Config EnablePreimageRecording bool DocRoot string `toml:"-"` + EWASMInterpreter string + EVMInterpreter string } var enc Config enc.Genesis = c.Genesis @@ -54,7 +57,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.SkipBcVersionCheck = c.SkipBcVersionCheck enc.DatabaseHandles = c.DatabaseHandles enc.DatabaseCache = c.DatabaseCache - enc.TrieCache = c.TrieCache + enc.TrieCleanCache = c.TrieCleanCache + enc.TrieDirtyCache = c.TrieDirtyCache enc.TrieTimeout = c.TrieTimeout enc.Etherbase = c.Etherbase enc.MinerNotify = c.MinerNotify @@ -69,6 +73,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.GPO = c.GPO enc.EnablePreimageRecording = c.EnablePreimageRecording enc.DocRoot = c.DocRoot + enc.EWASMInterpreter = c.EWASMInterpreter + enc.EVMInterpreter = c.EVMInterpreter return &enc, nil } @@ -84,7 +90,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { SkipBcVersionCheck *bool `toml:"-"` DatabaseHandles *int `toml:"-"` DatabaseCache *int - TrieCache *int + TrieCleanCache *int + TrieDirtyCache *int TrieTimeout *time.Duration Etherbase *common.Address `toml:",omitempty"` MinerNotify []string `toml:",omitempty"` @@ -99,6 +106,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { GPO *gasprice.Config EnablePreimageRecording *bool DocRoot *string `toml:"-"` + EWASMInterpreter *string + EVMInterpreter *string } var dec Config if err := unmarshal(&dec); err != nil { @@ -131,8 +140,11 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.DatabaseCache != nil { c.DatabaseCache = *dec.DatabaseCache } - if dec.TrieCache != nil { - c.TrieCache = *dec.TrieCache + if dec.TrieCleanCache != nil { + c.TrieCleanCache = *dec.TrieCleanCache + } + if dec.TrieDirtyCache != nil { + c.TrieDirtyCache = *dec.TrieDirtyCache } if dec.TrieTimeout != nil { c.TrieTimeout = *dec.TrieTimeout @@ -176,5 +188,11 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.DocRoot != nil { c.DocRoot = *dec.DocRoot } + if dec.EWASMInterpreter != nil { + c.EWASMInterpreter = *dec.EWASMInterpreter + } + if dec.EVMInterpreter != nil { + c.EVMInterpreter = *dec.EVMInterpreter + } return nil } diff --git a/light/postprocess.go b/light/postprocess.go index 1cfd7535e..dd1b74a7b 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -159,7 +159,7 @@ func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64) *co diskdb: db, odr: odr, trieTable: trieTable, - triedb: trie.NewDatabase(trieTable), + triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down sectionSize: size, } return core.NewChainIndexer(db, ethdb.NewTable(db, "chtIndex-"), backend, size, confirms, time.Millisecond*100, "cht") @@ -281,7 +281,7 @@ func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uin diskdb: db, odr: odr, trieTable: trieTable, - triedb: trie.NewDatabase(trieTable), + triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down parentSize: parentSize, size: size, } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index 12cba3244..9fa69bf4e 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -118,7 +118,7 @@ func (t *BlockTest) Run() error { } else { engine = ethash.NewShared() } - chain, err := core.NewBlockChain(db, nil, config, engine, vm.Config{}, nil) + chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieCleanLimit: 0}, config, engine, vm.Config{}, nil) if err != nil { return err } diff --git a/trie/database.go b/trie/database.go index d0691b637..71190b3f3 100644 --- a/trie/database.go +++ b/trie/database.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/allegro/bigcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" @@ -30,6 +31,11 @@ import ( ) var ( + memcacheCleanHitMeter = metrics.NewRegisteredMeter("trie/memcache/clean/hit", nil) + memcacheCleanMissMeter = metrics.NewRegisteredMeter("trie/memcache/clean/miss", nil) + memcacheCleanReadMeter = metrics.NewRegisteredMeter("trie/memcache/clean/read", nil) + memcacheCleanWriteMeter = metrics.NewRegisteredMeter("trie/memcache/clean/write", nil) + memcacheFlushTimeTimer = metrics.NewRegisteredResettingTimer("trie/memcache/flush/time", nil) memcacheFlushNodesMeter = metrics.NewRegisteredMeter("trie/memcache/flush/nodes", nil) memcacheFlushSizeMeter = metrics.NewRegisteredMeter("trie/memcache/flush/size", nil) @@ -64,9 +70,10 @@ type DatabaseReader interface { type Database struct { diskdb ethdb.Database // Persistent storage for matured trie nodes - nodes map[common.Hash]*cachedNode // Data and references relationships of a node - oldest common.Hash // Oldest tracked node, flush-list head - newest common.Hash // Newest tracked node, flush-list tail + cleans *bigcache.BigCache // GC friendly memory cache of clean node RLPs + dirties map[common.Hash]*cachedNode // Data and references relationships of dirty nodes + oldest common.Hash // Oldest tracked node, flush-list head + newest common.Hash // Newest tracked node, flush-list tail preimages map[common.Hash][]byte // Preimages of nodes from the secure trie seckeybuf [secureKeyLength]byte // Ephemeral buffer for calculating preimage keys @@ -79,7 +86,7 @@ type Database struct { flushnodes uint64 // Nodes flushed since last commit flushsize common.StorageSize // Data storage flushed since last commit - nodesSize common.StorageSize // Storage size of the nodes cache (exc. flushlist) + dirtiesSize common.StorageSize // Storage size of the dirty node cache (exc. flushlist) preimagesSize common.StorageSize // Storage size of the preimages cache lock sync.RWMutex @@ -262,11 +269,30 @@ func expandNode(hash hashNode, n node, cachegen uint16) node { } // NewDatabase creates a new trie database to store ephemeral trie content before -// its written out to disk or garbage collected. +// its written out to disk or garbage collected. No read cache is created, so all +// data retrievals will hit the underlying disk database. func NewDatabase(diskdb ethdb.Database) *Database { + return NewDatabaseWithCache(diskdb, 0) +} + +// NewDatabaseWithCache creates a new trie database to store ephemeral trie content +// before its written out to disk or garbage collected. It also acts as a read cache +// for nodes loaded from disk. +func NewDatabaseWithCache(diskdb ethdb.Database, cache int) *Database { + var cleans *bigcache.BigCache + if cache > 0 { + cleans, _ = bigcache.NewBigCache(bigcache.Config{ + Shards: 1024, + LifeWindow: time.Hour, + MaxEntriesInWindow: cache * 1024, + MaxEntrySize: 512, + HardMaxCacheSize: cache, + }) + } return &Database{ diskdb: diskdb, - nodes: map[common.Hash]*cachedNode{{}: {}}, + cleans: cleans, + dirties: map[common.Hash]*cachedNode{{}: {}}, preimages: make(map[common.Hash][]byte), } } @@ -293,7 +319,7 @@ func (db *Database) InsertBlob(hash common.Hash, blob []byte) { // size tracking. func (db *Database) insert(hash common.Hash, blob []byte, node node) { // If the node's already cached, skip - if _, ok := db.nodes[hash]; ok { + if _, ok := db.dirties[hash]; ok { return } // Create the cached entry for this node @@ -303,19 +329,19 @@ func (db *Database) insert(hash common.Hash, blob []byte, node node) { flushPrev: db.newest, } for _, child := range entry.childs() { - if c := db.nodes[child]; c != nil { + if c := db.dirties[child]; c != nil { c.parents++ } } - db.nodes[hash] = entry + db.dirties[hash] = entry // Update the flush-list endpoints if db.oldest == (common.Hash{}) { db.oldest, db.newest = hash, hash } else { - db.nodes[db.newest].flushNext, db.newest = hash, hash + db.dirties[db.newest].flushNext, db.newest = hash, hash } - db.nodesSize += common.StorageSize(common.HashLength + entry.size) + db.dirtiesSize += common.StorageSize(common.HashLength + entry.size) } // insertPreimage writes a new trie node pre-image to the memory database if it's @@ -333,35 +359,64 @@ func (db *Database) insertPreimage(hash common.Hash, preimage []byte) { // node retrieves a cached trie node from memory, or returns nil if none can be // found in the memory cache. func (db *Database) node(hash common.Hash, cachegen uint16) node { - // Retrieve the node from cache if available + // Retrieve the node from the clean cache if available + if db.cleans != nil { + if enc, err := db.cleans.Get(string(hash[:])); err == nil && enc != nil { + memcacheCleanHitMeter.Mark(1) + memcacheCleanReadMeter.Mark(int64(len(enc))) + return mustDecodeNode(hash[:], enc, cachegen) + } + } + // Retrieve the node from the dirty cache if available db.lock.RLock() - node := db.nodes[hash] + dirty := db.dirties[hash] db.lock.RUnlock() - if node != nil { - return node.obj(hash, cachegen) + if dirty != nil { + return dirty.obj(hash, cachegen) } // Content unavailable in memory, attempt to retrieve from disk enc, err := db.diskdb.Get(hash[:]) if err != nil || enc == nil { return nil } + if db.cleans != nil { + db.cleans.Set(string(hash[:]), enc) + memcacheCleanMissMeter.Mark(1) + memcacheCleanWriteMeter.Mark(int64(len(enc))) + } return mustDecodeNode(hash[:], enc, cachegen) } // Node retrieves an encoded cached trie node from memory. If it cannot be found // cached, the method queries the persistent database for the content. func (db *Database) Node(hash common.Hash) ([]byte, error) { - // Retrieve the node from cache if available + // Retrieve the node from the clean cache if available + if db.cleans != nil { + if enc, err := db.cleans.Get(string(hash[:])); err == nil && enc != nil { + memcacheCleanHitMeter.Mark(1) + memcacheCleanReadMeter.Mark(int64(len(enc))) + return enc, nil + } + } + // Retrieve the node from the dirty cache if available db.lock.RLock() - node := db.nodes[hash] + dirty := db.dirties[hash] db.lock.RUnlock() - if node != nil { - return node.rlp(), nil + if dirty != nil { + return dirty.rlp(), nil } // Content unavailable in memory, attempt to retrieve from disk - return db.diskdb.Get(hash[:]) + enc, err := db.diskdb.Get(hash[:]) + if err == nil && enc != nil { + if db.cleans != nil { + db.cleans.Set(string(hash[:]), enc) + memcacheCleanMissMeter.Mark(1) + memcacheCleanWriteMeter.Mark(int64(len(enc))) + } + } + return enc, err } // preimage retrieves a cached trie node pre-image from memory. If it cannot be @@ -395,8 +450,8 @@ func (db *Database) Nodes() []common.Hash { db.lock.RLock() defer db.lock.RUnlock() - var hashes = make([]common.Hash, 0, len(db.nodes)) - for hash := range db.nodes { + var hashes = make([]common.Hash, 0, len(db.dirties)) + for hash := range db.dirties { if hash != (common.Hash{}) { // Special case for "root" references/nodes hashes = append(hashes, hash) } @@ -415,18 +470,18 @@ func (db *Database) Reference(child common.Hash, parent common.Hash) { // reference is the private locked version of Reference. func (db *Database) reference(child common.Hash, parent common.Hash) { // If the node does not exist, it's a node pulled from disk, skip - node, ok := db.nodes[child] + node, ok := db.dirties[child] if !ok { return } // If the reference already exists, only duplicate for roots - if db.nodes[parent].children == nil { - db.nodes[parent].children = make(map[common.Hash]uint16) - } else if _, ok = db.nodes[parent].children[child]; ok && parent != (common.Hash{}) { + if db.dirties[parent].children == nil { + db.dirties[parent].children = make(map[common.Hash]uint16) + } else if _, ok = db.dirties[parent].children[child]; ok && parent != (common.Hash{}) { return } node.parents++ - db.nodes[parent].children[child]++ + db.dirties[parent].children[child]++ } // Dereference removes an existing reference from a root node. @@ -439,25 +494,25 @@ func (db *Database) Dereference(root common.Hash) { db.lock.Lock() defer db.lock.Unlock() - nodes, storage, start := len(db.nodes), db.nodesSize, time.Now() + nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now() db.dereference(root, common.Hash{}) - db.gcnodes += uint64(nodes - len(db.nodes)) - db.gcsize += storage - db.nodesSize + db.gcnodes += uint64(nodes - len(db.dirties)) + db.gcsize += storage - db.dirtiesSize db.gctime += time.Since(start) memcacheGCTimeTimer.Update(time.Since(start)) - memcacheGCSizeMeter.Mark(int64(storage - db.nodesSize)) - memcacheGCNodesMeter.Mark(int64(nodes - len(db.nodes))) + memcacheGCSizeMeter.Mark(int64(storage - db.dirtiesSize)) + memcacheGCNodesMeter.Mark(int64(nodes - len(db.dirties))) - log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start), - "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize) + log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start), + "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize) } // dereference is the private locked version of Dereference. func (db *Database) dereference(child common.Hash, parent common.Hash) { // Dereference the parent-child - node := db.nodes[parent] + node := db.dirties[parent] if node.children != nil && node.children[child] > 0 { node.children[child]-- @@ -466,7 +521,7 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) { } } // If the child does not exist, it's a previously committed node. - node, ok := db.nodes[child] + node, ok := db.dirties[child] if !ok { return } @@ -483,20 +538,20 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) { switch child { case db.oldest: db.oldest = node.flushNext - db.nodes[node.flushNext].flushPrev = common.Hash{} + db.dirties[node.flushNext].flushPrev = common.Hash{} case db.newest: db.newest = node.flushPrev - db.nodes[node.flushPrev].flushNext = common.Hash{} + db.dirties[node.flushPrev].flushNext = common.Hash{} default: - db.nodes[node.flushPrev].flushNext = node.flushNext - db.nodes[node.flushNext].flushPrev = node.flushPrev + db.dirties[node.flushPrev].flushNext = node.flushNext + db.dirties[node.flushNext].flushPrev = node.flushPrev } // Dereference all children and delete the node for _, hash := range node.childs() { db.dereference(hash, child) } - delete(db.nodes, child) - db.nodesSize -= common.StorageSize(common.HashLength + int(node.size)) + delete(db.dirties, child) + db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size)) } } @@ -509,13 +564,13 @@ func (db *Database) Cap(limit common.StorageSize) error { // by only uncaching existing data when the database write finalizes. db.lock.RLock() - nodes, storage, start := len(db.nodes), db.nodesSize, time.Now() + nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now() batch := db.diskdb.NewBatch() - // db.nodesSize only contains the useful data in the cache, but when reporting + // db.dirtiesSize only contains the useful data in the cache, but when reporting // the total memory consumption, the maintenance metadata is also needed to be // counted. For every useful node, we track 2 extra hashes as the flushlist. - size := db.nodesSize + common.StorageSize((len(db.nodes)-1)*2*common.HashLength) + size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*2*common.HashLength) // If the preimage cache got large enough, push to disk. If it's still small // leave for later to deduplicate writes. @@ -540,7 +595,7 @@ func (db *Database) Cap(limit common.StorageSize) error { oldest := db.oldest for size > limit && oldest != (common.Hash{}) { // Fetch the oldest referenced node and push into the batch - node := db.nodes[oldest] + node := db.dirties[oldest] if err := batch.Put(oldest[:], node.rlp()); err != nil { db.lock.RUnlock() return err @@ -578,25 +633,25 @@ func (db *Database) Cap(limit common.StorageSize) error { db.preimagesSize = 0 } for db.oldest != oldest { - node := db.nodes[db.oldest] - delete(db.nodes, db.oldest) + node := db.dirties[db.oldest] + delete(db.dirties, db.oldest) db.oldest = node.flushNext - db.nodesSize -= common.StorageSize(common.HashLength + int(node.size)) + db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size)) } if db.oldest != (common.Hash{}) { - db.nodes[db.oldest].flushPrev = common.Hash{} + db.dirties[db.oldest].flushPrev = common.Hash{} } - db.flushnodes += uint64(nodes - len(db.nodes)) - db.flushsize += storage - db.nodesSize + db.flushnodes += uint64(nodes - len(db.dirties)) + db.flushsize += storage - db.dirtiesSize db.flushtime += time.Since(start) memcacheFlushTimeTimer.Update(time.Since(start)) - memcacheFlushSizeMeter.Mark(int64(storage - db.nodesSize)) - memcacheFlushNodesMeter.Mark(int64(nodes - len(db.nodes))) + memcacheFlushSizeMeter.Mark(int64(storage - db.dirtiesSize)) + memcacheFlushNodesMeter.Mark(int64(nodes - len(db.dirties))) - log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start), - "flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.nodes), "livesize", db.nodesSize) + log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.dirties), "size", storage-db.dirtiesSize, "time", time.Since(start), + "flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize) return nil } @@ -630,7 +685,7 @@ func (db *Database) Commit(node common.Hash, report bool) error { } } // Move the trie itself into the batch, flushing if enough data is accumulated - nodes, storage := len(db.nodes), db.nodesSize + nodes, storage := len(db.dirties), db.dirtiesSize if err := db.commit(node, batch); err != nil { log.Error("Failed to commit trie from trie database", "err", err) db.lock.RUnlock() @@ -654,15 +709,15 @@ func (db *Database) Commit(node common.Hash, report bool) error { db.uncache(node) memcacheCommitTimeTimer.Update(time.Since(start)) - memcacheCommitSizeMeter.Mark(int64(storage - db.nodesSize)) - memcacheCommitNodesMeter.Mark(int64(nodes - len(db.nodes))) + memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize)) + memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties))) logger := log.Info if !report { logger = log.Debug } - logger("Persisted trie from memory database", "nodes", nodes-len(db.nodes)+int(db.flushnodes), "size", storage-db.nodesSize+db.flushsize, "time", time.Since(start)+db.flushtime, - "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize) + logger("Persisted trie from memory database", "nodes", nodes-len(db.dirties)+int(db.flushnodes), "size", storage-db.dirtiesSize+db.flushsize, "time", time.Since(start)+db.flushtime, + "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.dirties), "livesize", db.dirtiesSize) // Reset the garbage collection statistics db.gcnodes, db.gcsize, db.gctime = 0, 0, 0 @@ -674,7 +729,7 @@ func (db *Database) Commit(node common.Hash, report bool) error { // commit is the private locked version of Commit. func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error { // If the node does not exist, it's a previously committed node - node, ok := db.nodes[hash] + node, ok := db.dirties[hash] if !ok { return nil } @@ -702,7 +757,7 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error { // to disk. func (db *Database) uncache(hash common.Hash) { // If the node does not exist, we're done on this path - node, ok := db.nodes[hash] + node, ok := db.dirties[hash] if !ok { return } @@ -710,20 +765,20 @@ func (db *Database) uncache(hash common.Hash) { switch hash { case db.oldest: db.oldest = node.flushNext - db.nodes[node.flushNext].flushPrev = common.Hash{} + db.dirties[node.flushNext].flushPrev = common.Hash{} case db.newest: db.newest = node.flushPrev - db.nodes[node.flushPrev].flushNext = common.Hash{} + db.dirties[node.flushPrev].flushNext = common.Hash{} default: - db.nodes[node.flushPrev].flushNext = node.flushNext - db.nodes[node.flushNext].flushPrev = node.flushPrev + db.dirties[node.flushPrev].flushNext = node.flushNext + db.dirties[node.flushNext].flushPrev = node.flushPrev } // Uncache the node's subtries and remove the node itself too for _, child := range node.childs() { db.uncache(child) } - delete(db.nodes, hash) - db.nodesSize -= common.StorageSize(common.HashLength + int(node.size)) + delete(db.dirties, hash) + db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size)) } // Size returns the current storage size of the memory cache in front of the @@ -732,11 +787,11 @@ func (db *Database) Size() (common.StorageSize, common.StorageSize) { db.lock.RLock() defer db.lock.RUnlock() - // db.nodesSize only contains the useful data in the cache, but when reporting + // db.dirtiesSize only contains the useful data in the cache, but when reporting // the total memory consumption, the maintenance metadata is also needed to be // counted. For every useful node, we track 2 extra hashes as the flushlist. - var flushlistSize = common.StorageSize((len(db.nodes) - 1) * 2 * common.HashLength) - return db.nodesSize + flushlistSize, db.preimagesSize + var flushlistSize = common.StorageSize((len(db.dirties) - 1) * 2 * common.HashLength) + return db.dirtiesSize + flushlistSize, db.preimagesSize } // verifyIntegrity is a debug method to iterate over the entire trie stored in @@ -749,12 +804,12 @@ func (db *Database) verifyIntegrity() { // Iterate over all the cached nodes and accumulate them into a set reachable := map[common.Hash]struct{}{{}: {}} - for child := range db.nodes[common.Hash{}].children { + for child := range db.dirties[common.Hash{}].children { db.accumulate(child, reachable) } // Find any unreachable but cached nodes unreachable := []string{} - for hash, node := range db.nodes { + for hash, node := range db.dirties { if _, ok := reachable[hash]; !ok { unreachable = append(unreachable, fmt.Sprintf("%x: {Node: %v, Parents: %d, Prev: %x, Next: %x}", hash, node.node, node.parents, node.flushPrev, node.flushNext)) @@ -769,7 +824,7 @@ func (db *Database) verifyIntegrity() { // cached children found in memory. func (db *Database) accumulate(hash common.Hash, reachable map[common.Hash]struct{}) { // Mark the node reachable if present in the memory cache - node, ok := db.nodes[hash] + node, ok := db.dirties[hash] if !ok { return } diff --git a/trie/iterator_test.go b/trie/iterator_test.go index 2a510b1c2..4f633b195 100644 --- a/trie/iterator_test.go +++ b/trie/iterator_test.go @@ -113,7 +113,7 @@ func TestNodeIteratorCoverage(t *testing.T) { t.Errorf("failed to retrieve reported node %x: %v", hash, err) } } - for hash, obj := range db.nodes { + for hash, obj := range db.dirties { if obj != nil && hash != (common.Hash{}) { if _, ok := hashes[hash]; !ok { t.Errorf("state entry not reported %x", hash) @@ -333,8 +333,8 @@ func testIteratorContinueAfterError(t *testing.T, memonly bool) { } } if memonly { - robj = triedb.nodes[rkey] - delete(triedb.nodes, rkey) + robj = triedb.dirties[rkey] + delete(triedb.dirties, rkey) } else { rval, _ = diskdb.Get(rkey[:]) diskdb.Delete(rkey[:]) @@ -350,7 +350,7 @@ func testIteratorContinueAfterError(t *testing.T, memonly bool) { // Add the node back and continue iteration. if memonly { - triedb.nodes[rkey] = robj + triedb.dirties[rkey] = robj } else { diskdb.Put(rkey[:], rval) } @@ -393,8 +393,8 @@ func testIteratorContinueAfterSeekError(t *testing.T, memonly bool) { barNodeObj *cachedNode ) if memonly { - barNodeObj = triedb.nodes[barNodeHash] - delete(triedb.nodes, barNodeHash) + barNodeObj = triedb.dirties[barNodeHash] + delete(triedb.dirties, barNodeHash) } else { barNodeBlob, _ = diskdb.Get(barNodeHash[:]) diskdb.Delete(barNodeHash[:]) @@ -411,7 +411,7 @@ func testIteratorContinueAfterSeekError(t *testing.T, memonly bool) { } // Reinsert the missing node. if memonly { - triedb.nodes[barNodeHash] = barNodeObj + triedb.dirties[barNodeHash] = barNodeObj } else { diskdb.Put(barNodeHash[:], barNodeBlob) } diff --git a/trie/trie_test.go b/trie/trie_test.go index f8e5fd12a..f9d6029c9 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -119,7 +119,7 @@ func testMissingNode(t *testing.T, memonly bool) { hash := common.HexToHash("0xe1d943cc8f061a0c0b98162830b970395ac9315654824bf21b73b891365262f9") if memonly { - delete(triedb.nodes, hash) + delete(triedb.dirties, hash) } else { diskdb.Delete(hash[:]) } @@ -342,15 +342,16 @@ func TestCacheUnload(t *testing.T) { // Commit the trie repeatedly and access key1. // The branch containing it is loaded from DB exactly two times: // in the 0th and 6th iteration. - db := &countingDB{Database: trie.db.diskdb, gets: make(map[string]int)} - trie, _ = New(root, NewDatabase(db)) + diskdb := &countingDB{Database: trie.db.diskdb, gets: make(map[string]int)} + triedb := NewDatabase(diskdb) + trie, _ = New(root, triedb) trie.SetCacheLimit(5) for i := 0; i < 12; i++ { getString(trie, key1) trie.Commit(nil) } // Check that it got loaded two times. - for dbkey, count := range db.gets { + for dbkey, count := range diskdb.gets { if count != 2 { t.Errorf("db key %x loaded %d times, want %d times", []byte(dbkey), count, 2) } diff --git a/vendor/github.com/allegro/bigcache/LICENSE b/vendor/github.com/allegro/bigcache/LICENSE new file mode 100644 index 000000000..8dada3eda --- /dev/null +++ b/vendor/github.com/allegro/bigcache/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/allegro/bigcache/README.md b/vendor/github.com/allegro/bigcache/README.md new file mode 100644 index 000000000..c23f7f36c --- /dev/null +++ b/vendor/github.com/allegro/bigcache/README.md @@ -0,0 +1,150 @@ +# BigCache [![Build Status](https://travis-ci.org/allegro/bigcache.svg?branch=master)](https://travis-ci.org/allegro/bigcache) [![Coverage Status](https://coveralls.io/repos/github/allegro/bigcache/badge.svg?branch=master)](https://coveralls.io/github/allegro/bigcache?branch=master) [![GoDoc](https://godoc.org/github.com/allegro/bigcache?status.svg)](https://godoc.org/github.com/allegro/bigcache) [![Go Report Card](https://goreportcard.com/badge/github.com/allegro/bigcache)](https://goreportcard.com/report/github.com/allegro/bigcache) + +Fast, concurrent, evicting in-memory cache written to keep big number of entries without impact on performance. +BigCache keeps entries on heap but omits GC for them. To achieve that operations on bytes arrays take place, +therefore entries (de)serialization in front of the cache will be needed in most use cases. + +## Usage + +### Simple initialization + +```go +import "github.com/allegro/bigcache" + +cache, _ := bigcache.NewBigCache(bigcache.DefaultConfig(10 * time.Minute)) + +cache.Set("my-unique-key", []byte("value")) + +entry, _ := cache.Get("my-unique-key") +fmt.Println(string(entry)) +``` + +### Custom initialization + +When cache load can be predicted in advance then it is better to use custom initialization because additional memory +allocation can be avoided in that way. + +```go +import ( + "log" + + "github.com/allegro/bigcache" +) + +config := bigcache.Config { + // number of shards (must be a power of 2) + Shards: 1024, + // time after which entry can be evicted + LifeWindow: 10 * time.Minute, + // rps * lifeWindow, used only in initial memory allocation + MaxEntriesInWindow: 1000 * 10 * 60, + // max entry size in bytes, used only in initial memory allocation + MaxEntrySize: 500, + // prints information about additional memory allocation + Verbose: true, + // cache will not allocate more memory than this limit, value in MB + // if value is reached then the oldest entries can be overridden for the new ones + // 0 value means no size limit + HardMaxCacheSize: 8192, + // callback fired when the oldest entry is removed because of its expiration time or no space left + // for the new entry, or because delete was called. A bitmask representing the reason will be returned. + // Default value is nil which means no callback and it prevents from unwrapping the oldest entry. + OnRemove: nil, + // OnRemoveWithReason is a callback fired when the oldest entry is removed because of its expiration time or no space left + // for the new entry, or because delete was called. A constant representing the reason will be passed through. + // Default value is nil which means no callback and it prevents from unwrapping the oldest entry. + // Ignored if OnRemove is specified. + OnRemoveWithReason: nil, + } + +cache, initErr := bigcache.NewBigCache(config) +if initErr != nil { + log.Fatal(initErr) +} + +cache.Set("my-unique-key", []byte("value")) + +if entry, err := cache.Get("my-unique-key"); err == nil { + fmt.Println(string(entry)) +} +``` + +## Benchmarks + +Three caches were compared: bigcache, [freecache](https://github.com/coocood/freecache) and map. +Benchmark tests were made using an i7-6700K with 32GB of RAM on Windows 10. + +### Writes and reads + +```bash +cd caches_bench; go test -bench=. -benchtime=10s ./... -timeout 30m + +BenchmarkMapSet-8 3000000 569 ns/op 202 B/op 3 allocs/op +BenchmarkConcurrentMapSet-8 1000000 1592 ns/op 347 B/op 8 allocs/op +BenchmarkFreeCacheSet-8 3000000 775 ns/op 355 B/op 2 allocs/op +BenchmarkBigCacheSet-8 3000000 640 ns/op 303 B/op 2 allocs/op +BenchmarkMapGet-8 5000000 407 ns/op 24 B/op 1 allocs/op +BenchmarkConcurrentMapGet-8 3000000 558 ns/op 24 B/op 2 allocs/op +BenchmarkFreeCacheGet-8 2000000 682 ns/op 136 B/op 2 allocs/op +BenchmarkBigCacheGet-8 3000000 512 ns/op 152 B/op 4 allocs/op +BenchmarkBigCacheSetParallel-8 10000000 225 ns/op 313 B/op 3 allocs/op +BenchmarkFreeCacheSetParallel-8 10000000 218 ns/op 341 B/op 3 allocs/op +BenchmarkConcurrentMapSetParallel-8 5000000 318 ns/op 200 B/op 6 allocs/op +BenchmarkBigCacheGetParallel-8 20000000 178 ns/op 152 B/op 4 allocs/op +BenchmarkFreeCacheGetParallel-8 20000000 295 ns/op 136 B/op 3 allocs/op +BenchmarkConcurrentMapGetParallel-8 10000000 237 ns/op 24 B/op 2 allocs/op +``` + +Writes and reads in bigcache are faster than in freecache. +Writes to map are the slowest. + +### GC pause time + +```bash +cd caches_bench; go run caches_gc_overhead_comparison.go + +Number of entries: 20000000 +GC pause for bigcache: 5.8658ms +GC pause for freecache: 32.4341ms +GC pause for map: 52.9661ms +``` + +Test shows how long are the GC pauses for caches filled with 20mln of entries. +Bigcache and freecache have very similar GC pause time. +It is clear that both reduce GC overhead in contrast to map +which GC pause time took more than 10 seconds. + +## How it works + +BigCache relies on optimization presented in 1.5 version of Go ([issue-9477](https://github.com/golang/go/issues/9477)). +This optimization states that if map without pointers in keys and values is used then GC will omit its content. +Therefore BigCache uses `map[uint64]uint32` where keys are hashed and values are offsets of entries. + +Entries are kept in bytes array, to omit GC again. +Bytes array size can grow to gigabytes without impact on performance +because GC will only see single pointer to it. + +## Bigcache vs Freecache + +Both caches provide the same core features but they reduce GC overhead in different ways. +Bigcache relies on `map[uint64]uint32`, freecache implements its own mapping built on +slices to reduce number of pointers. + +Results from benchmark tests are presented above. +One of the advantage of bigcache over freecache is that you don’t need to know +the size of the cache in advance, because when bigcache is full, +it can allocate additional memory for new entries instead of +overwriting existing ones as freecache does currently. +However hard max size in bigcache also can be set, check [HardMaxCacheSize](https://godoc.org/github.com/allegro/bigcache#Config). + +## HTTP Server + +This package also includes an easily deployable HTTP implementation of BigCache, which can be found in the [server](/server) package. + +## More + +Bigcache genesis is described in allegro.tech blog post: [writing a very fast cache service in Go](http://allegro.tech/2016/03/writing-fast-cache-service-in-go.html) + +## License + +BigCache is released under the Apache 2.0 license (see [LICENSE](LICENSE)) diff --git a/vendor/github.com/allegro/bigcache/bigcache.go b/vendor/github.com/allegro/bigcache/bigcache.go new file mode 100644 index 000000000..e6aee8663 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/bigcache.go @@ -0,0 +1,202 @@ +package bigcache + +import ( + "fmt" + "time" +) + +const ( + minimumEntriesInShard = 10 // Minimum number of entries in single shard +) + +// BigCache is fast, concurrent, evicting cache created to keep big number of entries without impact on performance. +// It keeps entries on heap but omits GC for them. To achieve that, operations take place on byte arrays, +// therefore entries (de)serialization in front of the cache will be needed in most use cases. +type BigCache struct { + shards []*cacheShard + lifeWindow uint64 + clock clock + hash Hasher + config Config + shardMask uint64 + maxShardSize uint32 + close chan struct{} +} + +// RemoveReason is a value used to signal to the user why a particular key was removed in the OnRemove callback. +type RemoveReason uint32 + +const ( + // Expired means the key is past its LifeWindow. + Expired RemoveReason = iota + // NoSpace means the key is the oldest and the cache size was at its maximum when Set was called, or the + // entry exceeded the maximum shard size. + NoSpace + // Deleted means Delete was called and this key was removed as a result. + Deleted +) + +// NewBigCache initialize new instance of BigCache +func NewBigCache(config Config) (*BigCache, error) { + return newBigCache(config, &systemClock{}) +} + +func newBigCache(config Config, clock clock) (*BigCache, error) { + + if !isPowerOfTwo(config.Shards) { + return nil, fmt.Errorf("Shards number must be power of two") + } + + if config.Hasher == nil { + config.Hasher = newDefaultHasher() + } + + cache := &BigCache{ + shards: make([]*cacheShard, config.Shards), + lifeWindow: uint64(config.LifeWindow.Seconds()), + clock: clock, + hash: config.Hasher, + config: config, + shardMask: uint64(config.Shards - 1), + maxShardSize: uint32(config.maximumShardSize()), + close: make(chan struct{}), + } + + var onRemove func(wrappedEntry []byte, reason RemoveReason) + if config.OnRemove != nil { + onRemove = cache.providedOnRemove + } else if config.OnRemoveWithReason != nil { + onRemove = cache.providedOnRemoveWithReason + } else { + onRemove = cache.notProvidedOnRemove + } + + for i := 0; i < config.Shards; i++ { + cache.shards[i] = initNewShard(config, onRemove, clock) + } + + if config.CleanWindow > 0 { + go func() { + ticker := time.NewTicker(config.CleanWindow) + defer ticker.Stop() + for { + select { + case t := <-ticker.C: + cache.cleanUp(uint64(t.Unix())) + case <-cache.close: + return + } + } + }() + } + + return cache, nil +} + +// Close is used to signal a shutdown of the cache when you are done with it. +// This allows the cleaning goroutines to exit and ensures references are not +// kept to the cache preventing GC of the entire cache. +func (c *BigCache) Close() error { + close(c.close) + return nil +} + +// Get reads entry for the key. +// It returns an EntryNotFoundError when +// no entry exists for the given key. +func (c *BigCache) Get(key string) ([]byte, error) { + hashedKey := c.hash.Sum64(key) + shard := c.getShard(hashedKey) + return shard.get(key, hashedKey) +} + +// Set saves entry under the key +func (c *BigCache) Set(key string, entry []byte) error { + hashedKey := c.hash.Sum64(key) + shard := c.getShard(hashedKey) + return shard.set(key, hashedKey, entry) +} + +// Delete removes the key +func (c *BigCache) Delete(key string) error { + hashedKey := c.hash.Sum64(key) + shard := c.getShard(hashedKey) + return shard.del(key, hashedKey) +} + +// Reset empties all cache shards +func (c *BigCache) Reset() error { + for _, shard := range c.shards { + shard.reset(c.config) + } + return nil +} + +// Len computes number of entries in cache +func (c *BigCache) Len() int { + var len int + for _, shard := range c.shards { + len += shard.len() + } + return len +} + +// Capacity returns amount of bytes store in the cache. +func (c *BigCache) Capacity() int { + var len int + for _, shard := range c.shards { + len += shard.capacity() + } + return len +} + +// Stats returns cache's statistics +func (c *BigCache) Stats() Stats { + var s Stats + for _, shard := range c.shards { + tmp := shard.getStats() + s.Hits += tmp.Hits + s.Misses += tmp.Misses + s.DelHits += tmp.DelHits + s.DelMisses += tmp.DelMisses + s.Collisions += tmp.Collisions + } + return s +} + +// Iterator returns iterator function to iterate over EntryInfo's from whole cache. +func (c *BigCache) Iterator() *EntryInfoIterator { + return newIterator(c) +} + +func (c *BigCache) onEvict(oldestEntry []byte, currentTimestamp uint64, evict func(reason RemoveReason) error) bool { + oldestTimestamp := readTimestampFromEntry(oldestEntry) + if currentTimestamp-oldestTimestamp > c.lifeWindow { + evict(Expired) + return true + } + return false +} + +func (c *BigCache) cleanUp(currentTimestamp uint64) { + for _, shard := range c.shards { + shard.cleanUp(currentTimestamp) + } +} + +func (c *BigCache) getShard(hashedKey uint64) (shard *cacheShard) { + return c.shards[hashedKey&c.shardMask] +} + +func (c *BigCache) providedOnRemove(wrappedEntry []byte, reason RemoveReason) { + c.config.OnRemove(readKeyFromEntry(wrappedEntry), readEntry(wrappedEntry)) +} + +func (c *BigCache) providedOnRemoveWithReason(wrappedEntry []byte, reason RemoveReason) { + if c.config.onRemoveFilter == 0 || (1<<uint(reason))&c.config.onRemoveFilter > 0 { + c.config.OnRemoveWithReason(readKeyFromEntry(wrappedEntry), readEntry(wrappedEntry), reason) + } +} + +func (c *BigCache) notProvidedOnRemove(wrappedEntry []byte, reason RemoveReason) { +} diff --git a/vendor/github.com/allegro/bigcache/bytes.go b/vendor/github.com/allegro/bigcache/bytes.go new file mode 100644 index 000000000..3944bfe13 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/bytes.go @@ -0,0 +1,14 @@ +// +build !appengine + +package bigcache + +import ( + "reflect" + "unsafe" +) + +func bytesToString(b []byte) string { + bytesHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + strHeader := reflect.StringHeader{Data: bytesHeader.Data, Len: bytesHeader.Len} + return *(*string)(unsafe.Pointer(&strHeader)) +} diff --git a/vendor/github.com/allegro/bigcache/bytes_appengine.go b/vendor/github.com/allegro/bigcache/bytes_appengine.go new file mode 100644 index 000000000..3892f3b54 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/bytes_appengine.go @@ -0,0 +1,7 @@ +// +build appengine + +package bigcache + +func bytesToString(b []byte) string { + return string(b) +} diff --git a/vendor/github.com/allegro/bigcache/clock.go b/vendor/github.com/allegro/bigcache/clock.go new file mode 100644 index 000000000..f8b535e13 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/clock.go @@ -0,0 +1,14 @@ +package bigcache + +import "time" + +type clock interface { + epoch() int64 +} + +type systemClock struct { +} + +func (c systemClock) epoch() int64 { + return time.Now().Unix() +} diff --git a/vendor/github.com/allegro/bigcache/config.go b/vendor/github.com/allegro/bigcache/config.go new file mode 100644 index 000000000..9654143ab --- /dev/null +++ b/vendor/github.com/allegro/bigcache/config.go @@ -0,0 +1,86 @@ +package bigcache + +import "time" + +// Config for BigCache +type Config struct { + // Number of cache shards, value must be a power of two + Shards int + // Time after which entry can be evicted + LifeWindow time.Duration + // Interval between removing expired entries (clean up). + // If set to <= 0 then no action is performed. Setting to < 1 second is counterproductive — bigcache has a one second resolution. + CleanWindow time.Duration + // Max number of entries in life window. Used only to calculate initial size for cache shards. + // When proper value is set then additional memory allocation does not occur. + MaxEntriesInWindow int + // Max size of entry in bytes. Used only to calculate initial size for cache shards. + MaxEntrySize int + // Verbose mode prints information about new memory allocation + Verbose bool + // Hasher used to map between string keys and unsigned 64bit integers, by default fnv64 hashing is used. + Hasher Hasher + // HardMaxCacheSize is a limit for cache size in MB. Cache will not allocate more memory than this limit. + // It can protect application from consuming all available memory on machine, therefore from running OOM Killer. + // Default value is 0 which means unlimited size. When the limit is higher than 0 and reached then + // the oldest entries are overridden for the new ones. + HardMaxCacheSize int + // OnRemove is a callback fired when the oldest entry is removed because of its expiration time or no space left + // for the new entry, or because delete was called. + // Default value is nil which means no callback and it prevents from unwrapping the oldest entry. + OnRemove func(key string, entry []byte) + // OnRemoveWithReason is a callback fired when the oldest entry is removed because of its expiration time or no space left + // for the new entry, or because delete was called. A constant representing the reason will be passed through. + // Default value is nil which means no callback and it prevents from unwrapping the oldest entry. + // Ignored if OnRemove is specified. + OnRemoveWithReason func(key string, entry []byte, reason RemoveReason) + + onRemoveFilter int + + // Logger is a logging interface and used in combination with `Verbose` + // Defaults to `DefaultLogger()` + Logger Logger +} + +// DefaultConfig initializes config with default values. +// When load for BigCache can be predicted in advance then it is better to use custom config. +func DefaultConfig(eviction time.Duration) Config { + return Config{ + Shards: 1024, + LifeWindow: eviction, + CleanWindow: 0, + MaxEntriesInWindow: 1000 * 10 * 60, + MaxEntrySize: 500, + Verbose: true, + Hasher: newDefaultHasher(), + HardMaxCacheSize: 0, + Logger: DefaultLogger(), + } +} + +// initialShardSize computes initial shard size +func (c Config) initialShardSize() int { + return max(c.MaxEntriesInWindow/c.Shards, minimumEntriesInShard) +} + +// maximumShardSize computes maximum shard size +func (c Config) maximumShardSize() int { + maxShardSize := 0 + + if c.HardMaxCacheSize > 0 { + maxShardSize = convertMBToBytes(c.HardMaxCacheSize) / c.Shards + } + + return maxShardSize +} + +// OnRemoveFilterSet sets which remove reasons will trigger a call to OnRemoveWithReason. +// Filtering out reasons prevents bigcache from unwrapping them, which saves cpu. +func (c Config) OnRemoveFilterSet(reasons ...RemoveReason) Config { + c.onRemoveFilter = 0 + for i := range reasons { + c.onRemoveFilter |= 1 << uint(reasons[i]) + } + + return c +} diff --git a/vendor/github.com/allegro/bigcache/encoding.go b/vendor/github.com/allegro/bigcache/encoding.go new file mode 100644 index 000000000..4d434e5dc --- /dev/null +++ b/vendor/github.com/allegro/bigcache/encoding.go @@ -0,0 +1,62 @@ +package bigcache + +import ( + "encoding/binary" +) + +const ( + timestampSizeInBytes = 8 // Number of bytes used for timestamp + hashSizeInBytes = 8 // Number of bytes used for hash + keySizeInBytes = 2 // Number of bytes used for size of entry key + headersSizeInBytes = timestampSizeInBytes + hashSizeInBytes + keySizeInBytes // Number of bytes used for all headers +) + +func wrapEntry(timestamp uint64, hash uint64, key string, entry []byte, buffer *[]byte) []byte { + keyLength := len(key) + blobLength := len(entry) + headersSizeInBytes + keyLength + + if blobLength > len(*buffer) { + *buffer = make([]byte, blobLength) + } + blob := *buffer + + binary.LittleEndian.PutUint64(blob, timestamp) + binary.LittleEndian.PutUint64(blob[timestampSizeInBytes:], hash) + binary.LittleEndian.PutUint16(blob[timestampSizeInBytes+hashSizeInBytes:], uint16(keyLength)) + copy(blob[headersSizeInBytes:], key) + copy(blob[headersSizeInBytes+keyLength:], entry) + + return blob[:blobLength] +} + +func readEntry(data []byte) []byte { + length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:]) + + // copy on read + dst := make([]byte, len(data)-int(headersSizeInBytes+length)) + copy(dst, data[headersSizeInBytes+length:]) + + return dst +} + +func readTimestampFromEntry(data []byte) uint64 { + return binary.LittleEndian.Uint64(data) +} + +func readKeyFromEntry(data []byte) string { + length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:]) + + // copy on read + dst := make([]byte, length) + copy(dst, data[headersSizeInBytes:headersSizeInBytes+length]) + + return bytesToString(dst) +} + +func readHashFromEntry(data []byte) uint64 { + return binary.LittleEndian.Uint64(data[timestampSizeInBytes:]) +} + +func resetKeyFromEntry(data []byte) { + binary.LittleEndian.PutUint64(data[timestampSizeInBytes:], 0) +} diff --git a/vendor/github.com/allegro/bigcache/entry_not_found_error.go b/vendor/github.com/allegro/bigcache/entry_not_found_error.go new file mode 100644 index 000000000..883bdc29e --- /dev/null +++ b/vendor/github.com/allegro/bigcache/entry_not_found_error.go @@ -0,0 +1,17 @@ +package bigcache + +import "fmt" + +// EntryNotFoundError is an error type struct which is returned when entry was not found for provided key +type EntryNotFoundError struct { + key string +} + +func notFound(key string) error { + return &EntryNotFoundError{key} +} + +// Error returned when entry does not exist. +func (e EntryNotFoundError) Error() string { + return fmt.Sprintf("Entry %q not found", e.key) +} diff --git a/vendor/github.com/allegro/bigcache/fnv.go b/vendor/github.com/allegro/bigcache/fnv.go new file mode 100644 index 000000000..188c9aa6d --- /dev/null +++ b/vendor/github.com/allegro/bigcache/fnv.go @@ -0,0 +1,28 @@ +package bigcache + +// newDefaultHasher returns a new 64-bit FNV-1a Hasher which makes no memory allocations. +// Its Sum64 method will lay the value out in big-endian byte order. +// See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function +func newDefaultHasher() Hasher { + return fnv64a{} +} + +type fnv64a struct{} + +const ( + // offset64 FNVa offset basis. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash + offset64 = 14695981039346656037 + // prime64 FNVa prime value. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash + prime64 = 1099511628211 +) + +// Sum64 gets the string and returns its uint64 hash value. +func (f fnv64a) Sum64(key string) uint64 { + var hash uint64 = offset64 + for i := 0; i < len(key); i++ { + hash ^= uint64(key[i]) + hash *= prime64 + } + + return hash +} diff --git a/vendor/github.com/allegro/bigcache/hash.go b/vendor/github.com/allegro/bigcache/hash.go new file mode 100644 index 000000000..5f8ade774 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/hash.go @@ -0,0 +1,8 @@ +package bigcache + +// Hasher is responsible for generating unsigned, 64 bit hash of provided string. Hasher should minimize collisions +// (generating same hash for different strings) and while performance is also important fast functions are preferable (i.e. +// you can use FarmHash family). +type Hasher interface { + Sum64(string) uint64 +} diff --git a/vendor/github.com/allegro/bigcache/iterator.go b/vendor/github.com/allegro/bigcache/iterator.go new file mode 100644 index 000000000..70b98d900 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/iterator.go @@ -0,0 +1,122 @@ +package bigcache + +import "sync" + +type iteratorError string + +func (e iteratorError) Error() string { + return string(e) +} + +// ErrInvalidIteratorState is reported when iterator is in invalid state +const ErrInvalidIteratorState = iteratorError("Iterator is in invalid state. Use SetNext() to move to next position") + +// ErrCannotRetrieveEntry is reported when entry cannot be retrieved from underlying +const ErrCannotRetrieveEntry = iteratorError("Could not retrieve entry from cache") + +var emptyEntryInfo = EntryInfo{} + +// EntryInfo holds informations about entry in the cache +type EntryInfo struct { + timestamp uint64 + hash uint64 + key string + value []byte +} + +// Key returns entry's underlying key +func (e EntryInfo) Key() string { + return e.key +} + +// Hash returns entry's hash value +func (e EntryInfo) Hash() uint64 { + return e.hash +} + +// Timestamp returns entry's timestamp (time of insertion) +func (e EntryInfo) Timestamp() uint64 { + return e.timestamp +} + +// Value returns entry's underlying value +func (e EntryInfo) Value() []byte { + return e.value +} + +// EntryInfoIterator allows to iterate over entries in the cache +type EntryInfoIterator struct { + mutex sync.Mutex + cache *BigCache + currentShard int + currentIndex int + elements []uint32 + elementsCount int + valid bool +} + +// SetNext moves to next element and returns true if it exists. +func (it *EntryInfoIterator) SetNext() bool { + it.mutex.Lock() + + it.valid = false + it.currentIndex++ + + if it.elementsCount > it.currentIndex { + it.valid = true + it.mutex.Unlock() + return true + } + + for i := it.currentShard + 1; i < it.cache.config.Shards; i++ { + it.elements, it.elementsCount = it.cache.shards[i].copyKeys() + + // Non empty shard - stick with it + if it.elementsCount > 0 { + it.currentIndex = 0 + it.currentShard = i + it.valid = true + it.mutex.Unlock() + return true + } + } + it.mutex.Unlock() + return false +} + +func newIterator(cache *BigCache) *EntryInfoIterator { + elements, count := cache.shards[0].copyKeys() + + return &EntryInfoIterator{ + cache: cache, + currentShard: 0, + currentIndex: -1, + elements: elements, + elementsCount: count, + } +} + +// Value returns current value from the iterator +func (it *EntryInfoIterator) Value() (EntryInfo, error) { + it.mutex.Lock() + + if !it.valid { + it.mutex.Unlock() + return emptyEntryInfo, ErrInvalidIteratorState + } + + entry, err := it.cache.shards[it.currentShard].getEntry(int(it.elements[it.currentIndex])) + + if err != nil { + it.mutex.Unlock() + return emptyEntryInfo, ErrCannotRetrieveEntry + } + it.mutex.Unlock() + + return EntryInfo{ + timestamp: readTimestampFromEntry(entry), + hash: readHashFromEntry(entry), + key: readKeyFromEntry(entry), + value: readEntry(entry), + }, nil +} diff --git a/vendor/github.com/allegro/bigcache/logger.go b/vendor/github.com/allegro/bigcache/logger.go new file mode 100644 index 000000000..50e84abc8 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/logger.go @@ -0,0 +1,30 @@ +package bigcache + +import ( + "log" + "os" +) + +// Logger is invoked when `Config.Verbose=true` +type Logger interface { + Printf(format string, v ...interface{}) +} + +// this is a safeguard, breaking on compile time in case +// `log.Logger` does not adhere to our `Logger` interface. +// see https://golang.org/doc/faq#guarantee_satisfies_interface +var _ Logger = &log.Logger{} + +// DefaultLogger returns a `Logger` implementation +// backed by stdlib's log +func DefaultLogger() *log.Logger { + return log.New(os.Stdout, "", log.LstdFlags) +} + +func newLogger(custom Logger) Logger { + if custom != nil { + return custom + } + + return DefaultLogger() +} diff --git a/vendor/github.com/allegro/bigcache/queue/bytes_queue.go b/vendor/github.com/allegro/bigcache/queue/bytes_queue.go new file mode 100644 index 000000000..0285c72cd --- /dev/null +++ b/vendor/github.com/allegro/bigcache/queue/bytes_queue.go @@ -0,0 +1,210 @@ +package queue + +import ( + "encoding/binary" + "log" + "time" +) + +const ( + // Number of bytes used to keep information about entry size + headerEntrySize = 4 + // Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index + leftMarginIndex = 1 + // Minimum empty blob size in bytes. Empty blob fills space between tail and head in additional memory allocation. + // It keeps entries indexes unchanged + minimumEmptyBlobSize = 32 + headerEntrySize +) + +// BytesQueue is a non-thread safe queue type of fifo based on bytes array. +// For every push operation index of entry is returned. It can be used to read the entry later +type BytesQueue struct { + array []byte + capacity int + maxCapacity int + head int + tail int + count int + rightMargin int + headerBuffer []byte + verbose bool + initialCapacity int +} + +type queueError struct { + message string +} + +// NewBytesQueue initialize new bytes queue. +// Initial capacity is used in bytes array allocation +// When verbose flag is set then information about memory allocation are printed +func NewBytesQueue(initialCapacity int, maxCapacity int, verbose bool) *BytesQueue { + return &BytesQueue{ + array: make([]byte, initialCapacity), + capacity: initialCapacity, + maxCapacity: maxCapacity, + headerBuffer: make([]byte, headerEntrySize), + tail: leftMarginIndex, + head: leftMarginIndex, + rightMargin: leftMarginIndex, + verbose: verbose, + initialCapacity: initialCapacity, + } +} + +// Reset removes all entries from queue +func (q *BytesQueue) Reset() { + // Just reset indexes + q.tail = leftMarginIndex + q.head = leftMarginIndex + q.rightMargin = leftMarginIndex + q.count = 0 +} + +// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed. +// Returns index for pushed data or error if maximum size queue limit is reached. +func (q *BytesQueue) Push(data []byte) (int, error) { + dataLen := len(data) + + if q.availableSpaceAfterTail() < dataLen+headerEntrySize { + if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize { + q.tail = leftMarginIndex + } else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 { + return -1, &queueError{"Full queue. Maximum size limit reached."} + } else { + q.allocateAdditionalMemory(dataLen + headerEntrySize) + } + } + + index := q.tail + + q.push(data, dataLen) + + return index, nil +} + +func (q *BytesQueue) allocateAdditionalMemory(minimum int) { + start := time.Now() + if q.capacity < minimum { + q.capacity += minimum + } + q.capacity = q.capacity * 2 + if q.capacity > q.maxCapacity && q.maxCapacity > 0 { + q.capacity = q.maxCapacity + } + + oldArray := q.array + q.array = make([]byte, q.capacity) + + if leftMarginIndex != q.rightMargin { + copy(q.array, oldArray[:q.rightMargin]) + + if q.tail < q.head { + emptyBlobLen := q.head - q.tail - headerEntrySize + q.push(make([]byte, emptyBlobLen), emptyBlobLen) + q.head = leftMarginIndex + q.tail = q.rightMargin + } + } + + if q.verbose { + log.Printf("Allocated new queue in %s; Capacity: %d \n", time.Since(start), q.capacity) + } +} + +func (q *BytesQueue) push(data []byte, len int) { + binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len)) + q.copy(q.headerBuffer, headerEntrySize) + + q.copy(data, len) + + if q.tail > q.head { + q.rightMargin = q.tail + } + + q.count++ +} + +func (q *BytesQueue) copy(data []byte, len int) { + q.tail += copy(q.array[q.tail:], data[:len]) +} + +// Pop reads the oldest entry from queue and moves head pointer to the next one +func (q *BytesQueue) Pop() ([]byte, error) { + data, size, err := q.peek(q.head) + if err != nil { + return nil, err + } + + q.head += headerEntrySize + size + q.count-- + + if q.head == q.rightMargin { + q.head = leftMarginIndex + if q.tail == q.rightMargin { + q.tail = leftMarginIndex + } + q.rightMargin = q.tail + } + + return data, nil +} + +// Peek reads the oldest entry from list without moving head pointer +func (q *BytesQueue) Peek() ([]byte, error) { + data, _, err := q.peek(q.head) + return data, err +} + +// Get reads entry from index +func (q *BytesQueue) Get(index int) ([]byte, error) { + data, _, err := q.peek(index) + return data, err +} + +// Capacity returns number of allocated bytes for queue +func (q *BytesQueue) Capacity() int { + return q.capacity +} + +// Len returns number of entries kept in queue +func (q *BytesQueue) Len() int { + return q.count +} + +// Error returns error message +func (e *queueError) Error() string { + return e.message +} + +func (q *BytesQueue) peek(index int) ([]byte, int, error) { + + if q.count == 0 { + return nil, 0, &queueError{"Empty queue"} + } + + if index <= 0 { + return nil, 0, &queueError{"Index must be grater than zero. Invalid index."} + } + + if index+headerEntrySize >= len(q.array) { + return nil, 0, &queueError{"Index out of range"} + } + + blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize])) + return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize, nil +} + +func (q *BytesQueue) availableSpaceAfterTail() int { + if q.tail >= q.head { + return q.capacity - q.tail + } + return q.head - q.tail - minimumEmptyBlobSize +} + +func (q *BytesQueue) availableSpaceBeforeHead() int { + if q.tail >= q.head { + return q.head - leftMarginIndex - minimumEmptyBlobSize + } + return q.head - q.tail - minimumEmptyBlobSize +} diff --git a/vendor/github.com/allegro/bigcache/shard.go b/vendor/github.com/allegro/bigcache/shard.go new file mode 100644 index 000000000..56a9fb089 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/shard.go @@ -0,0 +1,236 @@ +package bigcache + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/allegro/bigcache/queue" +) + +type onRemoveCallback func(wrappedEntry []byte, reason RemoveReason) + +type cacheShard struct { + hashmap map[uint64]uint32 + entries queue.BytesQueue + lock sync.RWMutex + entryBuffer []byte + onRemove onRemoveCallback + + isVerbose bool + logger Logger + clock clock + lifeWindow uint64 + + stats Stats +} + +func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) { + s.lock.RLock() + itemIndex := s.hashmap[hashedKey] + + if itemIndex == 0 { + s.lock.RUnlock() + s.miss() + return nil, notFound(key) + } + + wrappedEntry, err := s.entries.Get(int(itemIndex)) + if err != nil { + s.lock.RUnlock() + s.miss() + return nil, err + } + if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey { + if s.isVerbose { + s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey) + } + s.lock.RUnlock() + s.collision() + return nil, notFound(key) + } + s.lock.RUnlock() + s.hit() + return readEntry(wrappedEntry), nil +} + +func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error { + currentTimestamp := uint64(s.clock.epoch()) + + s.lock.Lock() + + if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 { + if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil { + resetKeyFromEntry(previousEntry) + } + } + + if oldestEntry, err := s.entries.Peek(); err == nil { + s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry) + } + + w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer) + + for { + if index, err := s.entries.Push(w); err == nil { + s.hashmap[hashedKey] = uint32(index) + s.lock.Unlock() + return nil + } + if s.removeOldestEntry(NoSpace) != nil { + s.lock.Unlock() + return fmt.Errorf("entry is bigger than max shard size") + } + } +} + +func (s *cacheShard) del(key string, hashedKey uint64) error { + s.lock.RLock() + itemIndex := s.hashmap[hashedKey] + + if itemIndex == 0 { + s.lock.RUnlock() + s.delmiss() + return notFound(key) + } + + wrappedEntry, err := s.entries.Get(int(itemIndex)) + if err != nil { + s.lock.RUnlock() + s.delmiss() + return err + } + s.lock.RUnlock() + + s.lock.Lock() + { + delete(s.hashmap, hashedKey) + s.onRemove(wrappedEntry, Deleted) + resetKeyFromEntry(wrappedEntry) + } + s.lock.Unlock() + + s.delhit() + return nil +} + +func (s *cacheShard) onEvict(oldestEntry []byte, currentTimestamp uint64, evict func(reason RemoveReason) error) bool { + oldestTimestamp := readTimestampFromEntry(oldestEntry) + if currentTimestamp-oldestTimestamp > s.lifeWindow { + evict(Expired) + return true + } + return false +} + +func (s *cacheShard) cleanUp(currentTimestamp uint64) { + s.lock.Lock() + for { + if oldestEntry, err := s.entries.Peek(); err != nil { + break + } else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted { + break + } + } + s.lock.Unlock() +} + +func (s *cacheShard) getOldestEntry() ([]byte, error) { + return s.entries.Peek() +} + +func (s *cacheShard) getEntry(index int) ([]byte, error) { + return s.entries.Get(index) +} + +func (s *cacheShard) copyKeys() (keys []uint32, next int) { + keys = make([]uint32, len(s.hashmap)) + + s.lock.RLock() + + for _, index := range s.hashmap { + keys[next] = index + next++ + } + + s.lock.RUnlock() + return keys, next +} + +func (s *cacheShard) removeOldestEntry(reason RemoveReason) error { + oldest, err := s.entries.Pop() + if err == nil { + hash := readHashFromEntry(oldest) + delete(s.hashmap, hash) + s.onRemove(oldest, reason) + return nil + } + return err +} + +func (s *cacheShard) reset(config Config) { + s.lock.Lock() + s.hashmap = make(map[uint64]uint32, config.initialShardSize()) + s.entryBuffer = make([]byte, config.MaxEntrySize+headersSizeInBytes) + s.entries.Reset() + s.lock.Unlock() +} + +func (s *cacheShard) len() int { + s.lock.RLock() + res := len(s.hashmap) + s.lock.RUnlock() + return res +} + +func (s *cacheShard) capacity() int { + s.lock.RLock() + res := s.entries.Capacity() + s.lock.RUnlock() + return res +} + +func (s *cacheShard) getStats() Stats { + var stats = Stats{ + Hits: atomic.LoadInt64(&s.stats.Hits), + Misses: atomic.LoadInt64(&s.stats.Misses), + DelHits: atomic.LoadInt64(&s.stats.DelHits), + DelMisses: atomic.LoadInt64(&s.stats.DelMisses), + Collisions: atomic.LoadInt64(&s.stats.Collisions), + } + return stats +} + +func (s *cacheShard) hit() { + atomic.AddInt64(&s.stats.Hits, 1) +} + +func (s *cacheShard) miss() { + atomic.AddInt64(&s.stats.Misses, 1) +} + +func (s *cacheShard) delhit() { + atomic.AddInt64(&s.stats.DelHits, 1) +} + +func (s *cacheShard) delmiss() { + atomic.AddInt64(&s.stats.DelMisses, 1) +} + +func (s *cacheShard) collision() { + atomic.AddInt64(&s.stats.Collisions, 1) +} + +func initNewShard(config Config, callback onRemoveCallback, clock clock) *cacheShard { + return &cacheShard{ + hashmap: make(map[uint64]uint32, config.initialShardSize()), + entries: *queue.NewBytesQueue(config.initialShardSize()*config.MaxEntrySize, config.maximumShardSize(), config.Verbose), + entryBuffer: make([]byte, config.MaxEntrySize+headersSizeInBytes), + onRemove: callback, + + isVerbose: config.Verbose, + logger: newLogger(config.Logger), + clock: clock, + lifeWindow: uint64(config.LifeWindow.Seconds()), + } +} diff --git a/vendor/github.com/allegro/bigcache/stats.go b/vendor/github.com/allegro/bigcache/stats.go new file mode 100644 index 000000000..07157132a --- /dev/null +++ b/vendor/github.com/allegro/bigcache/stats.go @@ -0,0 +1,15 @@ +package bigcache + +// Stats stores cache statistics +type Stats struct { + // Hits is a number of successfully found keys + Hits int64 `json:"hits"` + // Misses is a number of not found keys + Misses int64 `json:"misses"` + // DelHits is a number of successfully deleted keys + DelHits int64 `json:"delete_hits"` + // DelMisses is a number of not deleted keys + DelMisses int64 `json:"delete_misses"` + // Collisions is a number of happened key-collisions + Collisions int64 `json:"collisions"` +} diff --git a/vendor/github.com/allegro/bigcache/utils.go b/vendor/github.com/allegro/bigcache/utils.go new file mode 100644 index 000000000..ca1df79b9 --- /dev/null +++ b/vendor/github.com/allegro/bigcache/utils.go @@ -0,0 +1,16 @@ +package bigcache + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func convertMBToBytes(value int) int { + return value * 1024 * 1024 +} + +func isPowerOfTwo(number int) bool { + return (number & (number - 1)) == 0 +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 06268535e..1bfe09da7 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -39,6 +39,18 @@ "revisionTime": "2018-01-16T20:38:02Z" }, { + "checksumSHA1": "9Niiu1GNhWUrXnGZrl8AU4EzbVE=", + "path": "github.com/allegro/bigcache", + "revision": "bff00e20c68d9f136477d62d182a7dc917bae0ca", + "revisionTime": "2018-10-22T20:06:25Z" + }, + { + "checksumSHA1": "zqToN+R6KybEskp1D4G/lAOKXU4=", + "path": "github.com/allegro/bigcache/queue", + "revision": "bff00e20c68d9f136477d62d182a7dc917bae0ca", + "revisionTime": "2018-10-22T20:06:25Z" + }, + { "checksumSHA1": "USkefO0g1U9mr+8hagv3fpSkrxg=", "path": "github.com/aristanetworks/goarista/monotime", "revision": "ea17b1a17847fb6e4c0a91de0b674704693469b0", |