From e23e86921b55cb1ee2fca6b6fb9ed91f5532f9fd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Viktor=20Tr=C3=B3n?= <viktor.tron@gmail.com>
Date: Mon, 13 Feb 2017 18:50:50 +0630
Subject: swarm/network: fix chunk integrity checks (#3665)

* swarm/network: integrity on incoming known chunks
* swarm/network: fix integrity check for incoming chunks
* swarm/storage: imrpoved integrity checking on chunks
* dbstore panics on corrupt chunk entry an prompts user to run cleandb
* memstore adds logging for garbage collection
* dbstore refactor item delete. correct partial deletes in Get
* cmd/swarm: added cleandb subcommand
---
 swarm/storage/dbstore.go  | 61 ++++++++++++++++++++++++++++++++++++++++-------
 swarm/storage/memstore.go |  7 ++++++
 2 files changed, 59 insertions(+), 9 deletions(-)

(limited to 'swarm/storage')

diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
index f5d124d29..e320cd327 100644
--- a/swarm/storage/dbstore.go
+++ b/swarm/storage/dbstore.go
@@ -252,12 +252,7 @@ func (s *DbStore) collectGarbage(ratio float32) {
 	// actual gc
 	for i := 0; i < gcnt; i++ {
 		if s.gcArray[i].value <= cutval {
-			batch := new(leveldb.Batch)
-			batch.Delete(s.gcArray[i].idxKey)
-			batch.Delete(getDataKey(s.gcArray[i].idx))
-			s.entryCnt--
-			batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
-			s.db.Write(batch)
+			s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
 		}
 	}
 
@@ -266,6 +261,52 @@ func (s *DbStore) collectGarbage(ratio float32) {
 	s.db.Put(keyGCPos, s.gcPos)
 }
 
+func (s *DbStore) Cleanup() {
+	//Iterates over the database and checks that there are no faulty chunks
+	it := s.db.NewIterator()
+	startPosition := []byte{kpIndex}
+	it.Seek(startPosition)
+	var key []byte
+	var errorsFound, total int
+	for it.Valid() {
+		key = it.Key()
+		if (key == nil) || (key[0] != kpIndex) {
+			break
+		}
+		total++
+		var index dpaDBIndex
+		decodeIndex(it.Value(), &index)
+
+		data, err := s.db.Get(getDataKey(index.Idx))
+		if err != nil {
+			glog.V(logger.Warn).Infof("Chunk %x found but could not be accessed: %v", key[:], err)
+			s.delete(index.Idx, getIndexKey(key[1:]))
+			errorsFound++
+		} else {
+			hasher := s.hashfunc()
+			hasher.Write(data)
+			hash := hasher.Sum(nil)
+			if !bytes.Equal(hash, key[1:]) {
+				glog.V(logger.Warn).Infof("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:])
+				s.delete(index.Idx, getIndexKey(key[1:]))
+				errorsFound++
+			}
+		}
+		it.Next()
+	}
+	it.Release()
+	glog.V(logger.Warn).Infof("Found %v errors out of %v entries", errorsFound, total)
+}
+
+func (s *DbStore) delete(idx uint64, idxKey []byte) {
+	batch := new(leveldb.Batch)
+	batch.Delete(idxKey)
+	batch.Delete(getDataKey(idx))
+	s.entryCnt--
+	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+	s.db.Write(batch)
+}
+
 func (s *DbStore) Counter() uint64 {
 	s.lock.Lock()
 	defer s.lock.Unlock()
@@ -283,6 +324,7 @@ func (s *DbStore) Put(chunk *Chunk) {
 		if chunk.dbStored != nil {
 			close(chunk.dbStored)
 		}
+		glog.V(logger.Detail).Infof("Storing to DB: chunk already exists, only update access")
 		return // already exists, only update access
 	}
 
@@ -348,6 +390,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
 		var data []byte
 		data, err = s.db.Get(getDataKey(index.Idx))
 		if err != nil {
+			glog.V(logger.Detail).Infof("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err)
+			s.delete(index.Idx, getIndexKey(key))
 			return
 		}
 
@@ -355,9 +399,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
 		hasher.Write(data)
 		hash := hasher.Sum(nil)
 		if !bytes.Equal(hash, key) {
-			s.db.Delete(getDataKey(index.Idx))
-			err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:])
-			return
+			s.delete(index.Idx, getIndexKey(key))
+			panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
 		}
 
 		chunk = &Chunk{
diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go
index e55abb45f..7903d33e7 100644
--- a/swarm/storage/memstore.go
+++ b/swarm/storage/memstore.go
@@ -20,6 +20,9 @@ package storage
 
 import (
 	"sync"
+
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
 const (
@@ -284,7 +287,11 @@ func (s *MemStore) removeOldest() {
 	}
 
 	if node.entry.dbStored != nil {
+		glog.V(logger.Detail).Infof("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())
 		<-node.entry.dbStored
+		glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())
+	} else {
+		glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log())
 	}
 
 	if node.entry.SData != nil {
-- 
cgit v1.2.3