diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/storage/memstore.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/storage/memstore.go')
-rw-r--r-- | swarm/storage/memstore.go | 361 |
1 files changed, 87 insertions, 274 deletions
diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index d6be54220..7af31ffbd 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -19,316 +19,129 @@ package storage import ( - "fmt" "sync" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" -) - -//metrics variables -var ( - memstorePutCounter = metrics.NewRegisteredCounter("storage.db.memstore.put.count", nil) - memstoreRemoveCounter = metrics.NewRegisteredCounter("storage.db.memstore.rm.count", nil) -) - -const ( - memTreeLW = 2 // log2(subtree count) of the subtrees - memTreeFLW = 14 // log2(subtree count) of the root layer - dbForceUpdateAccessCnt = 1000 - defaultCacheCapacity = 5000 + lru "github.com/hashicorp/golang-lru" ) type MemStore struct { - memtree *memTree - entryCnt, capacity uint // stored entries - accessCnt uint64 // access counter; oldest is thrown away when full - dbAccessCnt uint64 - dbStore *DbStore - lock sync.Mutex -} - -/* -a hash prefix subtree containing subtrees or one storage entry (but never both) - -- access[0] stores the smallest (oldest) access count value in this subtree -- if it contains more subtrees and its subtree count is at least 4, access[1:2] - stores the smallest access count in the first and second halves of subtrees - (so that access[0] = min(access[1], access[2]) -- likewise, if subtree count is at least 8, - access[1] = min(access[3], access[4]) - access[2] = min(access[5], access[6]) - (access[] is a binary tree inside the multi-bit leveled hash tree) -*/ - -func NewMemStore(d *DbStore, capacity uint) (m *MemStore) { - m = &MemStore{} - m.memtree = newMemTree(memTreeFLW, nil, 0) - m.dbStore = d - m.setCapacity(capacity) - return + cache *lru.Cache + requests *lru.Cache + mu sync.RWMutex + disabled bool } -type memTree struct { - subtree []*memTree - parent *memTree - parentIdx uint - - bits uint // log2(subtree count) - width uint // subtree count - - entry *Chunk // if subtrees are present, entry should be nil - lastDBaccess uint64 - access []uint64 -} - -func newMemTree(b uint, parent *memTree, pidx uint) (node *memTree) { - node = new(memTree) - node.bits = b - node.width = 1 << b - node.subtree = make([]*memTree, node.width) - node.access = make([]uint64, node.width-1) - node.parent = parent - node.parentIdx = pidx - if parent != nil { - parent.subtree[pidx] = node - } - - return node -} - -func (node *memTree) updateAccess(a uint64) { - aidx := uint(0) - var aa uint64 - oa := node.access[0] - for node.access[aidx] == oa { - node.access[aidx] = a - if aidx > 0 { - aa = node.access[((aidx-1)^1)+1] - aidx = (aidx - 1) >> 1 - } else { - pidx := node.parentIdx - node = node.parent - if node == nil { - return - } - nn := node.subtree[pidx^1] - if nn != nil { - aa = nn.access[0] - } else { - aa = 0 - } - aidx = (node.width + pidx - 2) >> 1 - } - - if (aa != 0) && (aa < a) { - a = aa +//NewMemStore is instantiating a MemStore cache. We are keeping a record of all outgoing requests for chunks, that +//should later be delivered by peer nodes, in the `requests` LRU cache. We are also keeping all frequently requested +//chunks in the `cache` LRU cache. +// +//`requests` LRU cache capacity should ideally never be reached, this is why for the time being it should be initialised +//with the same value as the LDBStore capacity. +func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { + if params.CacheCapacity == 0 { + return &MemStore{ + disabled: true, } } -} - -func (s *MemStore) setCapacity(c uint) { - s.lock.Lock() - defer s.lock.Unlock() - for c < s.entryCnt { - s.removeOldest() + onEvicted := func(key interface{}, value interface{}) { + v := value.(*Chunk) + <-v.dbStoredC } - s.capacity = c -} - -func (s *MemStore) Counter() uint { - return s.entryCnt -} - -// entry (not its copy) is going to be in MemStore -func (s *MemStore) Put(entry *Chunk) { - if s.capacity == 0 { - return + c, err := lru.NewWithEvict(int(params.CacheCapacity), onEvicted) + if err != nil { + panic(err) } - s.lock.Lock() - defer s.lock.Unlock() - - if s.entryCnt >= s.capacity { - s.removeOldest() + requestEvicted := func(key interface{}, value interface{}) { + // temporary remove of the error log, until we figure out the problem, as it is too spamy + //log.Error("evict called on outgoing request") } - - s.accessCnt++ - - memstorePutCounter.Inc(1) - - node := s.memtree - bitpos := uint(0) - for node.entry == nil { - l := entry.Key.bits(bitpos, node.bits) - st := node.subtree[l] - if st == nil { - st = newMemTree(memTreeLW, node, l) - bitpos += node.bits - node = st - break - } - bitpos += node.bits - node = st + r, err := lru.NewWithEvict(int(params.ChunkRequestsCacheCapacity), requestEvicted) + if err != nil { + panic(err) } - if node.entry != nil { - - if node.entry.Key.isEqual(entry.Key) { - node.updateAccess(s.accessCnt) - if entry.SData == nil { - entry.Size = node.entry.Size - entry.SData = node.entry.SData - } - if entry.Req == nil { - entry.Req = node.entry.Req - } - entry.C = node.entry.C - node.entry = entry - return - } - - for node.entry != nil { + return &MemStore{ + cache: c, + requests: r, + } +} - l := node.entry.Key.bits(bitpos, node.bits) - st := node.subtree[l] - if st == nil { - st = newMemTree(memTreeLW, node, l) - } - st.entry = node.entry - node.entry = nil - st.updateAccess(node.access[0]) +func (m *MemStore) Get(addr Address) (*Chunk, error) { + if m.disabled { + return nil, ErrChunkNotFound + } - l = entry.Key.bits(bitpos, node.bits) - st = node.subtree[l] - if st == nil { - st = newMemTree(memTreeLW, node, l) - } - bitpos += node.bits - node = st + m.mu.RLock() + defer m.mu.RUnlock() - } + r, ok := m.requests.Get(string(addr)) + // it is a request + if ok { + return r.(*Chunk), nil } - node.entry = entry - node.lastDBaccess = s.dbAccessCnt - node.updateAccess(s.accessCnt) - s.entryCnt++ + // it is not a request + c, ok := m.cache.Get(string(addr)) + if !ok { + return nil, ErrChunkNotFound + } + return c.(*Chunk), nil } -func (s *MemStore) Get(hash Key) (chunk *Chunk, err error) { - s.lock.Lock() - defer s.lock.Unlock() - - node := s.memtree - bitpos := uint(0) - for node.entry == nil { - l := hash.bits(bitpos, node.bits) - st := node.subtree[l] - if st == nil { - return nil, notFound - } - bitpos += node.bits - node = st +func (m *MemStore) Put(c *Chunk) { + if m.disabled { + return } - if node.entry.Key.isEqual(hash) { - s.accessCnt++ - node.updateAccess(s.accessCnt) - chunk = node.entry - if s.dbAccessCnt-node.lastDBaccess > dbForceUpdateAccessCnt { - s.dbAccessCnt++ - node.lastDBaccess = s.dbAccessCnt - if s.dbStore != nil { - s.dbStore.updateAccessCnt(hash) + m.mu.Lock() + defer m.mu.Unlock() + + // it is a request + if c.ReqC != nil { + select { + case <-c.ReqC: + if c.GetErrored() != nil { + m.requests.Remove(string(c.Addr)) + return } + m.cache.Add(string(c.Addr), c) + m.requests.Remove(string(c.Addr)) + default: + m.requests.Add(string(c.Addr), c) } - } else { - err = notFound + return } - return + // it is not a request + m.cache.Add(string(c.Addr), c) + m.requests.Remove(string(c.Addr)) } -func (s *MemStore) removeOldest() { - node := s.memtree - - for node.entry == nil { - - aidx := uint(0) - av := node.access[aidx] - - for aidx < node.width/2-1 { - if av == node.access[aidx*2+1] { - node.access[aidx] = node.access[aidx*2+2] - aidx = aidx*2 + 1 - } else if av == node.access[aidx*2+2] { - node.access[aidx] = node.access[aidx*2+1] - aidx = aidx*2 + 2 - } else { - panic(nil) - } +func (m *MemStore) setCapacity(n int) { + if n <= 0 { + m.disabled = true + } else { + onEvicted := func(key interface{}, value interface{}) { + v := value.(*Chunk) + <-v.dbStoredC } - pidx := aidx*2 + 2 - node.width - if (node.subtree[pidx] != nil) && (av == node.subtree[pidx].access[0]) { - if node.subtree[pidx+1] != nil { - node.access[aidx] = node.subtree[pidx+1].access[0] - } else { - node.access[aidx] = 0 - } - } else if (node.subtree[pidx+1] != nil) && (av == node.subtree[pidx+1].access[0]) { - if node.subtree[pidx] != nil { - node.access[aidx] = node.subtree[pidx].access[0] - } else { - node.access[aidx] = 0 - } - pidx++ - } else { - panic(nil) + c, err := lru.NewWithEvict(n, onEvicted) + if err != nil { + panic(err) } - //fmt.Println(pidx) - node = node.subtree[pidx] - - } - - if node.entry.dbStored != nil { - log.Trace(fmt.Sprintf("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())) - <-node.entry.dbStored - log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())) - } else { - log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log())) - } - - if node.entry.SData != nil { - memstoreRemoveCounter.Inc(1) - node.entry = nil - s.entryCnt-- - } - - node.access[0] = 0 - - //--- - - aidx := uint(0) - for { - aa := node.access[aidx] - if aidx > 0 { - aidx = (aidx - 1) >> 1 - } else { - pidx := node.parentIdx - node = node.parent - if node == nil { - return - } - aidx = (node.width + pidx - 2) >> 1 + r, err := lru.New(defaultChunkRequestsCacheCapacity) + if err != nil { + panic(err) } - if (aa != 0) && ((aa < node.access[aidx]) || (node.access[aidx] == 0)) { - node.access[aidx] = aa + + m = &MemStore{ + cache: c, + requests: r, } } } -// Close memstore func (s *MemStore) Close() {} |