author | Viktor Trón <viktor.tron@gmail.com> | 2018-06-22 05:00:43 +0800 |
committer | GitHub <noreply@github.com> | 2018-06-22 05:00:43 +0800 |
commit | eaff89291ce998ba4bf9b9816ca8a15c8b85f440 (patch) | |
tree | c77d7a06627a1a7f578d0fec8e39788e66672e53 /swarm/storage/dbstore.go | |
parent | d926bf2c7e3182d694c15829a37a0ca7331cd03c (diff) | |
parent | e187711c6545487d4cac3701f0f506bb536234e2 (diff) | |
Merge pull request #17041 from ethersphere/swarm-network-rewrite-merge
Swarm POC3 - happy solstice
Diffstat (limited to 'swarm/storage/dbstore.go')
-rw-r--r-- | swarm/storage/dbstore.go | 600 |
1 file changed, 0 insertions, 600 deletions
diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
deleted file mode 100644
index 1ff42a0c0..000000000
--- a/swarm/storage/dbstore.go
+++ /dev/null
@@ -1,600 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-// disk storage layer for the package bzz
-// DbStore implements the ChunkStore interface and is used by the DPA as
-// persistent storage of chunks
-// it implements purging based on access count allowing for external control of
-// max capacity
-
-package storage
-
-import (
-	"archive/tar"
-	"bytes"
-	"encoding/binary"
-	"encoding/hex"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"sync"
-
-	"github.com/ethereum/go-ethereum/log"
-	"github.com/ethereum/go-ethereum/metrics"
-	"github.com/ethereum/go-ethereum/rlp"
-	"github.com/syndtr/goleveldb/leveldb"
-	"github.com/syndtr/goleveldb/leveldb/iterator"
-)
-
-//metrics variables
-var (
-	gcCounter            = metrics.NewRegisteredCounter("storage.db.dbstore.gc.count", nil)
-	dbStoreDeleteCounter = metrics.NewRegisteredCounter("storage.db.dbstore.rm.count", nil)
-)
-
-const (
-	defaultDbCapacity = 5000000
-	defaultRadius     = 0 // not yet used
-
-	gcArraySize      = 10000
-	gcArrayFreeRatio = 0.1
-
-	// key prefixes for leveldb storage
-	kpIndex = 0
-)
-
-var (
-	keyAccessCnt = []byte{2}
-	keyEntryCnt  = []byte{3}
-	keyDataIdx   = []byte{4}
-	keyGCPos     = []byte{5}
-)
-
-type gcItem struct {
-	idx    uint64
-	value  uint64
-	idxKey []byte
-}
-
-type DbStore struct {
-	db *LDBDatabase
-
-	// this should be stored in db, accessed transactionally
-	entryCnt, accessCnt, dataIdx, capacity uint64
-
-	gcPos, gcStartPos []byte
-	gcArray           []*gcItem
-
-	hashfunc SwarmHasher
-
-	lock sync.Mutex
-}
-
-func NewDbStore(path string, hash SwarmHasher, capacity uint64, radius int) (s *DbStore, err error) {
-	s = new(DbStore)
-
-	s.hashfunc = hash
-
-	s.db, err = NewLDBDatabase(path)
-	if err != nil {
-		return
-	}
-
-	s.setCapacity(capacity)
-
-	s.gcStartPos = make([]byte, 1)
-	s.gcStartPos[0] = kpIndex
-	s.gcArray = make([]*gcItem, gcArraySize)
-
-	data, _ := s.db.Get(keyEntryCnt)
-	s.entryCnt = BytesToU64(data)
-	data, _ = s.db.Get(keyAccessCnt)
-	s.accessCnt = BytesToU64(data)
-	data, _ = s.db.Get(keyDataIdx)
-	s.dataIdx = BytesToU64(data)
-	s.gcPos, _ = s.db.Get(keyGCPos)
-	if s.gcPos == nil {
-		s.gcPos = s.gcStartPos
-	}
-	return
-}
-
-type dpaDBIndex struct {
-	Idx    uint64
-	Access uint64
-}
-
-func BytesToU64(data []byte) uint64 {
-	if len(data) < 8 {
-		return 0
-	}
-	return binary.LittleEndian.Uint64(data)
-}
-
-func U64ToBytes(val uint64) []byte {
-	data := make([]byte, 8)
-	binary.LittleEndian.PutUint64(data, val)
-	return data
-}
-
-func getIndexGCValue(index *dpaDBIndex) uint64 {
-	return index.Access
-}
-
-func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
-	index.Access = s.accessCnt
-}
-
-func getIndexKey(hash Key) []byte {
-	HashSize := len(hash)
-	key := make([]byte, HashSize+1)
-	key[0] = 0
-	copy(key[1:], hash[:])
-	return key
-}
-
-func getDataKey(idx uint64) []byte {
-	key := make([]byte, 9)
-	key[0] = 1
-	binary.BigEndian.PutUint64(key[1:9], idx)
-
-	return key
-}
-
-func encodeIndex(index *dpaDBIndex) []byte {
-	data, _ := rlp.EncodeToBytes(index)
-	return data
-}
-
-func encodeData(chunk *Chunk) []byte {
-	return chunk.SData
-}
-
-func decodeIndex(data []byte, index *dpaDBIndex) {
-	dec := rlp.NewStream(bytes.NewReader(data), 0)
-	dec.Decode(index)
-}
-
-func decodeData(data []byte, chunk *Chunk) {
-	chunk.SData = data
-	chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
-}
-
-func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
-	pivotValue := list[pivotIndex].value
-	dd := list[pivotIndex]
-	list[pivotIndex] = list[right]
-	list[right] = dd
-	storeIndex := left
-	for i := left; i < right; i++ {
-		if list[i].value < pivotValue {
-			dd = list[storeIndex]
-			list[storeIndex] = list[i]
-			list[i] = dd
-			storeIndex++
-		}
-	}
-	dd = list[storeIndex]
-	list[storeIndex] = list[right]
-	list[right] = dd
-	return storeIndex
-}
-
-func gcListSelect(list []*gcItem, left int, right int, n int) int {
-	if left == right {
-		return left
-	}
-	pivotIndex := (left + right) / 2
-	pivotIndex = gcListPartition(list, left, right, pivotIndex)
-	if n == pivotIndex {
-		return n
-	} else {
-		if n < pivotIndex {
-			return gcListSelect(list, left, pivotIndex-1, n)
-		} else {
-			return gcListSelect(list, pivotIndex+1, right, n)
-		}
-	}
-}
-
-func (s *DbStore) collectGarbage(ratio float32) {
-	it := s.db.NewIterator()
-	it.Seek(s.gcPos)
-	if it.Valid() {
-		s.gcPos = it.Key()
-	} else {
-		s.gcPos = nil
-	}
-	gcnt := 0
-
-	for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {
-
-		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
-			it.Seek(s.gcStartPos)
-			if it.Valid() {
-				s.gcPos = it.Key()
-			} else {
-				s.gcPos = nil
-			}
-		}
-
-		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
-			break
-		}
-
-		gci := new(gcItem)
-		gci.idxKey = s.gcPos
-		var index dpaDBIndex
-		decodeIndex(it.Value(), &index)
-		gci.idx = index.Idx
-		// the smaller, the more likely to be gc'd
-		gci.value = getIndexGCValue(&index)
-		s.gcArray[gcnt] = gci
-		gcnt++
-		it.Next()
-		if it.Valid() {
-			s.gcPos = it.Key()
-		} else {
-			s.gcPos = nil
-		}
-	}
-	it.Release()
-
-	cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
-	cutval := s.gcArray[cutidx].value
-
-	// fmt.Print(gcnt, " ", s.entryCnt, " ")
-
-	// actual gc
-	for i := 0; i < gcnt; i++ {
-		if s.gcArray[i].value <= cutval {
-			gcCounter.Inc(1)
-			s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
-		}
-	}
-
-	// fmt.Println(s.entryCnt)
-
-	s.db.Put(keyGCPos, s.gcPos)
-}
-
-// Export writes all chunks from the store to a tar archive, returning the
-// number of chunks written.
-func (s *DbStore) Export(out io.Writer) (int64, error) {
-	tw := tar.NewWriter(out)
-	defer tw.Close()
-
-	it := s.db.NewIterator()
-	defer it.Release()
-	var count int64
-	for ok := it.Seek([]byte{kpIndex}); ok; ok = it.Next() {
-		key := it.Key()
-		if (key == nil) || (key[0] != kpIndex) {
-			break
-		}
-
-		var index dpaDBIndex
-		decodeIndex(it.Value(), &index)
-
-		data, err := s.db.Get(getDataKey(index.Idx))
-		if err != nil {
-			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
-			continue
-		}
-
-		hdr := &tar.Header{
-			Name: hex.EncodeToString(key[1:]),
-			Mode: 0644,
-			Size: int64(len(data)),
-		}
-		if err := tw.WriteHeader(hdr); err != nil {
-			return count, err
-		}
-		if _, err := tw.Write(data); err != nil {
-			return count, err
-		}
-		count++
-	}
-
-	return count, nil
-}
-
-// Import reads chunks into the store from a tar archive, returning the number
-// of chunks read.
-func (s *DbStore) Import(in io.Reader) (int64, error) {
-	tr := tar.NewReader(in)
-
-	var count int64
-	for {
-		hdr, err := tr.Next()
-		if err == io.EOF {
-			break
-		} else if err != nil {
-			return count, err
-		}
-
-		if len(hdr.Name) != 64 {
-			log.Warn("ignoring non-chunk file", "name", hdr.Name)
-			continue
-		}
-
-		key, err := hex.DecodeString(hdr.Name)
-		if err != nil {
-			log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
-			continue
-		}
-
-		data, err := ioutil.ReadAll(tr)
-		if err != nil {
-			return count, err
-		}
-
-		s.Put(&Chunk{Key: key, SData: data})
-		count++
-	}
-
-	return count, nil
-}
-
-func (s *DbStore) Cleanup() {
-	//Iterates over the database and checks that there are no faulty chunks
-	it := s.db.NewIterator()
-	startPosition := []byte{kpIndex}
-	it.Seek(startPosition)
-	var key []byte
-	var errorsFound, total int
-	for it.Valid() {
-		key = it.Key()
-		if (key == nil) || (key[0] != kpIndex) {
-			break
-		}
-		total++
-		var index dpaDBIndex
-		decodeIndex(it.Value(), &index)
-
-		data, err := s.db.Get(getDataKey(index.Idx))
-		if err != nil {
-			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
-			s.delete(index.Idx, getIndexKey(key[1:]))
-			errorsFound++
-		} else {
-			hasher := s.hashfunc()
-			hasher.Write(data)
-			hash := hasher.Sum(nil)
-			if !bytes.Equal(hash, key[1:]) {
-				log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]))
-				s.delete(index.Idx, getIndexKey(key[1:]))
-				errorsFound++
-			}
-		}
-		it.Next()
-	}
-	it.Release()
-	log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
-}
-
-func (s *DbStore) delete(idx uint64, idxKey []byte) {
-	batch := new(leveldb.Batch)
-	batch.Delete(idxKey)
-	batch.Delete(getDataKey(idx))
-	dbStoreDeleteCounter.Inc(1)
-	s.entryCnt--
-	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
-	s.db.Write(batch)
-}
-
-func (s *DbStore) Counter() uint64 {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	return s.dataIdx
-}
-
-func (s *DbStore) Put(chunk *Chunk) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	ikey := getIndexKey(chunk.Key)
-	var index dpaDBIndex
-
-	if s.tryAccessIdx(ikey, &index) {
-		if chunk.dbStored != nil {
-			close(chunk.dbStored)
-		}
-		log.Trace(fmt.Sprintf("Storing to DB: chunk already exists, only update access"))
-		return // already exists, only update access
-	}
-
-	data := encodeData(chunk)
-	//data := ethutil.Encode([]interface{}{entry})
-
-	if s.entryCnt >= s.capacity {
-		s.collectGarbage(gcArrayFreeRatio)
-	}
-
-	batch := new(leveldb.Batch)
-
-	batch.Put(getDataKey(s.dataIdx), data)
-
-	index.Idx = s.dataIdx
-	s.updateIndexAccess(&index)
-
-	idata := encodeIndex(&index)
-	batch.Put(ikey, idata)
-
-	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
-	s.entryCnt++
-	batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
-	s.dataIdx++
-	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
-	s.accessCnt++
-
-	s.db.Write(batch)
-	if chunk.dbStored != nil {
-		close(chunk.dbStored)
-	}
-	log.Trace(fmt.Sprintf("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx))
-}
-
-// try to find index; if found, update access cnt and return true
-func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
-	idata, err := s.db.Get(ikey)
-	if err != nil {
-		return false
-	}
-	decodeIndex(idata, index)
-
-	batch := new(leveldb.Batch)
-
-	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
-	s.accessCnt++
-	s.updateIndexAccess(index)
-	idata = encodeIndex(index)
-	batch.Put(ikey, idata)
-
-	s.db.Write(batch)
-
-	return true
-}
-
-func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	var index dpaDBIndex
-
-	if s.tryAccessIdx(getIndexKey(key), &index) {
-		var data []byte
-		data, err = s.db.Get(getDataKey(index.Idx))
-		if err != nil {
-			log.Trace(fmt.Sprintf("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err))
-			s.delete(index.Idx, getIndexKey(key))
-			return
-		}
-
-		hasher := s.hashfunc()
-		hasher.Write(data)
-		hash := hasher.Sum(nil)
-		if !bytes.Equal(hash, key) {
-			s.delete(index.Idx, getIndexKey(key))
-			log.Warn("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
-		}
-
-		chunk = &Chunk{
-			Key: key,
-		}
-		decodeData(data, chunk)
-	} else {
-		err = notFound
-	}
-
-	return
-
-}
-
-func (s *DbStore) updateAccessCnt(key Key) {
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	var index dpaDBIndex
-	s.tryAccessIdx(getIndexKey(key), &index) // result_chn == nil, only update access cnt
-
-}
-
-func (s *DbStore) setCapacity(c uint64) {
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	s.capacity = c
-
-	if s.entryCnt > c {
-		ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
-		if ratio < gcArrayFreeRatio {
-			ratio = gcArrayFreeRatio
-		}
-		if ratio > 1 {
-			ratio = 1
-		}
-		for s.entryCnt > c {
-			s.collectGarbage(ratio)
-		}
-	}
-}
-
-func (s *DbStore) Close() {
-	s.db.Close()
-}
-
-// describes a section of the DbStore representing the unsynced
-// domain relevant to a peer
-// Start - Stop designate a continuous area Keys in an address space
-// typically the addresses closer to us than to the peer but not closer
-// another closer peer in between
-// From - To designates a time interval typically from the last disconnect
-// till the latest connection (real time traffic is relayed)
-type DbSyncState struct {
-	Start, Stop Key
-	First, Last uint64
-}
-
-// implements the syncer iterator interface
-// iterates by storage index (~ time of storage = first entry to db)
-type dbSyncIterator struct {
-	it iterator.Iterator
-	DbSyncState
-}
-
-// initialises a sync iterator from a syncToken (passed in with the handshake)
-func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
-	if state.First > state.Last {
-		return nil, fmt.Errorf("no entries found")
-	}
-	si = &dbSyncIterator{
-		it:          self.db.NewIterator(),
-		DbSyncState: state,
-	}
-	si.it.Seek(getIndexKey(state.Start))
-	return si, nil
-}
-
-// walk the area from Start to Stop and returns items within time interval
-// First to Last
-func (self *dbSyncIterator) Next() (key Key) {
-	for self.it.Valid() {
-		dbkey := self.it.Key()
-		if dbkey[0] != 0 {
-			break
-		}
-		key = Key(make([]byte, len(dbkey)-1))
-		copy(key[:], dbkey[1:])
-		if bytes.Compare(key[:], self.Start) <= 0 {
-			self.it.Next()
-			continue
-		}
-		if bytes.Compare(key[:], self.Stop) > 0 {
-			break
-		}
-		var index dpaDBIndex
-		decodeIndex(self.it.Value(), &index)
-		self.it.Next()
-		if (index.Idx >= self.First) && (index.Idx < self.Last) {
-			return
-		}
-	}
-	self.it.Release()
-	return nil
-}
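For context, below is a minimal, hypothetical usage sketch of the DbStore API that this diff deletes. It is not part of the commit: it assumes the pre-rewrite swarm/storage package as it existed at the parent commit, and that storage.MakeHashFunc("SHA3") yields the SwarmHasher used for chunk addressing. The key derivation simply mirrors what Get and Cleanup do in the deleted file (hash the raw SData and compare against the key).

// Hypothetical sketch against the pre-rewrite swarm/storage package
// (the parent of this commit); store one chunk and read it back.
package main

import (
	"encoding/binary"
	"fmt"
	"io/ioutil"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

func main() {
	dir, err := ioutil.TempDir("", "dbstore-example")
	if err != nil {
		panic(err)
	}

	// capacity and radius mirror defaultDbCapacity and defaultRadius above
	store, err := storage.NewDbStore(dir, storage.MakeHashFunc("SHA3"), 5000000, 0)
	if err != nil {
		panic(err)
	}
	defer store.Close()

	// SData layout expected by decodeData: an 8-byte little-endian size
	// prefix followed by the payload bytes
	payload := []byte("hello swarm")
	data := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(data[:8], uint64(len(payload)))
	copy(data[8:], payload)

	// the chunk key must be the hash of SData: Get re-hashes the stored
	// data and deletes any entry whose hash does not match its key
	h := storage.MakeHashFunc("SHA3")()
	h.Write(data)
	key := storage.Key(h.Sum(nil))

	store.Put(&storage.Chunk{Key: key, SData: data})

	chunk, err := store.Get(key)
	if err != nil {
		panic(err)
	}
	fmt.Printf("retrieved chunk %v, size %d\n", chunk.Key.Log(), chunk.Size)
}

Note how Put never reports success to the caller directly: completion is signalled by closing the chunk's dbStored channel when it is set, and eviction is triggered inside Put itself once entryCnt reaches capacity, via collectGarbage's access-count quickselect.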