aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/ldbstore.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/ldbstore.go')
-rw-r--r--swarm/storage/ldbstore.go310
1 files changed, 182 insertions, 128 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 675b5de01..8ab7e60b3 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -28,6 +28,7 @@ import (
"context"
"encoding/binary"
"encoding/hex"
+ "errors"
"fmt"
"io"
"io/ioutil"
@@ -36,7 +37,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
- "github.com/ethereum/go-ethereum/swarm/chunk"
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb"
@@ -62,6 +63,10 @@ var (
keyDistanceCnt = byte(7)
)
+var (
+ ErrDBClosed = errors.New("LDBStore closed")
+)
+
type gcItem struct {
idx uint64
value uint64
@@ -99,18 +104,29 @@ type LDBStore struct {
batchC chan bool
batchesC chan struct{}
- batch *leveldb.Batch
+ closed bool
+ batch *dbBatch
lock sync.RWMutex
quit chan struct{}
// Functions encodeDataFunc is used to bypass
// the default functionality of DbStore with
// mock.NodeStore for testing purposes.
- encodeDataFunc func(chunk *Chunk) []byte
+ encodeDataFunc func(chunk Chunk) []byte
// If getDataFunc is defined, it will be used for
// retrieving the chunk data instead from the local
// LevelDB database.
- getDataFunc func(addr Address) (data []byte, err error)
+ getDataFunc func(key Address) (data []byte, err error)
+}
+
+type dbBatch struct {
+ *leveldb.Batch
+ err error
+ c chan struct{}
+}
+
+func newBatch() *dbBatch {
+ return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
}
// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
@@ -121,10 +137,9 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
s.hashfunc = params.Hash
s.quit = make(chan struct{})
- s.batchC = make(chan bool)
s.batchesC = make(chan struct{}, 1)
go s.writeBatches()
- s.batch = new(leveldb.Batch)
+ s.batch = newBatch()
// associate encodeData with default functionality
s.encodeDataFunc = encodeData
@@ -143,7 +158,6 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
k[1] = uint8(i)
cnt, _ := s.db.Get(k)
s.bucketCnt[i] = BytesToU64(cnt)
- s.bucketCnt[i]++
}
data, _ := s.db.Get(keyEntryCnt)
s.entryCnt = BytesToU64(data)
@@ -202,14 +216,6 @@ func getIndexKey(hash Address) []byte {
return key
}
-func getOldDataKey(idx uint64) []byte {
- key := make([]byte, 9)
- key[0] = keyOldData
- binary.BigEndian.PutUint64(key[1:9], idx)
-
- return key
-}
-
func getDataKey(idx uint64, po uint8) []byte {
key := make([]byte, 10)
key[0] = keyData
@@ -224,12 +230,12 @@ func encodeIndex(index *dpaDBIndex) []byte {
return data
}
-func encodeData(chunk *Chunk) []byte {
+func encodeData(chunk Chunk) []byte {
// Always create a new underlying array for the returned byte slice.
- // The chunk.Key array may be used in the returned slice which
+ // The chunk.Address array may be used in the returned slice which
// may be changed later in the code or by the LevelDB, resulting
- // that the Key is changed as well.
- return append(append([]byte{}, chunk.Addr[:]...), chunk.SData...)
+ // that the Address is changed as well.
+ return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
}
func decodeIndex(data []byte, index *dpaDBIndex) error {
@@ -237,14 +243,8 @@ func decodeIndex(data []byte, index *dpaDBIndex) error {
return dec.Decode(index)
}
-func decodeData(data []byte, chunk *Chunk) {
- chunk.SData = data[32:]
- chunk.Size = int64(binary.BigEndian.Uint64(data[32:40]))
-}
-
-func decodeOldData(data []byte, chunk *Chunk) {
- chunk.SData = data
- chunk.Size = int64(binary.BigEndian.Uint64(data[0:8]))
+func decodeData(addr Address, data []byte) (*chunk, error) {
+ return NewChunk(addr, data[32:]), nil
}
func (s *LDBStore) collectGarbage(ratio float32) {
@@ -347,44 +347,75 @@ func (s *LDBStore) Export(out io.Writer) (int64, error) {
func (s *LDBStore) Import(in io.Reader) (int64, error) {
tr := tar.NewReader(in)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ countC := make(chan int64)
+ errC := make(chan error)
var count int64
- var wg sync.WaitGroup
- for {
- hdr, err := tr.Next()
- if err == io.EOF {
- break
- } else if err != nil {
- return count, err
- }
+ go func() {
+ for {
+ hdr, err := tr.Next()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
- if len(hdr.Name) != 64 {
- log.Warn("ignoring non-chunk file", "name", hdr.Name)
- continue
- }
+ if len(hdr.Name) != 64 {
+ log.Warn("ignoring non-chunk file", "name", hdr.Name)
+ continue
+ }
- keybytes, err := hex.DecodeString(hdr.Name)
- if err != nil {
- log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
- continue
+ keybytes, err := hex.DecodeString(hdr.Name)
+ if err != nil {
+ log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
+ continue
+ }
+
+ data, err := ioutil.ReadAll(tr)
+ if err != nil {
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ key := Address(keybytes)
+ chunk := NewChunk(key, data[32:])
+
+ go func() {
+ select {
+ case errC <- s.Put(ctx, chunk):
+ case <-ctx.Done():
+ }
+ }()
+
+ count++
}
+ countC <- count
+ }()
- data, err := ioutil.ReadAll(tr)
- if err != nil {
- return count, err
+ // wait for all chunks to be stored
+ i := int64(0)
+ var total int64
+ for {
+ select {
+ case err := <-errC:
+ if err != nil {
+ return count, err
+ }
+ i++
+ case total = <-countC:
+ case <-ctx.Done():
+ return i, ctx.Err()
+ }
+ if total > 0 && i == total {
+ return total, nil
}
- key := Address(keybytes)
- chunk := NewChunk(key, nil)
- chunk.SData = data[32:]
- s.Put(context.TODO(), chunk)
- wg.Add(1)
- go func() {
- defer wg.Done()
- <-chunk.dbStoredC
- }()
- count++
}
- wg.Wait()
- return count, nil
}
func (s *LDBStore) Cleanup() {
@@ -430,15 +461,18 @@ func (s *LDBStore) Cleanup() {
}
}
- c := &Chunk{}
ck := data[:32]
- decodeData(data, c)
+ c, err := decodeData(ck, data)
+ if err != nil {
+ log.Error("decodeData error", "err", err)
+ continue
+ }
- cs := int64(binary.LittleEndian.Uint64(c.SData[:8]))
- log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
+ cs := int64(binary.LittleEndian.Uint64(c.sdata[:8]))
+ log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
- if len(c.SData) > chunk.DefaultSize+8 {
- log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
+ if len(c.sdata) > ch.DefaultSize+8 {
+ log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
s.delete(index.Idx, getIndexKey(key[1:]), po)
removed++
errorsFound++
@@ -511,7 +545,6 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
batch.Delete(getDataKey(idx, po))
s.entryCnt--
dbEntryCount.Dec(1)
- s.bucketCnt[po]--
cntKey := make([]byte, 2)
cntKey[0] = keyDistanceCnt
cntKey[1] = po
@@ -520,10 +553,9 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
s.db.Write(batch)
}
-func (s *LDBStore) CurrentBucketStorageIndex(po uint8) uint64 {
+func (s *LDBStore) BinIndex(po uint8) uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
-
return s.bucketCnt[po]
}
@@ -539,43 +571,53 @@ func (s *LDBStore) CurrentStorageIndex() uint64 {
return s.dataIdx
}
-func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) {
+func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
- log.Trace("ldbstore.put", "key", chunk.Addr)
+ log.Trace("ldbstore.put", "key", chunk.Address())
- ikey := getIndexKey(chunk.Addr)
+ ikey := getIndexKey(chunk.Address())
var index dpaDBIndex
- po := s.po(chunk.Addr)
+ po := s.po(chunk.Address())
+
s.lock.Lock()
- defer s.lock.Unlock()
- log.Trace("ldbstore.put: s.db.Get", "key", chunk.Addr, "ikey", fmt.Sprintf("%x", ikey))
+ if s.closed {
+ s.lock.Unlock()
+ return ErrDBClosed
+ }
+ batch := s.batch
+
+ log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
idata, err := s.db.Get(ikey)
if err != nil {
s.doPut(chunk, &index, po)
- batchC := s.batchC
- go func() {
- <-batchC
- chunk.markAsStored()
- }()
} else {
- log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Addr)
+ log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address)
decodeIndex(idata, &index)
- chunk.markAsStored()
}
index.Access = s.accessCnt
s.accessCnt++
idata = encodeIndex(&index)
s.batch.Put(ikey, idata)
+
+ s.lock.Unlock()
+
select {
case s.batchesC <- struct{}{}:
default:
}
+
+ select {
+ case <-batch.c:
+ return batch.err
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
// force putting into db, does not check access index
-func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) {
+func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
data := s.encodeDataFunc(chunk)
dkey := getDataKey(s.dataIdx, po)
s.batch.Put(dkey, data)
@@ -592,58 +634,64 @@ func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) {
}
func (s *LDBStore) writeBatches() {
-mainLoop:
for {
select {
case <-s.quit:
- break mainLoop
+ log.Debug("DbStore: quit batch write loop")
+ return
case <-s.batchesC:
- s.lock.Lock()
- b := s.batch
- e := s.entryCnt
- d := s.dataIdx
- a := s.accessCnt
- c := s.batchC
- s.batchC = make(chan bool)
- s.batch = new(leveldb.Batch)
- err := s.writeBatch(b, e, d, a)
- // TODO: set this error on the batch, then tell the chunk
+ err := s.writeCurrentBatch()
if err != nil {
- log.Error(fmt.Sprintf("spawn batch write (%d entries): %v", b.Len(), err))
+ log.Debug("DbStore: quit batch write loop", "err", err.Error())
+ return
}
- close(c)
- for e > s.capacity {
- log.Trace("for >", "e", e, "s.capacity", s.capacity)
- // Collect garbage in a separate goroutine
- // to be able to interrupt this loop by s.quit.
- done := make(chan struct{})
- go func() {
- s.collectGarbage(gcArrayFreeRatio)
- log.Trace("collectGarbage closing done")
- close(done)
- }()
+ }
+ }
- select {
- case <-s.quit:
- s.lock.Unlock()
- break mainLoop
- case <-done:
- }
- e = s.entryCnt
- }
- s.lock.Unlock()
+}
+
+func (s *LDBStore) writeCurrentBatch() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ b := s.batch
+ l := b.Len()
+ if l == 0 {
+ return nil
+ }
+ e := s.entryCnt
+ d := s.dataIdx
+ a := s.accessCnt
+ s.batch = newBatch()
+ b.err = s.writeBatch(b, e, d, a)
+ close(b.c)
+ for e > s.capacity {
+ log.Trace("for >", "e", e, "s.capacity", s.capacity)
+ // Collect garbage in a separate goroutine
+ // to be able to interrupt this loop by s.quit.
+ done := make(chan struct{})
+ go func() {
+ s.collectGarbage(gcArrayFreeRatio)
+ log.Trace("collectGarbage closing done")
+ close(done)
+ }()
+
+ select {
+ case <-s.quit:
+ return errors.New("CollectGarbage terminated due to quit")
+ case <-done:
}
+ e = s.entryCnt
}
- log.Trace(fmt.Sprintf("DbStore: quit batch write loop"))
+ return nil
}
// must be called non concurrently
-func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uint64) error {
+func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error {
b.Put(keyEntryCnt, U64ToBytes(entryCnt))
b.Put(keyDataIdx, U64ToBytes(dataIdx))
b.Put(keyAccessCnt, U64ToBytes(accessCnt))
l := b.Len()
- if err := s.db.Write(b); err != nil {
+ if err := s.db.Write(b.Batch); err != nil {
return fmt.Errorf("unable to write batch: %v", err)
}
log.Trace(fmt.Sprintf("batch write (%d entries)", l))
@@ -654,12 +702,12 @@ func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uin
// to a mock store to bypass the default functionality encodeData.
// The constructed function always returns the nil data, as DbStore does
// not need to store the data, but still need to create the index.
-func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk *Chunk) []byte {
- return func(chunk *Chunk) []byte {
- if err := mockStore.Put(chunk.Addr, encodeData(chunk)); err != nil {
- log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Addr.Log(), err))
+func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
+ return func(chunk Chunk) []byte {
+ if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
+ log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
}
- return chunk.Addr[:]
+ return chunk.Address()[:]
}
}
@@ -682,7 +730,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
return true
}
-func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
+func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
log.Trace("ldbstore.get", "key", addr)
@@ -691,9 +739,11 @@ func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err err
return s.get(addr)
}
-func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) {
+func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
var indx dpaDBIndex
-
+ if s.closed {
+ return nil, ErrDBClosed
+ }
if s.tryAccessIdx(getIndexKey(addr), &indx) {
var data []byte
if s.getDataFunc != nil {
@@ -716,9 +766,7 @@ func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) {
}
}
- chunk = NewChunk(addr, nil)
- chunk.markAsStored()
- decodeData(data, chunk)
+ return decodeData(addr, data)
} else {
err = ErrChunkNotFound
}
@@ -772,6 +820,12 @@ func (s *LDBStore) setCapacity(c uint64) {
func (s *LDBStore) Close() {
close(s.quit)
+ s.lock.Lock()
+ s.closed = true
+ s.lock.Unlock()
+ // force writing out current batch
+ s.writeCurrentBatch()
+ close(s.batchesC)
s.db.Close()
}