aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/memstore.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/memstore.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloaddexon-e187711c6545487d4cac3701f0f506bb536234e2.tar
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/memstore.go')
-rw-r--r--swarm/storage/memstore.go361
1 files changed, 87 insertions, 274 deletions
diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go
index d6be54220..7af31ffbd 100644
--- a/swarm/storage/memstore.go
+++ b/swarm/storage/memstore.go
@@ -1,4 +1,4 @@
-// Copyright 2016 The go-ethereum Authors
+// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -19,316 +19,129 @@
package storage
import (
- "fmt"
"sync"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
-)
-
-//metrics variables
-var (
- memstorePutCounter = metrics.NewRegisteredCounter("storage.db.memstore.put.count", nil)
- memstoreRemoveCounter = metrics.NewRegisteredCounter("storage.db.memstore.rm.count", nil)
-)
-
-const (
- memTreeLW = 2 // log2(subtree count) of the subtrees
- memTreeFLW = 14 // log2(subtree count) of the root layer
- dbForceUpdateAccessCnt = 1000
- defaultCacheCapacity = 5000
+ lru "github.com/hashicorp/golang-lru"
)
type MemStore struct {
- memtree *memTree
- entryCnt, capacity uint // stored entries
- accessCnt uint64 // access counter; oldest is thrown away when full
- dbAccessCnt uint64
- dbStore *DbStore
- lock sync.Mutex
-}
-
-/*
-a hash prefix subtree containing subtrees or one storage entry (but never both)
-
-- access[0] stores the smallest (oldest) access count value in this subtree
-- if it contains more subtrees and its subtree count is at least 4, access[1:2]
- stores the smallest access count in the first and second halves of subtrees
- (so that access[0] = min(access[1], access[2])
-- likewise, if subtree count is at least 8,
- access[1] = min(access[3], access[4])
- access[2] = min(access[5], access[6])
- (access[] is a binary tree inside the multi-bit leveled hash tree)
-*/
-
-func NewMemStore(d *DbStore, capacity uint) (m *MemStore) {
- m = &MemStore{}
- m.memtree = newMemTree(memTreeFLW, nil, 0)
- m.dbStore = d
- m.setCapacity(capacity)
- return
+ cache *lru.Cache
+ requests *lru.Cache
+ mu sync.RWMutex
+ disabled bool
}
-type memTree struct {
- subtree []*memTree
- parent *memTree
- parentIdx uint
-
- bits uint // log2(subtree count)
- width uint // subtree count
-
- entry *Chunk // if subtrees are present, entry should be nil
- lastDBaccess uint64
- access []uint64
-}
-
-func newMemTree(b uint, parent *memTree, pidx uint) (node *memTree) {
- node = new(memTree)
- node.bits = b
- node.width = 1 << b
- node.subtree = make([]*memTree, node.width)
- node.access = make([]uint64, node.width-1)
- node.parent = parent
- node.parentIdx = pidx
- if parent != nil {
- parent.subtree[pidx] = node
- }
-
- return node
-}
-
-func (node *memTree) updateAccess(a uint64) {
- aidx := uint(0)
- var aa uint64
- oa := node.access[0]
- for node.access[aidx] == oa {
- node.access[aidx] = a
- if aidx > 0 {
- aa = node.access[((aidx-1)^1)+1]
- aidx = (aidx - 1) >> 1
- } else {
- pidx := node.parentIdx
- node = node.parent
- if node == nil {
- return
- }
- nn := node.subtree[pidx^1]
- if nn != nil {
- aa = nn.access[0]
- } else {
- aa = 0
- }
- aidx = (node.width + pidx - 2) >> 1
- }
-
- if (aa != 0) && (aa < a) {
- a = aa
+//NewMemStore is instantiating a MemStore cache. We are keeping a record of all outgoing requests for chunks, that
+//should later be delivered by peer nodes, in the `requests` LRU cache. We are also keeping all frequently requested
+//chunks in the `cache` LRU cache.
+//
+//`requests` LRU cache capacity should ideally never be reached, this is why for the time being it should be initialised
+//with the same value as the LDBStore capacity.
+func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) {
+ if params.CacheCapacity == 0 {
+ return &MemStore{
+ disabled: true,
}
}
-}
-
-func (s *MemStore) setCapacity(c uint) {
- s.lock.Lock()
- defer s.lock.Unlock()
- for c < s.entryCnt {
- s.removeOldest()
+ onEvicted := func(key interface{}, value interface{}) {
+ v := value.(*Chunk)
+ <-v.dbStoredC
}
- s.capacity = c
-}
-
-func (s *MemStore) Counter() uint {
- return s.entryCnt
-}
-
-// entry (not its copy) is going to be in MemStore
-func (s *MemStore) Put(entry *Chunk) {
- if s.capacity == 0 {
- return
+ c, err := lru.NewWithEvict(int(params.CacheCapacity), onEvicted)
+ if err != nil {
+ panic(err)
}
- s.lock.Lock()
- defer s.lock.Unlock()
-
- if s.entryCnt >= s.capacity {
- s.removeOldest()
+ requestEvicted := func(key interface{}, value interface{}) {
+ // temporary remove of the error log, until we figure out the problem, as it is too spamy
+ //log.Error("evict called on outgoing request")
}
-
- s.accessCnt++
-
- memstorePutCounter.Inc(1)
-
- node := s.memtree
- bitpos := uint(0)
- for node.entry == nil {
- l := entry.Key.bits(bitpos, node.bits)
- st := node.subtree[l]
- if st == nil {
- st = newMemTree(memTreeLW, node, l)
- bitpos += node.bits
- node = st
- break
- }
- bitpos += node.bits
- node = st
+ r, err := lru.NewWithEvict(int(params.ChunkRequestsCacheCapacity), requestEvicted)
+ if err != nil {
+ panic(err)
}
- if node.entry != nil {
-
- if node.entry.Key.isEqual(entry.Key) {
- node.updateAccess(s.accessCnt)
- if entry.SData == nil {
- entry.Size = node.entry.Size
- entry.SData = node.entry.SData
- }
- if entry.Req == nil {
- entry.Req = node.entry.Req
- }
- entry.C = node.entry.C
- node.entry = entry
- return
- }
-
- for node.entry != nil {
+ return &MemStore{
+ cache: c,
+ requests: r,
+ }
+}
- l := node.entry.Key.bits(bitpos, node.bits)
- st := node.subtree[l]
- if st == nil {
- st = newMemTree(memTreeLW, node, l)
- }
- st.entry = node.entry
- node.entry = nil
- st.updateAccess(node.access[0])
+func (m *MemStore) Get(addr Address) (*Chunk, error) {
+ if m.disabled {
+ return nil, ErrChunkNotFound
+ }
- l = entry.Key.bits(bitpos, node.bits)
- st = node.subtree[l]
- if st == nil {
- st = newMemTree(memTreeLW, node, l)
- }
- bitpos += node.bits
- node = st
+ m.mu.RLock()
+ defer m.mu.RUnlock()
- }
+ r, ok := m.requests.Get(string(addr))
+ // it is a request
+ if ok {
+ return r.(*Chunk), nil
}
- node.entry = entry
- node.lastDBaccess = s.dbAccessCnt
- node.updateAccess(s.accessCnt)
- s.entryCnt++
+ // it is not a request
+ c, ok := m.cache.Get(string(addr))
+ if !ok {
+ return nil, ErrChunkNotFound
+ }
+ return c.(*Chunk), nil
}
-func (s *MemStore) Get(hash Key) (chunk *Chunk, err error) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- node := s.memtree
- bitpos := uint(0)
- for node.entry == nil {
- l := hash.bits(bitpos, node.bits)
- st := node.subtree[l]
- if st == nil {
- return nil, notFound
- }
- bitpos += node.bits
- node = st
+func (m *MemStore) Put(c *Chunk) {
+ if m.disabled {
+ return
}
- if node.entry.Key.isEqual(hash) {
- s.accessCnt++
- node.updateAccess(s.accessCnt)
- chunk = node.entry
- if s.dbAccessCnt-node.lastDBaccess > dbForceUpdateAccessCnt {
- s.dbAccessCnt++
- node.lastDBaccess = s.dbAccessCnt
- if s.dbStore != nil {
- s.dbStore.updateAccessCnt(hash)
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ // it is a request
+ if c.ReqC != nil {
+ select {
+ case <-c.ReqC:
+ if c.GetErrored() != nil {
+ m.requests.Remove(string(c.Addr))
+ return
}
+ m.cache.Add(string(c.Addr), c)
+ m.requests.Remove(string(c.Addr))
+ default:
+ m.requests.Add(string(c.Addr), c)
}
- } else {
- err = notFound
+ return
}
- return
+ // it is not a request
+ m.cache.Add(string(c.Addr), c)
+ m.requests.Remove(string(c.Addr))
}
-func (s *MemStore) removeOldest() {
- node := s.memtree
-
- for node.entry == nil {
-
- aidx := uint(0)
- av := node.access[aidx]
-
- for aidx < node.width/2-1 {
- if av == node.access[aidx*2+1] {
- node.access[aidx] = node.access[aidx*2+2]
- aidx = aidx*2 + 1
- } else if av == node.access[aidx*2+2] {
- node.access[aidx] = node.access[aidx*2+1]
- aidx = aidx*2 + 2
- } else {
- panic(nil)
- }
+func (m *MemStore) setCapacity(n int) {
+ if n <= 0 {
+ m.disabled = true
+ } else {
+ onEvicted := func(key interface{}, value interface{}) {
+ v := value.(*Chunk)
+ <-v.dbStoredC
}
- pidx := aidx*2 + 2 - node.width
- if (node.subtree[pidx] != nil) && (av == node.subtree[pidx].access[0]) {
- if node.subtree[pidx+1] != nil {
- node.access[aidx] = node.subtree[pidx+1].access[0]
- } else {
- node.access[aidx] = 0
- }
- } else if (node.subtree[pidx+1] != nil) && (av == node.subtree[pidx+1].access[0]) {
- if node.subtree[pidx] != nil {
- node.access[aidx] = node.subtree[pidx].access[0]
- } else {
- node.access[aidx] = 0
- }
- pidx++
- } else {
- panic(nil)
+ c, err := lru.NewWithEvict(n, onEvicted)
+ if err != nil {
+ panic(err)
}
- //fmt.Println(pidx)
- node = node.subtree[pidx]
-
- }
-
- if node.entry.dbStored != nil {
- log.Trace(fmt.Sprintf("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log()))
- <-node.entry.dbStored
- log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log()))
- } else {
- log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log()))
- }
-
- if node.entry.SData != nil {
- memstoreRemoveCounter.Inc(1)
- node.entry = nil
- s.entryCnt--
- }
-
- node.access[0] = 0
-
- //---
-
- aidx := uint(0)
- for {
- aa := node.access[aidx]
- if aidx > 0 {
- aidx = (aidx - 1) >> 1
- } else {
- pidx := node.parentIdx
- node = node.parent
- if node == nil {
- return
- }
- aidx = (node.width + pidx - 2) >> 1
+ r, err := lru.New(defaultChunkRequestsCacheCapacity)
+ if err != nil {
+ panic(err)
}
- if (aa != 0) && ((aa < node.access[aidx]) || (node.access[aidx] == 0)) {
- node.access[aidx] = aa
+
+ m = &MemStore{
+ cache: c,
+ requests: r,
}
}
}
-// Close memstore
func (s *MemStore) Close() {}