diff options
Diffstat (limited to 'swarm')
-rw-r--r-- | swarm/network/depo.go | 24 | ||||
-rw-r--r-- | swarm/network/syncer.go | 6 | ||||
-rw-r--r-- | swarm/storage/dbstore.go | 61 | ||||
-rw-r--r-- | swarm/storage/memstore.go | 7 |
4 files changed, 77 insertions, 21 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go index 79987cc6b..454a57270 100644 --- a/swarm/network/depo.go +++ b/swarm/network/depo.go @@ -99,6 +99,7 @@ func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) // if key found locally, return. otherwise // remote is untrusted, so hash is verified and chunk passed on to NetStore func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { + var islocal bool req.from = p chunk, err := self.localStore.Get(req.Key) switch { @@ -110,27 +111,32 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { case chunk.SData == nil: // found chunk in memory store, needs the data, validate now - hasher := self.hashfunc() - hasher.Write(req.SData) - if !bytes.Equal(hasher.Sum(nil), req.Key) { - // data does not validate, ignore - // TODO: peer should be penalised/dropped? - glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req) - return - } glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req) default: // data is found, store request ignored // this should update access count? glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req) + islocal = true + //return + } + + hasher := self.hashfunc() + hasher.Write(req.SData) + if !bytes.Equal(hasher.Sum(nil), req.Key) { + // data does not validate, ignore + // TODO: peer should be penalised/dropped? + glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req) return } + if islocal { + return + } // update chunk with size and data chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data) chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8])) - glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p) + glog.V(logger.Detail).Infof("delivery of %v from %v", chunk, p) chunk.Source = p self.netStore.Put(chunk) } diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go index e871666bd..b6b1ea3b6 100644 --- a/swarm/network/syncer.go +++ b/swarm/network/syncer.go @@ -438,7 +438,7 @@ LOOP: for priority = High; priority >= 0; priority-- { // the first priority channel that is non-empty will be assigned to keys if len(self.keys[priority]) > 0 { - glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) + glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) keys = self.keys[priority] break PRIORITIES } @@ -551,10 +551,10 @@ LOOP: } if sreq, err := self.newSyncRequest(req, priority); err == nil { // extract key from req - glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) unsynced = append(unsynced, sreq) } else { - glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) + glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) } } diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go index f5d124d29..e320cd327 100644 --- a/swarm/storage/dbstore.go +++ b/swarm/storage/dbstore.go @@ -252,12 +252,7 @@ func (s *DbStore) collectGarbage(ratio float32) { // actual gc for i := 0; i < gcnt; i++ { if s.gcArray[i].value <= cutval { - batch := new(leveldb.Batch) - batch.Delete(s.gcArray[i].idxKey) - batch.Delete(getDataKey(s.gcArray[i].idx)) - s.entryCnt-- - batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) - s.db.Write(batch) + s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey) } } @@ -266,6 +261,52 @@ func (s *DbStore) collectGarbage(ratio float32) { s.db.Put(keyGCPos, s.gcPos) } +func (s *DbStore) Cleanup() { + //Iterates over the database and checks that there are no faulty chunks + it := s.db.NewIterator() + startPosition := []byte{kpIndex} + it.Seek(startPosition) + var key []byte + var errorsFound, total int + for it.Valid() { + key = it.Key() + if (key == nil) || (key[0] != kpIndex) { + break + } + total++ + var index dpaDBIndex + decodeIndex(it.Value(), &index) + + data, err := s.db.Get(getDataKey(index.Idx)) + if err != nil { + glog.V(logger.Warn).Infof("Chunk %x found but could not be accessed: %v", key[:], err) + s.delete(index.Idx, getIndexKey(key[1:])) + errorsFound++ + } else { + hasher := s.hashfunc() + hasher.Write(data) + hash := hasher.Sum(nil) + if !bytes.Equal(hash, key[1:]) { + glog.V(logger.Warn).Infof("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]) + s.delete(index.Idx, getIndexKey(key[1:])) + errorsFound++ + } + } + it.Next() + } + it.Release() + glog.V(logger.Warn).Infof("Found %v errors out of %v entries", errorsFound, total) +} + +func (s *DbStore) delete(idx uint64, idxKey []byte) { + batch := new(leveldb.Batch) + batch.Delete(idxKey) + batch.Delete(getDataKey(idx)) + s.entryCnt-- + batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) + s.db.Write(batch) +} + func (s *DbStore) Counter() uint64 { s.lock.Lock() defer s.lock.Unlock() @@ -283,6 +324,7 @@ func (s *DbStore) Put(chunk *Chunk) { if chunk.dbStored != nil { close(chunk.dbStored) } + glog.V(logger.Detail).Infof("Storing to DB: chunk already exists, only update access") return // already exists, only update access } @@ -348,6 +390,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) { var data []byte data, err = s.db.Get(getDataKey(index.Idx)) if err != nil { + glog.V(logger.Detail).Infof("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err) + s.delete(index.Idx, getIndexKey(key)) return } @@ -355,9 +399,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) { hasher.Write(data) hash := hasher.Sum(nil) if !bytes.Equal(hash, key) { - s.db.Delete(getDataKey(index.Idx)) - err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:]) - return + s.delete(index.Idx, getIndexKey(key)) + panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'") } chunk = &Chunk{ diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index e55abb45f..7903d33e7 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -20,6 +20,9 @@ package storage import ( "sync" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) const ( @@ -284,7 +287,11 @@ func (s *MemStore) removeOldest() { } if node.entry.dbStored != nil { + glog.V(logger.Detail).Infof("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log()) <-node.entry.dbStored + glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log()) + } else { + glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log()) } if node.entry.SData != nil { |