aboutsummaryrefslogblamecommitdiffstats
path: root/swarm/storage/dbstore.go
blob: 421bb061d5239fe9a53ec0b627b47f4c7df399d4 (plain) (tree)
























                                                                                  
                     

                         
                      
             

                   

              
                                             
                                                 




                                                      





                                                                                               

































                                                                
                            



                       
                                                                                                     
















































































































































































                                                                               
                                        
                                                                       







                                   













































































                                                                                                          

















                                                                               
                                                                                                          






                                                                 
                                                                                                                          






                                                                         
                                                                                      





                                                     
                                   




                                                      
















                                             
                                                                                                 






























                                                            
                                                                                                      
































                                                                     
                                                                                                                       
                                                             





                                       
                                            
                                                             
                                                                                                          































                                                                                             
                                                                       











                                                
                           





























































                                                                                         
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// disk storage layer for the package bzz
// DbStore implements the ChunkStore interface and is used by the DPA as
// persistent storage of chunks
// it implements purging based on access count allowing for external control of
// max capacity

package storage

import (
    "archive/tar"
    "bytes"
    "encoding/binary"
    "encoding/hex"
    "fmt"
    "io"
    "io/ioutil"
    "sync"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/rlp"
    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/iterator"
)

//metrics variables
var (
    gcCounter            = metrics.NewRegisteredCounter("storage.db.dbstore.gc.count", nil)
    dbStoreDeleteCounter = metrics.NewRegisteredCounter("storage.db.dbstore.rm.count", nil)
)

const (
    defaultDbCapacity = 5000000
    defaultRadius     = 0 // not yet used

    gcArraySize      = 10000
    gcArrayFreeRatio = 0.1

    // key prefixes for leveldb storage
    kpIndex = 0
    kpData  = 1
)

var (
    keyAccessCnt = []byte{2}
    keyEntryCnt  = []byte{3}
    keyDataIdx   = []byte{4}
    keyGCPos     = []byte{5}
)

type gcItem struct {
    idx    uint64
    value  uint64
    idxKey []byte
}

type DbStore struct {
    db *LDBDatabase

    // this should be stored in db, accessed transactionally
    entryCnt, accessCnt, dataIdx, capacity uint64

    gcPos, gcStartPos []byte
    gcArray           []*gcItem

    hashfunc SwarmHasher

    lock sync.Mutex
}

func NewDbStore(path string, hash SwarmHasher, capacity uint64, radius int) (s *DbStore, err error) {
    s = new(DbStore)

    s.hashfunc = hash

    s.db, err = NewLDBDatabase(path)
    if err != nil {
        return
    }

    s.setCapacity(capacity)

    s.gcStartPos = make([]byte, 1)
    s.gcStartPos[0] = kpIndex
    s.gcArray = make([]*gcItem, gcArraySize)

    data, _ := s.db.Get(keyEntryCnt)
    s.entryCnt = BytesToU64(data)
    data, _ = s.db.Get(keyAccessCnt)
    s.accessCnt = BytesToU64(data)
    data, _ = s.db.Get(keyDataIdx)
    s.dataIdx = BytesToU64(data)
    s.gcPos, _ = s.db.Get(keyGCPos)
    if s.gcPos == nil {
        s.gcPos = s.gcStartPos
    }
    return
}

type dpaDBIndex struct {
    Idx    uint64
    Access uint64
}

func BytesToU64(data []byte) uint64 {
    if len(data) < 8 {
        return 0
    }
    return binary.LittleEndian.Uint64(data)
}

func U64ToBytes(val uint64) []byte {
    data := make([]byte, 8)
    binary.LittleEndian.PutUint64(data, val)
    return data
}

func getIndexGCValue(index *dpaDBIndex) uint64 {
    return index.Access
}

func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
    index.Access = s.accessCnt
}

func getIndexKey(hash Key) []byte {
    HashSize := len(hash)
    key := make([]byte, HashSize+1)
    key[0] = 0
    copy(key[1:], hash[:])
    return key
}

func getDataKey(idx uint64) []byte {
    key := make([]byte, 9)
    key[0] = 1
    binary.BigEndian.PutUint64(key[1:9], idx)

    return key
}

func encodeIndex(index *dpaDBIndex) []byte {
    data, _ := rlp.EncodeToBytes(index)
    return data
}

func encodeData(chunk *Chunk) []byte {
    return chunk.SData
}

func decodeIndex(data []byte, index *dpaDBIndex) {
    dec := rlp.NewStream(bytes.NewReader(data), 0)
    dec.Decode(index)
}

func decodeData(data []byte, chunk *Chunk) {
    chunk.SData = data
    chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
}

func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
    pivotValue := list[pivotIndex].value
    dd := list[pivotIndex]
    list[pivotIndex] = list[right]
    list[right] = dd
    storeIndex := left
    for i := left; i < right; i++ {
        if list[i].value < pivotValue {
            dd = list[storeIndex]
            list[storeIndex] = list[i]
            list[i] = dd
            storeIndex++
        }
    }
    dd = list[storeIndex]
    list[storeIndex] = list[right]
    list[right] = dd
    return storeIndex
}

func gcListSelect(list []*gcItem, left int, right int, n int) int {
    if left == right {
        return left
    }
    pivotIndex := (left + right) / 2
    pivotIndex = gcListPartition(list, left, right, pivotIndex)
    if n == pivotIndex {
        return n
    } else {
        if n < pivotIndex {
            return gcListSelect(list, left, pivotIndex-1, n)
        } else {
            return gcListSelect(list, pivotIndex+1, right, n)
        }
    }
}

func (s *DbStore) collectGarbage(ratio float32) {
    it := s.db.NewIterator()
    it.Seek(s.gcPos)
    if it.Valid() {
        s.gcPos = it.Key()
    } else {
        s.gcPos = nil
    }
    gcnt := 0

    for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {

        if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
            it.Seek(s.gcStartPos)
            if it.Valid() {
                s.gcPos = it.Key()
            } else {
                s.gcPos = nil
            }
        }

        if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
            break
        }

        gci := new(gcItem)
        gci.idxKey = s.gcPos
        var index dpaDBIndex
        decodeIndex(it.Value(), &index)
        gci.idx = index.Idx
        // the smaller, the more likely to be gc'd
        gci.value = getIndexGCValue(&index)
        s.gcArray[gcnt] = gci
        gcnt++
        it.Next()
        if it.Valid() {
            s.gcPos = it.Key()
        } else {
            s.gcPos = nil
        }
    }
    it.Release()

    cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
    cutval := s.gcArray[cutidx].value

    // fmt.Print(gcnt, " ", s.entryCnt, " ")

    // actual gc
    for i := 0; i < gcnt; i++ {
        if s.gcArray[i].value <= cutval {
            gcCounter.Inc(1)
            s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
        }
    }

    // fmt.Println(s.entryCnt)

    s.db.Put(keyGCPos, s.gcPos)
}

// Export writes all chunks from the store to a tar archive, returning the
// number of chunks written.
func (s *DbStore) Export(out io.Writer) (int64, error) {
    tw := tar.NewWriter(out)
    defer tw.Close()

    it := s.db.NewIterator()
    defer it.Release()
    var count int64
    for ok := it.Seek([]byte{kpIndex}); ok; ok = it.Next() {
        key := it.Key()
        if (key == nil) || (key[0] != kpIndex) {
            break
        }

        var index dpaDBIndex
        decodeIndex(it.Value(), &index)

        data, err := s.db.Get(getDataKey(index.Idx))
        if err != nil {
            log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
            continue
        }

        hdr := &tar.Header{
            Name: hex.EncodeToString(key[1:]),
            Mode: 0644,
            Size: int64(len(data)),
        }
        if err := tw.WriteHeader(hdr); err != nil {
            return count, err
        }
        if _, err := tw.Write(data); err != nil {
            return count, err
        }
        count++
    }

    return count, nil
}

// Import reads chunks into the store from a tar archive, returning the number
// of chunks read.
func (s *DbStore) Import(in io.Reader) (int64, error) {
    tr := tar.NewReader(in)

    var count int64
    for {
        hdr, err := tr.Next()
        if err == io.EOF {
            break
        } else if err != nil {
            return count, err
        }

        if len(hdr.Name) != 64 {
            log.Warn("ignoring non-chunk file", "name", hdr.Name)
            continue
        }

        key, err := hex.DecodeString(hdr.Name)
        if err != nil {
            log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
            continue
        }

        data, err := ioutil.ReadAll(tr)
        if err != nil {
            return count, err
        }

        s.Put(&Chunk{Key: key, SData: data})
        count++
    }

    return count, nil
}

func (s *DbStore) Cleanup() {
    //Iterates over the database and checks that there are no faulty chunks
    it := s.db.NewIterator()
    startPosition := []byte{kpIndex}
    it.Seek(startPosition)
    var key []byte
    var errorsFound, total int
    for it.Valid() {
        key = it.Key()
        if (key == nil) || (key[0] != kpIndex) {
            break
        }
        total++
        var index dpaDBIndex
        decodeIndex(it.Value(), &index)

        data, err := s.db.Get(getDataKey(index.Idx))
        if err != nil {
            log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
            s.delete(index.Idx, getIndexKey(key[1:]))
            errorsFound++
        } else {
            hasher := s.hashfunc()
            hasher.Write(data)
            hash := hasher.Sum(nil)
            if !bytes.Equal(hash, key[1:]) {
                log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]))
                s.delete(index.Idx, getIndexKey(key[1:]))
                errorsFound++
            }
        }
        it.Next()
    }
    it.Release()
    log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
}

func (s *DbStore) delete(idx uint64, idxKey []byte) {
    batch := new(leveldb.Batch)
    batch.Delete(idxKey)
    batch.Delete(getDataKey(idx))
    dbStoreDeleteCounter.Inc(1)
    s.entryCnt--
    batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
    s.db.Write(batch)
}

func (s *DbStore) Counter() uint64 {
    s.lock.Lock()
    defer s.lock.Unlock()
    return s.dataIdx
}

func (s *DbStore) Put(chunk *Chunk) {
    s.lock.Lock()
    defer s.lock.Unlock()

    ikey := getIndexKey(chunk.Key)
    var index dpaDBIndex

    if s.tryAccessIdx(ikey, &index) {
        if chunk.dbStored != nil {
            close(chunk.dbStored)
        }
        log.Trace(fmt.Sprintf("Storing to DB: chunk already exists, only update access"))
        return // already exists, only update access
    }

    data := encodeData(chunk)
    //data := ethutil.Encode([]interface{}{entry})

    if s.entryCnt >= s.capacity {
        s.collectGarbage(gcArrayFreeRatio)
    }

    batch := new(leveldb.Batch)

    batch.Put(getDataKey(s.dataIdx), data)

    index.Idx = s.dataIdx
    s.updateIndexAccess(&index)

    idata := encodeIndex(&index)
    batch.Put(ikey, idata)

    batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
    s.entryCnt++
    batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
    s.dataIdx++
    batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
    s.accessCnt++

    s.db.Write(batch)
    if chunk.dbStored != nil {
        close(chunk.dbStored)
    }
    log.Trace(fmt.Sprintf("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx))
}

// try to find index; if found, update access cnt and return true
func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
    idata, err := s.db.Get(ikey)
    if err != nil {
        return false
    }
    decodeIndex(idata, index)

    batch := new(leveldb.Batch)

    batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
    s.accessCnt++
    s.updateIndexAccess(index)
    idata = encodeIndex(index)
    batch.Put(ikey, idata)

    s.db.Write(batch)

    return true
}

func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
    s.lock.Lock()
    defer s.lock.Unlock()

    var index dpaDBIndex

    if s.tryAccessIdx(getIndexKey(key), &index) {
        var data []byte
        data, err = s.db.Get(getDataKey(index.Idx))
        if err != nil {
            log.Trace(fmt.Sprintf("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err))
            s.delete(index.Idx, getIndexKey(key))
            return
        }

        hasher := s.hashfunc()
        hasher.Write(data)
        hash := hasher.Sum(nil)
        if !bytes.Equal(hash, key) {
            s.delete(index.Idx, getIndexKey(key))
            log.Warn("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
        }

        chunk = &Chunk{
            Key: key,
        }
        decodeData(data, chunk)
    } else {
        err = notFound
    }

    return

}

func (s *DbStore) updateAccessCnt(key Key) {

    s.lock.Lock()
    defer s.lock.Unlock()

    var index dpaDBIndex
    s.tryAccessIdx(getIndexKey(key), &index) // result_chn == nil, only update access cnt

}

func (s *DbStore) setCapacity(c uint64) {

    s.lock.Lock()
    defer s.lock.Unlock()

    s.capacity = c

    if s.entryCnt > c {
        ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
        if ratio < gcArrayFreeRatio {
            ratio = gcArrayFreeRatio
        }
        if ratio > 1 {
            ratio = 1
        }
        for s.entryCnt > c {
            s.collectGarbage(ratio)
        }
    }
}

func (s *DbStore) Close() {
    s.db.Close()
}

//  describes a section of the DbStore representing the unsynced
// domain relevant to a peer
// Start - Stop designate a continuous area Keys in an address space
// typically the addresses closer to us than to the peer but not closer
// another closer peer in between
// From - To designates a time interval typically from the last disconnect
// till the latest connection (real time traffic is relayed)
type DbSyncState struct {
    Start, Stop Key
    First, Last uint64
}

// implements the syncer iterator interface
// iterates by storage index (~ time of storage = first entry to db)
type dbSyncIterator struct {
    it iterator.Iterator
    DbSyncState
}

// initialises a sync iterator from a syncToken (passed in with the handshake)
func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
    if state.First > state.Last {
        return nil, fmt.Errorf("no entries found")
    }
    si = &dbSyncIterator{
        it:          self.db.NewIterator(),
        DbSyncState: state,
    }
    si.it.Seek(getIndexKey(state.Start))
    return si, nil
}

// walk the area from Start to Stop and returns items within time interval
// First to Last
func (self *dbSyncIterator) Next() (key Key) {
    for self.it.Valid() {
        dbkey := self.it.Key()
        if dbkey[0] != 0 {
            break
        }
        key = Key(make([]byte, len(dbkey)-1))
        copy(key[:], dbkey[1:])
        if bytes.Compare(key[:], self.Start) <= 0 {
            self.it.Next()
            continue
        }
        if bytes.Compare(key[:], self.Stop) > 0 {
            break
        }
        var index dpaDBIndex
        decodeIndex(self.it.Value(), &index)
        self.it.Next()
        if (index.Idx >= self.First) && (index.Idx < self.Last) {
            return
        }
    }
    self.it.Release()
    return nil
}