diff options
Diffstat (limited to 'swarm/storage/ldbstore.go')
-rw-r--r-- | swarm/storage/ldbstore.go | 310 |
1 files changed, 182 insertions, 128 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 675b5de01..8ab7e60b3 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -28,6 +28,7 @@ import ( "context" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -36,7 +37,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/syndtr/goleveldb/leveldb" @@ -62,6 +63,10 @@ var ( keyDistanceCnt = byte(7) ) +var ( + ErrDBClosed = errors.New("LDBStore closed") +) + type gcItem struct { idx uint64 value uint64 @@ -99,18 +104,29 @@ type LDBStore struct { batchC chan bool batchesC chan struct{} - batch *leveldb.Batch + closed bool + batch *dbBatch lock sync.RWMutex quit chan struct{} // Functions encodeDataFunc is used to bypass // the default functionality of DbStore with // mock.NodeStore for testing purposes. - encodeDataFunc func(chunk *Chunk) []byte + encodeDataFunc func(chunk Chunk) []byte // If getDataFunc is defined, it will be used for // retrieving the chunk data instead from the local // LevelDB database. - getDataFunc func(addr Address) (data []byte, err error) + getDataFunc func(key Address) (data []byte, err error) +} + +type dbBatch struct { + *leveldb.Batch + err error + c chan struct{} +} + +func newBatch() *dbBatch { + return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})} } // TODO: Instead of passing the distance function, just pass the address from which distances are calculated @@ -121,10 +137,9 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { s.hashfunc = params.Hash s.quit = make(chan struct{}) - s.batchC = make(chan bool) s.batchesC = make(chan struct{}, 1) go s.writeBatches() - s.batch = new(leveldb.Batch) + s.batch = newBatch() // associate encodeData with default functionality s.encodeDataFunc = encodeData @@ -143,7 +158,6 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { k[1] = uint8(i) cnt, _ := s.db.Get(k) s.bucketCnt[i] = BytesToU64(cnt) - s.bucketCnt[i]++ } data, _ := s.db.Get(keyEntryCnt) s.entryCnt = BytesToU64(data) @@ -202,14 +216,6 @@ func getIndexKey(hash Address) []byte { return key } -func getOldDataKey(idx uint64) []byte { - key := make([]byte, 9) - key[0] = keyOldData - binary.BigEndian.PutUint64(key[1:9], idx) - - return key -} - func getDataKey(idx uint64, po uint8) []byte { key := make([]byte, 10) key[0] = keyData @@ -224,12 +230,12 @@ func encodeIndex(index *dpaDBIndex) []byte { return data } -func encodeData(chunk *Chunk) []byte { +func encodeData(chunk Chunk) []byte { // Always create a new underlying array for the returned byte slice. - // The chunk.Key array may be used in the returned slice which + // The chunk.Address array may be used in the returned slice which // may be changed later in the code or by the LevelDB, resulting - // that the Key is changed as well. - return append(append([]byte{}, chunk.Addr[:]...), chunk.SData...) + // that the Address is changed as well. + return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...) } func decodeIndex(data []byte, index *dpaDBIndex) error { @@ -237,14 +243,8 @@ func decodeIndex(data []byte, index *dpaDBIndex) error { return dec.Decode(index) } -func decodeData(data []byte, chunk *Chunk) { - chunk.SData = data[32:] - chunk.Size = int64(binary.BigEndian.Uint64(data[32:40])) -} - -func decodeOldData(data []byte, chunk *Chunk) { - chunk.SData = data - chunk.Size = int64(binary.BigEndian.Uint64(data[0:8])) +func decodeData(addr Address, data []byte) (*chunk, error) { + return NewChunk(addr, data[32:]), nil } func (s *LDBStore) collectGarbage(ratio float32) { @@ -347,44 +347,75 @@ func (s *LDBStore) Export(out io.Writer) (int64, error) { func (s *LDBStore) Import(in io.Reader) (int64, error) { tr := tar.NewReader(in) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + countC := make(chan int64) + errC := make(chan error) var count int64 - var wg sync.WaitGroup - for { - hdr, err := tr.Next() - if err == io.EOF { - break - } else if err != nil { - return count, err - } + go func() { + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } else if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } - if len(hdr.Name) != 64 { - log.Warn("ignoring non-chunk file", "name", hdr.Name) - continue - } + if len(hdr.Name) != 64 { + log.Warn("ignoring non-chunk file", "name", hdr.Name) + continue + } - keybytes, err := hex.DecodeString(hdr.Name) - if err != nil { - log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) - continue + keybytes, err := hex.DecodeString(hdr.Name) + if err != nil { + log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) + continue + } + + data, err := ioutil.ReadAll(tr) + if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } + key := Address(keybytes) + chunk := NewChunk(key, data[32:]) + + go func() { + select { + case errC <- s.Put(ctx, chunk): + case <-ctx.Done(): + } + }() + + count++ } + countC <- count + }() - data, err := ioutil.ReadAll(tr) - if err != nil { - return count, err + // wait for all chunks to be stored + i := int64(0) + var total int64 + for { + select { + case err := <-errC: + if err != nil { + return count, err + } + i++ + case total = <-countC: + case <-ctx.Done(): + return i, ctx.Err() + } + if total > 0 && i == total { + return total, nil } - key := Address(keybytes) - chunk := NewChunk(key, nil) - chunk.SData = data[32:] - s.Put(context.TODO(), chunk) - wg.Add(1) - go func() { - defer wg.Done() - <-chunk.dbStoredC - }() - count++ } - wg.Wait() - return count, nil } func (s *LDBStore) Cleanup() { @@ -430,15 +461,18 @@ func (s *LDBStore) Cleanup() { } } - c := &Chunk{} ck := data[:32] - decodeData(data, c) + c, err := decodeData(ck, data) + if err != nil { + log.Error("decodeData error", "err", err) + continue + } - cs := int64(binary.LittleEndian.Uint64(c.SData[:8])) - log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + cs := int64(binary.LittleEndian.Uint64(c.sdata[:8])) + log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) - if len(c.SData) > chunk.DefaultSize+8 { - log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + if len(c.sdata) > ch.DefaultSize+8 { + log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) s.delete(index.Idx, getIndexKey(key[1:]), po) removed++ errorsFound++ @@ -511,7 +545,6 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { batch.Delete(getDataKey(idx, po)) s.entryCnt-- dbEntryCount.Dec(1) - s.bucketCnt[po]-- cntKey := make([]byte, 2) cntKey[0] = keyDistanceCnt cntKey[1] = po @@ -520,10 +553,9 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { s.db.Write(batch) } -func (s *LDBStore) CurrentBucketStorageIndex(po uint8) uint64 { +func (s *LDBStore) BinIndex(po uint8) uint64 { s.lock.RLock() defer s.lock.RUnlock() - return s.bucketCnt[po] } @@ -539,43 +571,53 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } -func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) { +func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) - log.Trace("ldbstore.put", "key", chunk.Addr) + log.Trace("ldbstore.put", "key", chunk.Address()) - ikey := getIndexKey(chunk.Addr) + ikey := getIndexKey(chunk.Address()) var index dpaDBIndex - po := s.po(chunk.Addr) + po := s.po(chunk.Address()) + s.lock.Lock() - defer s.lock.Unlock() - log.Trace("ldbstore.put: s.db.Get", "key", chunk.Addr, "ikey", fmt.Sprintf("%x", ikey)) + if s.closed { + s.lock.Unlock() + return ErrDBClosed + } + batch := s.batch + + log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey)) idata, err := s.db.Get(ikey) if err != nil { s.doPut(chunk, &index, po) - batchC := s.batchC - go func() { - <-batchC - chunk.markAsStored() - }() } else { - log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Addr) + log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address) decodeIndex(idata, &index) - chunk.markAsStored() } index.Access = s.accessCnt s.accessCnt++ idata = encodeIndex(&index) s.batch.Put(ikey, idata) + + s.lock.Unlock() + select { case s.batchesC <- struct{}{}: default: } + + select { + case <-batch.c: + return batch.err + case <-ctx.Done(): + return ctx.Err() + } } // force putting into db, does not check access index -func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) { +func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { data := s.encodeDataFunc(chunk) dkey := getDataKey(s.dataIdx, po) s.batch.Put(dkey, data) @@ -592,58 +634,64 @@ func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) { } func (s *LDBStore) writeBatches() { -mainLoop: for { select { case <-s.quit: - break mainLoop + log.Debug("DbStore: quit batch write loop") + return case <-s.batchesC: - s.lock.Lock() - b := s.batch - e := s.entryCnt - d := s.dataIdx - a := s.accessCnt - c := s.batchC - s.batchC = make(chan bool) - s.batch = new(leveldb.Batch) - err := s.writeBatch(b, e, d, a) - // TODO: set this error on the batch, then tell the chunk + err := s.writeCurrentBatch() if err != nil { - log.Error(fmt.Sprintf("spawn batch write (%d entries): %v", b.Len(), err)) + log.Debug("DbStore: quit batch write loop", "err", err.Error()) + return } - close(c) - for e > s.capacity { - log.Trace("for >", "e", e, "s.capacity", s.capacity) - // Collect garbage in a separate goroutine - // to be able to interrupt this loop by s.quit. - done := make(chan struct{}) - go func() { - s.collectGarbage(gcArrayFreeRatio) - log.Trace("collectGarbage closing done") - close(done) - }() + } + } - select { - case <-s.quit: - s.lock.Unlock() - break mainLoop - case <-done: - } - e = s.entryCnt - } - s.lock.Unlock() +} + +func (s *LDBStore) writeCurrentBatch() error { + s.lock.Lock() + defer s.lock.Unlock() + b := s.batch + l := b.Len() + if l == 0 { + return nil + } + e := s.entryCnt + d := s.dataIdx + a := s.accessCnt + s.batch = newBatch() + b.err = s.writeBatch(b, e, d, a) + close(b.c) + for e > s.capacity { + log.Trace("for >", "e", e, "s.capacity", s.capacity) + // Collect garbage in a separate goroutine + // to be able to interrupt this loop by s.quit. + done := make(chan struct{}) + go func() { + s.collectGarbage(gcArrayFreeRatio) + log.Trace("collectGarbage closing done") + close(done) + }() + + select { + case <-s.quit: + return errors.New("CollectGarbage terminated due to quit") + case <-done: } + e = s.entryCnt } - log.Trace(fmt.Sprintf("DbStore: quit batch write loop")) + return nil } // must be called non concurrently -func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uint64) error { +func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error { b.Put(keyEntryCnt, U64ToBytes(entryCnt)) b.Put(keyDataIdx, U64ToBytes(dataIdx)) b.Put(keyAccessCnt, U64ToBytes(accessCnt)) l := b.Len() - if err := s.db.Write(b); err != nil { + if err := s.db.Write(b.Batch); err != nil { return fmt.Errorf("unable to write batch: %v", err) } log.Trace(fmt.Sprintf("batch write (%d entries)", l)) @@ -654,12 +702,12 @@ func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uin // to a mock store to bypass the default functionality encodeData. // The constructed function always returns the nil data, as DbStore does // not need to store the data, but still need to create the index. -func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk *Chunk) []byte { - return func(chunk *Chunk) []byte { - if err := mockStore.Put(chunk.Addr, encodeData(chunk)); err != nil { - log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Addr.Log(), err)) +func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { + return func(chunk Chunk) []byte { + if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil { + log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err)) } - return chunk.Addr[:] + return chunk.Address()[:] } } @@ -682,7 +730,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { return true } -func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) @@ -691,9 +739,11 @@ func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err err return s.get(addr) } -func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { var indx dpaDBIndex - + if s.closed { + return nil, ErrDBClosed + } if s.tryAccessIdx(getIndexKey(addr), &indx) { var data []byte if s.getDataFunc != nil { @@ -716,9 +766,7 @@ func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) { } } - chunk = NewChunk(addr, nil) - chunk.markAsStored() - decodeData(data, chunk) + return decodeData(addr, data) } else { err = ErrChunkNotFound } @@ -772,6 +820,12 @@ func (s *LDBStore) setCapacity(c uint64) { func (s *LDBStore) Close() { close(s.quit) + s.lock.Lock() + s.closed = true + s.lock.Unlock() + // force writing out current batch + s.writeCurrentBatch() + close(s.batchesC) s.db.Close() } |