Diffstat (limited to 'swarm/storage/dbstore.go')
-rw-r--r--  swarm/storage/dbstore.go  473
 1 file changed, 473 insertions(+), 0 deletions(-)
diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
new file mode 100644
index 000000000..5ecc5c500
--- /dev/null
+++ b/swarm/storage/dbstore.go
@@ -0,0 +1,473 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// disk storage layer for the package bzz
+// DbStore implements the ChunkStore interface and is used by the DPA as
+// persistent storage of chunks.
+// It implements purging based on access count, allowing for external
+// control of the maximum capacity.
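+//
+// A minimal usage sketch; the path, hasher variable and error handling are
+// illustrative, not prescribed by this file:
+//
+//  store, err := NewDbStore("/tmp/bzz-chunks", hasher, defaultDbCapacity, defaultRadius)
+//  if err != nil {
+//      // handle error
+//  }
+//  defer store.close()
+//  store.Put(chunk)                  // persist a chunk (no-op if already stored)
+//  chunk, err = store.Get(chunk.Key) // look up and hash-check a chunk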
+
+package storage
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+const (
+ defaultDbCapacity = 5000000
+ defaultRadius = 0 // not yet used
+
+ gcArraySize = 10000
+ gcArrayFreeRatio = 0.1
+
+ // key prefixes for leveldb storage
+ kpIndex = 0
+ kpData = 1
+)
+
+var (
+ keyAccessCnt = []byte{2}
+ keyEntryCnt = []byte{3}
+ keyDataIdx = []byte{4}
+ keyGCPos = []byte{5}
+)
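+
+// For orientation, the single-byte key prefixes partition the database as
+// follows (derived from the constants and helpers in this file):
+//
+//  0x00 || chunk hash              -> rlp(dpaDBIndex)   (see getIndexKey)
+//  0x01 || big-endian storage idx  -> chunk payload     (see getDataKey)
+//  0x02 / 0x03 / 0x04              -> little-endian uint64 counters
+//  0x05                            -> last garbage collection cursor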
+
+type gcItem struct {
+ idx uint64
+ value uint64
+ idxKey []byte
+}
+
+type DbStore struct {
+ db *LDBDatabase
+
+ // this should be stored in db, accessed transactionally
+ entryCnt, accessCnt, dataIdx, capacity uint64
+
+ gcPos, gcStartPos []byte
+ gcArray []*gcItem
+
+ hashfunc Hasher
+
+ lock sync.Mutex
+}
+
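+// NewDbStore opens (or creates) the chunk database at path and restores the
+// persisted entry/access/data counters and the gc cursor. The radius
+// parameter is accepted but not used yet (cf. defaultRadius).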
+func NewDbStore(path string, hash Hasher, capacity uint64, radius int) (s *DbStore, err error) {
+ s = new(DbStore)
+
+ s.hashfunc = hash
+
+ s.db, err = NewLDBDatabase(path)
+ if err != nil {
+ return
+ }
+
+ s.setCapacity(capacity)
+
+ s.gcStartPos = make([]byte, 1)
+ s.gcStartPos[0] = kpIndex
+ s.gcArray = make([]*gcItem, gcArraySize)
+
+ // counters missing from a fresh database decode to zero (BytesToU64(nil) == 0)
+ data, _ := s.db.Get(keyEntryCnt)
+ s.entryCnt = BytesToU64(data)
+ data, _ = s.db.Get(keyAccessCnt)
+ s.accessCnt = BytesToU64(data)
+ data, _ = s.db.Get(keyDataIdx)
+ s.dataIdx = BytesToU64(data)
+ s.gcPos, _ = s.db.Get(keyGCPos)
+ if s.gcPos == nil {
+ s.gcPos = s.gcStartPos
+ }
+ return
+}
+
+type dpaDBIndex struct {
+ Idx uint64
+ Access uint64
+}
+
+func BytesToU64(data []byte) uint64 {
+ if len(data) < 8 {
+ return 0
+ }
+ return binary.LittleEndian.Uint64(data)
+}
+
+func U64ToBytes(val uint64) []byte {
+ data := make([]byte, 8)
+ binary.LittleEndian.PutUint64(data, val)
+ return data
+}
+
+func getIndexGCValue(index *dpaDBIndex) uint64 {
+ return index.Access
+}
+
+func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
+ index.Access = s.accessCnt
+}
+
+func getIndexKey(hash Key) []byte {
+ hashSize := len(hash)
+ key := make([]byte, hashSize+1)
+ key[0] = kpIndex
+ copy(key[1:], hash[:])
+ return key
+}
+
+func getDataKey(idx uint64) []byte {
+ key := make([]byte, 9)
+ key[0] = kpData
+ // big-endian, so that data keys iterate in storage-index order
+ binary.BigEndian.PutUint64(key[1:9], idx)
+ return key
+}
+
+func encodeIndex(index *dpaDBIndex) []byte {
+ data, _ := rlp.EncodeToBytes(index)
+ return data
+}
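+
+// for orientation: rlp encodes the two uint64 fields as a list, so e.g.
+// dpaDBIndex{Idx: 1, Access: 2} serialises to the three bytes 0xc2 0x01 0x02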
+
+func encodeData(chunk *Chunk) []byte {
+ return chunk.SData
+}
+
+func decodeIndex(data []byte, index *dpaDBIndex) {
+ dec := rlp.NewStream(bytes.NewReader(data), 0)
+ dec.Decode(index)
+}
+
+func decodeData(data []byte, chunk *Chunk) {
+ chunk.SData = data
+ // the first 8 bytes of the stored payload carry the chunk size, little-endian
+ chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
+}
+
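+// gcListPartition is the Lomuto-style partition step of quickselect: it
+// moves the pivot to the end, sweeps elements smaller than the pivot to the
+// front, then swaps the pivot into its final sorted position and returns
+// that index.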
+func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
+ pivotValue := list[pivotIndex].value
+ list[pivotIndex], list[right] = list[right], list[pivotIndex]
+ storeIndex := left
+ for i := left; i < right; i++ {
+ if list[i].value < pivotValue {
+ list[storeIndex], list[i] = list[i], list[storeIndex]
+ storeIndex++
+ }
+ }
+ list[storeIndex], list[right] = list[right], list[storeIndex]
+ return storeIndex
+}
+
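+// gcListSelect is a recursive quickselect: it returns the position at which
+// the n-th smallest value of list[left..right] ends up, partially reordering
+// the slice in the process (average linear time, no extra allocation).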
+func gcListSelect(list []*gcItem, left int, right int, n int) int {
+ if left == right {
+ return left
+ }
+ pivotIndex := (left + right) / 2
+ pivotIndex = gcListPartition(list, left, right, pivotIndex)
+ if n == pivotIndex {
+ return n
+ }
+ if n < pivotIndex {
+ return gcListSelect(list, left, pivotIndex-1, n)
+ }
+ return gcListSelect(list, pivotIndex+1, right, n)
+}
+
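+// collectGarbage scans up to gcArraySize index entries starting at the
+// persisted gcPos cursor, wrapping around at the end of the index. It then
+// quickselects the entry whose access counter sits at the requested ratio
+// (e.g. ratio 0.1 over 10000 sampled entries picks the 1000th-smallest
+// access count) and deletes every sampled entry with an access counter at
+// or below that cutoff, together with its chunk data.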
+func (s *DbStore) collectGarbage(ratio float32) {
+ it := s.db.NewIterator()
+ it.Seek(s.gcPos)
+ if it.Valid() {
+ s.gcPos = it.Key()
+ } else {
+ s.gcPos = nil
+ }
+ gcnt := 0
+
+ for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {
+
+ if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
+ it.Seek(s.gcStartPos)
+ if it.Valid() {
+ s.gcPos = it.Key()
+ } else {
+ s.gcPos = nil
+ }
+ }
+
+ if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
+ break
+ }
+
+ gci := new(gcItem)
+ gci.idxKey = s.gcPos
+ var index dpaDBIndex
+ decodeIndex(it.Value(), &index)
+ gci.idx = index.Idx
+ // the smaller, the more likely to be gc'd
+ gci.value = getIndexGCValue(&index)
+ s.gcArray[gcnt] = gci
+ gcnt++
+ it.Next()
+ if it.Valid() {
+ s.gcPos = it.Key()
+ } else {
+ s.gcPos = nil
+ }
+ }
+ it.Release()
+
+ // nothing sampled (empty index): avoid indexing into an empty gcArray
+ if gcnt == 0 {
+ return
+ }
+
+ cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
+ cutval := s.gcArray[cutidx].value
+
+ // actual gc
+ for i := 0; i < gcnt; i++ {
+ if s.gcArray[i].value <= cutval {
+ batch := new(leveldb.Batch)
+ batch.Delete(s.gcArray[i].idxKey)
+ batch.Delete(getDataKey(s.gcArray[i].idx))
+ s.entryCnt--
+ batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+ s.db.Write(batch)
+ }
+ }
+
+ s.db.Put(keyGCPos, s.gcPos)
+}
+
+func (s *DbStore) Counter() uint64 {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ return s.dataIdx
+}
+
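+// Put stores a chunk and closes chunk.dbStored (if set) once it is on disk.
+// If the chunk is already present only its access stamp is refreshed; when
+// the store is at capacity, a garbage collection round runs first.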
+func (s *DbStore) Put(chunk *Chunk) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ ikey := getIndexKey(chunk.Key)
+ var index dpaDBIndex
+
+ if s.tryAccessIdx(ikey, &index) {
+ if chunk.dbStored != nil {
+ close(chunk.dbStored)
+ }
+ return // already exists, only update access
+ }
+
+ data := encodeData(chunk)
+
+ if s.entryCnt >= s.capacity {
+ s.collectGarbage(gcArrayFreeRatio)
+ }
+
+ batch := new(leveldb.Batch)
+
+ batch.Put(getDataKey(s.dataIdx), data)
+
+ index.Idx = s.dataIdx
+ s.updateIndexAccess(&index)
+
+ idata := encodeIndex(&index)
+ batch.Put(ikey, idata)
+
+ // bump the counters before persisting them, so the stored values already
+ // account for this entry (a stale dataIdx on restart would reuse a data
+ // key and overwrite the last stored chunk)
+ s.entryCnt++
+ batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+ s.dataIdx++
+ batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
+ s.accessCnt++
+ batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
+
+ s.db.Write(batch)
+ if chunk.dbStored != nil {
+ close(chunk.dbStored)
+ }
+ glog.V(logger.Detail).Infof("DbStore.Put: %v. db storage counter: %v", chunk.Key.Log(), s.dataIdx)
+}
+
+// tryAccessIdx looks up the index entry for ikey; if found, it bumps the
+// global access counter, refreshes the entry's access stamp and returns true
+func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
+ idata, err := s.db.Get(ikey)
+ if err != nil {
+ return false
+ }
+ decodeIndex(idata, index)
+
+ batch := new(leveldb.Batch)
+
+ s.accessCnt++
+ batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
+ s.updateIndexAccess(index)
+ idata = encodeIndex(index)
+ batch.Put(ikey, idata)
+
+ s.db.Write(batch)
+
+ return true
+}
+
+func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ var index dpaDBIndex
+
+ if s.tryAccessIdx(getIndexKey(key), &index) {
+ var data []byte
+ data, err = s.db.Get(getDataKey(index.Idx))
+ if err != nil {
+ return
+ }
+
+ hasher := s.hashfunc()
+ hasher.Write(data)
+ hash := hasher.Sum(nil)
+ if !bytes.Equal(hash, key) {
+ s.db.Delete(getDataKey(index.Idx))
+ err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:])
+ return
+ }
+
+ chunk = &Chunk{
+ Key: key,
+ }
+ decodeData(data, chunk)
+ } else {
+ err = notFound
+ }
+
+ return
+}
+
+func (s *DbStore) updateAccessCnt(key Key) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ var index dpaDBIndex
+ s.tryAccessIdx(getIndexKey(key), &index) // result ignored, only update access cnt
+}
+
+func (s *DbStore) setCapacity(c uint64) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.capacity = c
+
+ if s.entryCnt > c {
+ // free at least the overshoot (plus 1% slack), clamped to
+ // [gcArrayFreeRatio, 1]
+ ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
+ if ratio < gcArrayFreeRatio {
+ ratio = gcArrayFreeRatio
+ }
+ if ratio > 1 {
+ ratio = 1
+ }
+ for s.entryCnt > c {
+ s.collectGarbage(ratio)
+ }
+ }
+}
+
+func (s *DbStore) getEntryCnt() uint64 {
+ return s.entryCnt
+}
+
+func (s *DbStore) close() {
+ s.db.Close()
+}
+
+// DbSyncState describes a section of the DbStore representing the unsynced
+// domain relevant to a peer.
+// Start and Stop designate a continuous area of keys in the address space,
+// typically the addresses closer to us than to the peer, but not closer to
+// another peer in between.
+// First and Last designate an interval of storage indexes (which roughly
+// order entries by time of storage), typically spanning from the last
+// disconnect until the latest connection (real-time traffic is relayed).
+type DbSyncState struct {
+ Start, Stop Key
+ First, Last uint64
+}
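+
+// For orientation, a sync state covering a hypothetical address range and
+// storage-index window might be built as (all values illustrative):
+//
+//  state := DbSyncState{
+//      Start: Key(make([]byte, 32)),               // lower bound of the key range
+//      Stop:  Key(bytes.Repeat([]byte{0xff}, 32)), // upper bound of the key range
+//      First: 1000,                                // first storage index of interest
+//      Last:  2000,                                // iteration stops before this index
+//  }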
+
+// dbSyncIterator implements the syncer iterator interface,
+// iterating by storage index (~ time of storage = first entry to db)
+type dbSyncIterator struct {
+ it iterator.Iterator
+ DbSyncState
+}
+
+// NewSyncIterator initialises a sync iterator from a sync state
+// (passed in with the handshake as the syncToken)
+func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
+ if state.First > state.Last {
+ return nil, fmt.Errorf("no entries found")
+ }
+ si = &dbSyncIterator{
+ it: self.db.NewIterator(),
+ DbSyncState: state,
+ }
+ si.it.Seek(getIndexKey(state.Start))
+ return si, nil
+}
+
+// Next walks the area from Start to Stop and returns the next key whose
+// storage index falls within the half-open interval [First, Last);
+// it returns nil once the iteration is exhausted
+func (self *dbSyncIterator) Next() (key Key) {
+ for self.it.Valid() {
+ dbkey := self.it.Key()
+ if dbkey[0] != kpIndex {
+ break
+ }
+ key = Key(make([]byte, len(dbkey)-1))
+ copy(key[:], dbkey[1:])
+ // the area is exclusive of Start and inclusive of Stop
+ if bytes.Compare(key[:], self.Start) <= 0 {
+ self.it.Next()
+ continue
+ }
+ if bytes.Compare(key[:], self.Stop) > 0 {
+ break
+ }
+ var index dpaDBIndex
+ decodeIndex(self.it.Value(), &index)
+ self.it.Next()
+ if (index.Idx >= self.First) && (index.Idx < self.Last) {
+ return
+ }
+ }
+ self.it.Release()
+ return nil
+}
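+
+// A minimal consumption sketch (store and state as in the examples above):
+//
+//  si, err := store.NewSyncIterator(state)
+//  if err != nil {
+//      // handle error
+//  }
+//  for key := si.Next(); key != nil; key = si.Next() {
+//      // offer key to the peer
+//  }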