Diffstat (limited to 'swarm/storage/ldbstore.go')
-rw-r--r--  swarm/storage/ldbstore.go  1082
1 file changed, 0 insertions(+), 1082 deletions(-)
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
deleted file mode 100644
index fd5ec9e30..000000000
--- a/swarm/storage/ldbstore.go
+++ /dev/null
@@ -1,1082 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-// Disk storage layer for the bzz package.
-// DbStore implements the ChunkStore interface and is used by the FileStore as
-// persistent storage of chunks.
-// It implements purging based on access count, allowing for external control of
-// maximum capacity.
-
-package storage
-
-import (
- "archive/tar"
- "bytes"
- "context"
- "encoding/binary"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "sync"
-
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/storage/mock"
- "github.com/syndtr/goleveldb/leveldb"
-)
-
-const (
- defaultGCRatio = 10
- defaultMaxGCRound = 10000
- defaultMaxGCBatch = 5000
-
- wEntryCnt = 1 << 0
- wIndexCnt = 1 << 1
- wAccessCnt = 1 << 2
-)
-
-var (
- dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
-)
-
-var (
- keyIndex = byte(0)
- keyAccessCnt = []byte{2}
- keyEntryCnt = []byte{3}
- keyDataIdx = []byte{4}
- keyData = byte(6)
- keyDistanceCnt = byte(7)
- keySchema = []byte{8}
- keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry
-)
-
-var (
- ErrDBClosed = errors.New("LDBStore closed")
-)
-
-type LDBStoreParams struct {
- *StoreParams
- Path string
- Po func(Address) uint8
-}
-
-// NewLDBStoreParams constructs LDBStoreParams with the specified values.
-func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
- return &LDBStoreParams{
- StoreParams: storeparams,
- Path: path,
- Po: func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey, k[:])) },
- }
-}
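-
-// A minimal construction sketch (illustrative, not part of this file;
-// assumes a *StoreParams value `sp` obtained elsewhere):
-//
-//	params := NewLDBStoreParams(sp, "/path/to/chunkdb")
-//	store, err := NewLDBStore(params)
-//	if err != nil {
-//		// handle error
-//	}
-//	defer store.Close()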
-
-type garbage struct {
- maxRound int // maximum number of chunks to delete in one garbage collection round
- maxBatch int // maximum number of chunks to delete in one db request batch
- ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db
- count int // number of chunks deleted in running round
- target int // number of chunks to delete in running round
- batch *dbBatch // the delete batch
- runC chan struct{} // struct in chan means gc is NOT running
-}
-
-type LDBStore struct {
- db *LDBDatabase
-
- // this should be stored in db, accessed transactionally
- entryCnt uint64 // number of items in the LevelDB
- accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
- dataIdx uint64 // similar to entryCnt, but we only increment it
- capacity uint64
- bucketCnt []uint64
-
- hashfunc SwarmHasher
- po func(Address) uint8
-
- batchesC chan struct{}
- closed bool
- batch *dbBatch
- lock sync.RWMutex
- quit chan struct{}
- gc *garbage
-
- // encodeDataFunc is used to bypass
- // the default functionality of DbStore with
- // mock.NodeStore for testing purposes.
- encodeDataFunc func(chunk Chunk) []byte
- // If getDataFunc is defined, it will be used for
- // retrieving the chunk data instead of the local
- // LevelDB database.
- getDataFunc func(key Address) (data []byte, err error)
-}
-
-type dbBatch struct {
- *leveldb.Batch
- err error
- c chan struct{}
-}
-
-func newBatch() *dbBatch {
- return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
-}
-
-// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
-// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
-// a function different from the one that is actually used.
-func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
- s = new(LDBStore)
- s.hashfunc = params.Hash
- s.quit = make(chan struct{})
-
- s.batchesC = make(chan struct{}, 1)
- go s.writeBatches()
- s.batch = newBatch()
- // associate encodeData with default functionality
- s.encodeDataFunc = encodeData
-
- s.db, err = NewLDBDatabase(params.Path)
- if err != nil {
- return nil, err
- }
-
- s.po = params.Po
- s.setCapacity(params.DbCapacity)
-
- s.bucketCnt = make([]uint64, 0x100)
- for i := 0; i < 0x100; i++ {
- k := make([]byte, 2)
- k[0] = keyDistanceCnt
- k[1] = uint8(i)
- cnt, _ := s.db.Get(k)
- s.bucketCnt[i] = BytesToU64(cnt)
- }
- data, _ := s.db.Get(keyEntryCnt)
- s.entryCnt = BytesToU64(data)
- data, _ = s.db.Get(keyAccessCnt)
- s.accessCnt = BytesToU64(data)
- data, _ = s.db.Get(keyDataIdx)
- s.dataIdx = BytesToU64(data)
-
- // set up garbage collection
- s.gc = &garbage{
- maxBatch: defaultMaxGCBatch,
- maxRound: defaultMaxGCRound,
- ratio: defaultGCRatio,
- }
-
- s.gc.runC = make(chan struct{}, 1)
- s.gc.runC <- struct{}{}
-
- return s, nil
-}
-
-// MarkAccessed increments the access counter as a best effort for a chunk, so
-// the chunk won't get garbage collected.
-func (s *LDBStore) MarkAccessed(addr Address) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- if s.closed {
- return
- }
-
- proximity := s.po(addr)
- s.tryAccessIdx(addr, proximity)
-}
-
-// initialize and set values for processing of gc round
-func (s *LDBStore) startGC(c int) {
-
- s.gc.count = 0
- // calculate the target number of deletions
- if c >= s.gc.maxRound {
- s.gc.target = s.gc.maxRound
- } else {
- s.gc.target = c / s.gc.ratio
- }
- s.gc.batch = newBatch()
- log.Debug("startgc", "requested", c, "target", s.gc.target)
-}
-
-// NewMockDbStore creates a new instance of DbStore with
-// mockStore set to a provided value. If mockStore argument is nil,
-// this function behaves exactly as NewLDBStore.
-func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
- s, err = NewLDBStore(params)
- if err != nil {
- return nil, err
- }
-
- // replace put and get with mock store functionality
- if mockStore != nil {
- s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
- s.getDataFunc = newMockGetDataFunc(mockStore)
- }
- return
-}
-
-type dpaDBIndex struct {
- Idx uint64
- Access uint64
-}
-
-func BytesToU64(data []byte) uint64 {
- if len(data) < 8 {
- return 0
- }
- return binary.BigEndian.Uint64(data)
-}
-
-func U64ToBytes(val uint64) []byte {
- data := make([]byte, 8)
- binary.BigEndian.PutUint64(data, val)
- return data
-}
-
-func getIndexKey(hash Address) []byte {
- hashSize := len(hash)
- key := make([]byte, hashSize+1)
- key[0] = keyIndex
- copy(key[1:], hash[:])
- return key
-}
-
-func getDataKey(idx uint64, po uint8) []byte {
- key := make([]byte, 10)
- key[0] = keyData
- key[1] = po
- binary.BigEndian.PutUint64(key[2:], idx)
-
- return key
-}
-
-func getGCIdxKey(index *dpaDBIndex) []byte {
- key := make([]byte, 9)
- key[0] = keyGCIdx
- binary.BigEndian.PutUint64(key[1:], index.Access)
- return key
-}
-
-func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
- val := make([]byte, 41) // po = 1 byte, index.Idx = 8 bytes, Address = 32 bytes
- val[0] = po
- binary.BigEndian.PutUint64(val[1:], index.Idx)
- copy(val[9:], addr)
- return val
-}
-
-func parseIdxKey(key []byte) (byte, []byte) {
- return key[0], key[1:]
-}
-
-func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
- index = &dpaDBIndex{
- Idx: binary.BigEndian.Uint64(val[1:]),
- Access: binary.BigEndian.Uint64(accessCnt),
- }
- po = val[0]
- addr = val[9:]
- return
-}
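-
-// Round-trip sketch (illustrative): an entry written with the two
-// builders above can be recovered with parseGCIdxEntry:
-//
-//	key := getGCIdxKey(idx)              // keyGCIdx | Access (big-endian)
-//	val := getGCIdxValue(idx, po, addr)  // po | Idx (big-endian) | Address
-//	idx2, po2, addr2 := parseGCIdxEntry(key[1:], val)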
-
-func encodeIndex(index *dpaDBIndex) []byte {
- data, _ := rlp.EncodeToBytes(index)
- return data
-}
-
-func encodeData(chunk Chunk) []byte {
- // Always create a new underlying array for the returned byte slice.
- // The chunk.Address array may be used in the returned slice and
- // may be changed later in the code or by LevelDB, resulting
- // in the Address being changed as well.
- return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
-}
-
-func decodeIndex(data []byte, index *dpaDBIndex) error {
- dec := rlp.NewStream(bytes.NewReader(data), 0)
- return dec.Decode(index)
-}
-
-func decodeData(addr Address, data []byte) (Chunk, error) {
- return NewChunk(addr, data[32:]), nil
-}
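-
-// Round-trip sketch (illustrative): encodeData prepends the 32-byte
-// address to the chunk payload and decodeData strips it again:
-//
-//	data := encodeData(c)                  // address || payload
-//	c2, _ := decodeData(c.Address(), data) // c2.Data() == c.Data()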
-
-func (s *LDBStore) collectGarbage() error {
- // prevent duplicate gc from starting when one is already running
- select {
- case <-s.gc.runC:
- default:
- return nil
- }
-
- s.lock.Lock()
- entryCnt := s.entryCnt
- s.lock.Unlock()
-
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
-
- // calculate the amount of chunks to collect and reset counter
- s.startGC(int(entryCnt))
- log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt)
-
- for s.gc.count < s.gc.target {
- it := s.db.NewIterator()
- ok := it.Seek([]byte{keyGCIdx})
- var singleIterationCount int
-
- // every batch needs a lock so we avoid entries changing accessidx in the meantime
- s.lock.Lock()
- for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() {
-
- // quit if no more access index keys
- itkey := it.Key()
- if (itkey == nil) || (itkey[0] != keyGCIdx) {
- break
- }
-
- // get chunk data entry from access index
- val := it.Value()
- index, po, hash := parseGCIdxEntry(itkey[1:], val)
- keyIdx := make([]byte, 33)
- keyIdx[0] = keyIndex
- copy(keyIdx[1:], hash)
-
- // add delete operation to batch
- s.delete(s.gc.batch.Batch, index, keyIdx, po)
- singleIterationCount++
- s.gc.count++
- log.Trace("garbage collect enqueued chunk for deletion", "key", hash)
-
- // stop when the gc target for this round has been reached
- if s.gc.count >= s.gc.target {
- break
- }
- }
-
- s.writeBatch(s.gc.batch, wEntryCnt)
- log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count)
- s.lock.Unlock()
- it.Release()
- }
-
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(s.gc.count))
- log.Debug("garbage collect done", "c", s.gc.count)
- s.gc.runC <- struct{}{}
-
- return nil
-}
-
-// Export writes all chunks from the store to a tar archive, returning the
-// number of chunks written.
-func (s *LDBStore) Export(out io.Writer) (int64, error) {
- tw := tar.NewWriter(out)
- defer tw.Close()
-
- it := s.db.NewIterator()
- defer it.Release()
- var count int64
- for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
- key := it.Key()
- if (key == nil) || (key[0] != keyIndex) {
- break
- }
-
- var index dpaDBIndex
-
- hash := key[1:]
- decodeIndex(it.Value(), &index)
- po := s.po(hash)
- datakey := getDataKey(index.Idx, po)
- log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
- data, err := s.db.Get(datakey)
- if err != nil {
- log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
- continue
- }
-
- hdr := &tar.Header{
- Name: hex.EncodeToString(hash),
- Mode: 0644,
- Size: int64(len(data)),
- }
- if err := tw.WriteHeader(hdr); err != nil {
- return count, err
- }
- if _, err := tw.Write(data); err != nil {
- return count, err
- }
- count++
- }
-
- return count, nil
-}
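-
-// Usage sketch (illustrative; the output path is hypothetical):
-//
-//	f, err := os.Create("chunks.tar")
-//	if err != nil {
-//		return err
-//	}
-//	defer f.Close()
-//	n, err := store.Export(f)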
-
-// Import reads chunks into the store from a tar archive, returning the number
-// of chunks read.
-func (s *LDBStore) Import(in io.Reader) (int64, error) {
- tr := tar.NewReader(in)
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- countC := make(chan int64)
- errC := make(chan error)
- var count int64
- go func() {
- for {
- hdr, err := tr.Next()
- if err == io.EOF {
- break
- } else if err != nil {
- select {
- case errC <- err:
- case <-ctx.Done():
- }
- // stop processing instead of dereferencing a nil header below
- return
- }
-
- if len(hdr.Name) != 64 {
- log.Warn("ignoring non-chunk file", "name", hdr.Name)
- continue
- }
-
- keybytes, err := hex.DecodeString(hdr.Name)
- if err != nil {
- log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
- continue
- }
-
- data, err := ioutil.ReadAll(tr)
- if err != nil {
- select {
- case errC <- err:
- case <-ctx.Done():
- }
- // stop processing instead of continuing with truncated data
- return
- }
- key := Address(keybytes)
- chunk := NewChunk(key, data[32:])
-
- go func() {
- select {
- case errC <- s.Put(ctx, chunk):
- case <-ctx.Done():
- }
- }()
-
- count++
- }
- countC <- count
- }()
-
- // wait for all chunks to be stored
- i := int64(0)
- var total int64
- for {
- select {
- case err := <-errC:
- if err != nil {
- return count, err
- }
- i++
- case total = <-countC:
- case <-ctx.Done():
- return i, ctx.Err()
- }
- if total > 0 && i == total {
- return total, nil
- }
- }
-}
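-
-// The mirror-image usage sketch for Import (illustrative):
-//
-//	f, err := os.Open("chunks.tar")
-//	if err != nil {
-//		return err
-//	}
-//	defer f.Close()
-//	n, err := store.Import(f)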
-
-// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
-func (s *LDBStore) Cleanup(f func(Chunk) bool) {
- var errorsFound, removed, total int
-
- it := s.db.NewIterator()
- defer it.Release()
- for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
- key := it.Key()
- if (key == nil) || (key[0] != keyIndex) {
- break
- }
- total++
- var index dpaDBIndex
- err := decodeIndex(it.Value(), &index)
- if err != nil {
- log.Warn("Cannot decode")
- errorsFound++
- continue
- }
- hash := key[1:]
- po := s.po(hash)
- datakey := getDataKey(index.Idx, po)
- data, err := s.db.Get(datakey)
- if err != nil {
- found := false
-
- // The highest possible proximity is 255, so exit loop upon overflow.
- for po = uint8(1); po != 0; po++ {
- datakey = getDataKey(index.Idx, po)
- data, err = s.db.Get(datakey)
- if err == nil {
- found = true
- break
- }
- }
-
- if !found {
- log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
- errorsFound++
- continue
- }
- }
-
- ck := data[:32]
- c, err := decodeData(ck, data)
- if err != nil {
- log.Error("decodeData error", "err", err)
- continue
- }
-
- sdata := c.Data()
-
- cs := int64(binary.LittleEndian.Uint64(sdata[:8]))
- log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(sdata), "size", cs)
-
- // if chunk is to be removed
- if f(c) {
- log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(sdata), "size", cs)
- s.deleteNow(&index, getIndexKey(key[1:]), po)
- removed++
- errorsFound++
- }
- }
-
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
-}
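-
-// Example predicate (illustrative; the size bound is an assumption,
-// mirroring how the size field is decoded above):
-//
-//	store.Cleanup(func(c Chunk) bool {
-//		size := int64(binary.LittleEndian.Uint64(c.Data()[:8]))
-//		return size > 4096+8 // remove chunks with an implausible size field
-//	})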
-
-// CleanGCIndex rebuilds the garbage collector index from scratch, while
-// removing inconsistent elements, e.g., indices with missing data chunks.
-// WARN: it's a pretty heavy, long running function.
-func (s *LDBStore) CleanGCIndex() error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- batch := leveldb.Batch{}
-
- var okEntryCount uint64
- var totalEntryCount uint64
-
- // throw out all gc indices, we will rebuild from cleaned index
- it := s.db.NewIterator()
- it.Seek([]byte{keyGCIdx})
- var gcDeletes int
- for it.Valid() {
- rowType, _ := parseIdxKey(it.Key())
- if rowType != keyGCIdx {
- break
- }
- batch.Delete(it.Key())
- gcDeletes++
- it.Next()
- }
- log.Debug("gc", "deletes", gcDeletes)
- if err := s.db.Write(&batch); err != nil {
- return err
- }
- batch.Reset()
-
- it.Release()
-
- // corrected po index pointer values
- var poPtrs [256]uint64
-
- // set to true when there are no more key index entries to iterate
- var doneIterating bool
-
- // last key index in previous iteration
- lastIdxKey := []byte{keyIndex}
-
- // counter for debug output
- var cleanBatchCount int
-
- // go through all key index entries
- for !doneIterating {
- cleanBatchCount++
- var idxs []dpaDBIndex
- var chunkHashes [][]byte
- var pos []uint8
- it := s.db.NewIterator()
-
- it.Seek(lastIdxKey)
-
- // 4096 is just a nice number, don't look for any hidden meaning here...
- var i int
- for i = 0; i < 4096; i++ {
-
- // this really shouldn't happen unless database is empty
- // but let's keep it to be safe
- if !it.Valid() {
- doneIterating = true
- break
- }
-
- // if it's not keyindex anymore we're done iterating
- rowType, chunkHash := parseIdxKey(it.Key())
- if rowType != keyIndex {
- doneIterating = true
- break
- }
-
- // decode the retrieved index
- var idx dpaDBIndex
- err := decodeIndex(it.Value(), &idx)
- if err != nil {
- return fmt.Errorf("corrupt index: %v", err)
- }
- po := s.po(chunkHash)
- lastIdxKey = it.Key()
-
- // if we don't find the data key, remove the entry
- // if we find it, add to the array of new gc indices to create
- dataKey := getDataKey(idx.Idx, po)
- _, err = s.db.Get(dataKey)
- if err != nil {
- log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
- batch.Delete(it.Key())
- } else {
- idxs = append(idxs, idx)
- chunkHashes = append(chunkHashes, chunkHash)
- pos = append(pos, po)
- okEntryCount++
- if idx.Idx > poPtrs[po] {
- poPtrs[po] = idx.Idx
- }
- }
- totalEntryCount++
- it.Next()
- }
- it.Release()
-
- // flush the key index corrections
- err := s.db.Write(&batch)
- if err != nil {
- return err
- }
- batch.Reset()
-
- // add correct gc indices
- for i, okIdx := range idxs {
- gcIdxKey := getGCIdxKey(&okIdx)
- gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
- batch.Put(gcIdxKey, gcIdxData)
- log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
- }
-
- // flush them
- err = s.db.Write(&batch)
- if err != nil {
- return err
- }
- batch.Reset()
-
- log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
- }
-
- log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
-
- // lastly add updated entry count
- var entryCount [8]byte
- binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
- batch.Put(keyEntryCnt, entryCount[:])
-
- // and add the new po index pointers
- var poKey [2]byte
- poKey[0] = keyDistanceCnt
- for i, poPtr := range poPtrs {
- poKey[1] = uint8(i)
- if poPtr == 0 {
- batch.Delete(poKey[:])
- } else {
- var idxCount [8]byte
- binary.BigEndian.PutUint64(idxCount[:], poPtr)
- batch.Put(poKey[:], idxCount[:])
- }
- }
-
- // if you made it this far your harddisk has survived. Congratulations
- return s.db.Write(&batch)
-}
-
-// Delete removes a chunk and updates indices.
-// It is thread safe.
-func (s *LDBStore) Delete(addr Address) error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- ikey := getIndexKey(addr)
-
- idata, err := s.db.Get(ikey)
- if err != nil {
- return err
- }
-
- var idx dpaDBIndex
- decodeIndex(idata, &idx)
- proximity := s.po(addr)
- return s.deleteNow(&idx, ikey, proximity)
-}
-
-// executes one delete operation immediately
-// see *LDBStore.delete
-func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error {
- batch := new(leveldb.Batch)
- s.delete(batch, idx, idxKey, po)
- return s.db.Write(batch)
-}
-
-// adds a chunk delete operation to the provided batch
-// if called directly, it decrements entryCnt regardless of whether the chunk exists upon deletion, risking wraparound to max uint64
-func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) {
- metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
-
- gcIdxKey := getGCIdxKey(idx)
- batch.Delete(gcIdxKey)
- dataKey := getDataKey(idx.Idx, po)
- batch.Delete(dataKey)
- batch.Delete(idxKey)
- s.entryCnt--
- dbEntryCount.Dec(1)
- cntKey := make([]byte, 2)
- cntKey[0] = keyDistanceCnt
- cntKey[1] = po
- batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
- batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
-}
-
-func (s *LDBStore) BinIndex(po uint8) uint64 {
- s.lock.RLock()
- defer s.lock.RUnlock()
- return s.bucketCnt[po]
-}
-
-// Put adds a chunk to the database, adding indices and incrementing global counters.
-// If it already exists, it merely increments the access count of the existing entry.
-// It is thread safe.
-func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
- metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
- log.Trace("ldbstore.put", "key", chunk.Address())
-
- ikey := getIndexKey(chunk.Address())
- var index dpaDBIndex
-
- po := s.po(chunk.Address())
-
- s.lock.Lock()
-
- if s.closed {
- s.lock.Unlock()
- return ErrDBClosed
- }
- batch := s.batch
-
- log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
- _, err := s.db.Get(ikey)
- if err != nil {
- s.doPut(chunk, &index, po)
- }
- idata := encodeIndex(&index)
- s.batch.Put(ikey, idata)
-
- // add the access-chunkindex index for garbage collection
- gcIdxKey := getGCIdxKey(&index)
- gcIdxData := getGCIdxValue(&index, po, chunk.Address())
- s.batch.Put(gcIdxKey, gcIdxData)
- s.lock.Unlock()
-
- select {
- case s.batchesC <- struct{}{}:
- default:
- }
-
- select {
- case <-batch.c:
- return batch.err
- case <-ctx.Done():
- return ctx.Err()
- }
-}
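-
-// Usage sketch (illustrative): Put blocks until the batch holding the
-// chunk is flushed by the write loop, or until the context expires:
-//
-//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-//	defer cancel()
-//	if err := store.Put(ctx, chunk); err != nil {
-//		// chunk was not (yet) persisted
-//	}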
-
-// force putting into db, does not check or update necessary indices
-func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
- data := s.encodeDataFunc(chunk)
- dkey := getDataKey(s.dataIdx, po)
- s.batch.Put(dkey, data)
- index.Idx = s.dataIdx
- s.bucketCnt[po] = s.dataIdx
- s.entryCnt++
- dbEntryCount.Inc(1)
- s.dataIdx++
- index.Access = s.accessCnt
- s.accessCnt++
- cntKey := make([]byte, 2)
- cntKey[0] = keyDistanceCnt
- cntKey[1] = po
- s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
-}
-
-func (s *LDBStore) writeBatches() {
- for {
- select {
- case <-s.quit:
- log.Debug("DbStore: quit batch write loop")
- return
- case <-s.batchesC:
- err := s.writeCurrentBatch()
- if err != nil {
- log.Debug("DbStore: quit batch write loop", "err", err.Error())
- return
- }
- }
- }
-
-}
-
-func (s *LDBStore) writeCurrentBatch() error {
- s.lock.Lock()
- defer s.lock.Unlock()
- b := s.batch
- l := b.Len()
- if l == 0 {
- return nil
- }
- s.batch = newBatch()
- b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt)
- close(b.c)
- if s.entryCnt >= s.capacity {
- go s.collectGarbage()
- }
- return nil
-}
-
-// must not be called concurrently
-func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error {
- if wFlag&wEntryCnt > 0 {
- b.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
- }
- if wFlag&wIndexCnt > 0 {
- b.Put(keyDataIdx, U64ToBytes(s.dataIdx))
- }
- if wFlag&wAccessCnt > 0 {
- b.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
- }
- l := b.Len()
- if err := s.db.Write(b.Batch); err != nil {
- return fmt.Errorf("unable to write batch: %v", err)
- }
- log.Trace(fmt.Sprintf("batch write (%d entries)", l))
- return nil
-}
-
-// newMockEncodeDataFunc returns a function that stores the chunk data
-// in a mock store to bypass the default functionality of encodeData.
-// The constructed function returns only the chunk address, as DbStore does
-// not need to store the data itself, but still needs to create the index.
-func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
- return func(chunk Chunk) []byte {
- if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
- log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
- }
- return chunk.Address()[:]
- }
-}
-
-// tryAccessIdx tries to find the index entry. If found, it increments the
-// access count for garbage collection and returns the index entry and true,
-// otherwise it returns nil and false.
-func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
- ikey := getIndexKey(addr)
- idata, err := s.db.Get(ikey)
- if err != nil {
- return nil, false
- }
-
- index := new(dpaDBIndex)
- decodeIndex(idata, index)
- oldGCIdxKey := getGCIdxKey(index)
- s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
- index.Access = s.accessCnt
- idata = encodeIndex(index)
- s.accessCnt++
- s.batch.Put(ikey, idata)
- newGCIdxKey := getGCIdxKey(index)
- newGCIdxData := getGCIdxValue(index, po, ikey[1:])
- s.batch.Delete(oldGCIdxKey)
- s.batch.Put(newGCIdxKey, newGCIdxData)
- select {
- case s.batchesC <- struct{}{}:
- default:
- }
- return index, true
-}
-
-// GetSchema returns the current named schema of the datastore as read from LevelDB
-func (s *LDBStore) GetSchema() (string, error) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- data, err := s.db.Get(keySchema)
- if err != nil {
- if err == leveldb.ErrNotFound {
- return DbSchemaNone, nil
- }
- return "", err
- }
-
- return string(data), nil
-}
-
-// PutSchema saves a named schema to the LevelDB datastore
-func (s *LDBStore) PutSchema(schema string) error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- return s.db.Put(keySchema, []byte(schema))
-}
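-
-// Migration sketch (illustrative; the schema name is hypothetical):
-//
-//	schema, err := store.GetSchema()
-//	if err == nil && schema == DbSchemaNone {
-//		// run one-time migration here, then record the new schema
-//		err = store.PutSchema("example-schema")
-//	}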
-
-// Get retrieves the chunk matching the provided key from the database.
-// If the chunk entry does not exist, it returns an error.
-// It updates the access count and is thread safe.
-func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
- metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
- log.Trace("ldbstore.get", "key", addr)
-
- s.lock.Lock()
- defer s.lock.Unlock()
- return s.get(addr)
-}
-
-// Has queries the underlying DB whether a chunk with the given address is stored.
-// It returns true if the chunk is found, false if not.
-func (s *LDBStore) Has(_ context.Context, addr Address) bool {
- s.lock.RLock()
- defer s.lock.RUnlock()
-
- ikey := getIndexKey(addr)
- _, err := s.db.Get(ikey)
-
- return err == nil
-}
-
-// TODO: To conform with other private methods of this object, indices should not be updated
-func (s *LDBStore) get(addr Address) (chunk Chunk, err error) {
- if s.closed {
- return nil, ErrDBClosed
- }
- proximity := s.po(addr)
- index, found := s.tryAccessIdx(addr, proximity)
- if found {
- var data []byte
- if s.getDataFunc != nil {
- // if getDataFunc is defined, use it to retrieve the chunk data
- log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
- data, err = s.getDataFunc(addr)
- if err != nil {
- return
- }
- } else {
- // default DbStore functionality to retrieve chunk data
- datakey := getDataKey(index.Idx, proximity)
- data, err = s.db.Get(datakey)
- log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
- if err != nil {
- log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
- s.deleteNow(index, getIndexKey(addr), s.po(addr))
- if err == leveldb.ErrNotFound {
- return nil, ErrChunkNotFound
- }
- return nil, err
- }
- }
-
- return decodeData(addr, data)
- } else {
- err = ErrChunkNotFound
- }
-
- return
-}
-
-// newMockGetDataFunc returns a function that reads chunk data from
-// the mock database, which is used as the value for DbStore.getDataFunc
-// to bypass the default functionality of DbStore with a mock store.
-func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
- return func(addr Address) (data []byte, err error) {
- data, err = mockStore.Get(addr)
- if err == mock.ErrNotFound {
- // preserve ErrChunkNotFound error
- err = ErrChunkNotFound
- }
- return data, err
- }
-}
-
-func (s *LDBStore) setCapacity(c uint64) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- s.capacity = c
-
- for s.entryCnt > c {
- s.collectGarbage()
- }
-}
-
-func (s *LDBStore) Close() {
- close(s.quit)
- s.lock.Lock()
- s.closed = true
- s.lock.Unlock()
- // force writing out current batch
- s.writeCurrentBatch()
- s.db.Close()
-}
-
-// SyncIterator(since, until, po, f) calls f on each hash of bin po from since to until
-func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
- metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
-
- sincekey := getDataKey(since, po)
- untilkey := getDataKey(until, po)
- it := s.db.NewIterator()
- defer it.Release()
-
- for ok := it.Seek(sincekey); ok; ok = it.Next() {
- metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
-
- dbkey := it.Key()
- if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
- break
- }
- key := make([]byte, 32)
- val := it.Value()
- copy(key, val[:32])
- if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
- break
- }
- }
- return it.Error()
-}
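-
-// Usage sketch (illustrative): walk every chunk hash in bin 4 from the
-// first data index up to the current bin pointer:
-//
-//	store.SyncIterator(0, store.BinIndex(4), 4, func(addr Address, idx uint64) bool {
-//		log.Trace("sync", "addr", addr, "idx", idx)
-//		return true // keep iterating
-//	})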