aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/swarm/cleandb.go39
-rw-r--r--cmd/swarm/main.go9
-rw-r--r--cmd/wnode/main.go7
-rw-r--r--swarm/network/depo.go24
-rw-r--r--swarm/network/syncer.go6
-rw-r--r--swarm/storage/dbstore.go61
-rw-r--r--swarm/storage/memstore.go7
-rw-r--r--whisper/mailserver/mailserver.go36
-rw-r--r--whisper/mailserver/server_test.go183
9 files changed, 342 insertions, 30 deletions
diff --git a/cmd/swarm/cleandb.go b/cmd/swarm/cleandb.go
new file mode 100644
index 000000000..81636ada5
--- /dev/null
+++ b/cmd/swarm/cleandb.go
@@ -0,0 +1,39 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "log"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "gopkg.in/urfave/cli.v1"
+)
+
+func cleandb(ctx *cli.Context) {
+ args := ctx.Args()
+ if len(args) != 1 {
+ log.Fatal("need path to chunks database as the first and only argument")
+ }
+
+ chunkDbPath := args[0]
+ hash := storage.MakeHashFunc("SHA3")
+ dbStore, err := storage.NewDbStore(chunkDbPath, hash, 10000000, 0)
+ if err != nil {
+ log.Fatalf("cannot initialise dbstore: %v", err)
+ }
+ dbStore.Cleanup()
+}
diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go
index f9d661bb3..14adc3b10 100644
--- a/cmd/swarm/main.go
+++ b/cmd/swarm/main.go
@@ -192,6 +192,15 @@ Removes a path from the manifest
},
},
},
+ {
+ Action: cleandb,
+ Name: "cleandb",
+ Usage: "Cleans database of corrupted entries",
+ ArgsUsage: " ",
+ Description: `
+Cleans database of corrupted entries.
+`,
+ },
}
app.Flags = []cli.Flag{
diff --git a/cmd/wnode/main.go b/cmd/wnode/main.go
index cbf093aa7..55565eab2 100644
--- a/cmd/wnode/main.go
+++ b/cmd/wnode/main.go
@@ -209,10 +209,15 @@ func initialize() {
nodeid = shh.NewIdentity()
}
+ maxPeers := 80
+ if *bootstrapMode {
+ maxPeers = 800
+ }
+
server = &p2p.Server{
Config: p2p.Config{
PrivateKey: nodeid,
- MaxPeers: 128,
+ MaxPeers: maxPeers,
Name: common.MakeName("whisper-go", "5.0"),
Protocols: shh.Protocols(),
ListenAddr: *argIP,
diff --git a/swarm/network/depo.go b/swarm/network/depo.go
index 79987cc6b..454a57270 100644
--- a/swarm/network/depo.go
+++ b/swarm/network/depo.go
@@ -99,6 +99,7 @@ func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer)
// if key found locally, return. otherwise
// remote is untrusted, so hash is verified and chunk passed on to NetStore
func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
+ var islocal bool
req.from = p
chunk, err := self.localStore.Get(req.Key)
switch {
@@ -110,27 +111,32 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
case chunk.SData == nil:
// found chunk in memory store, needs the data, validate now
- hasher := self.hashfunc()
- hasher.Write(req.SData)
- if !bytes.Equal(hasher.Sum(nil), req.Key) {
- // data does not validate, ignore
- // TODO: peer should be penalised/dropped?
- glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
- return
- }
glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req)
default:
// data is found, store request ignored
// this should update access count?
glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req)
+ islocal = true
+ //return
+ }
+
+ hasher := self.hashfunc()
+ hasher.Write(req.SData)
+ if !bytes.Equal(hasher.Sum(nil), req.Key) {
+ // data does not validate, ignore
+ // TODO: peer should be penalised/dropped?
+ glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
return
}
+ if islocal {
+ return
+ }
// update chunk with size and data
chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
- glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p)
+ glog.V(logger.Detail).Infof("delivery of %v from %v", chunk, p)
chunk.Source = p
self.netStore.Put(chunk)
}
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go
index e871666bd..b6b1ea3b6 100644
--- a/swarm/network/syncer.go
+++ b/swarm/network/syncer.go
@@ -438,7 +438,7 @@ LOOP:
for priority = High; priority >= 0; priority-- {
// the first priority channel that is non-empty will be assigned to keys
if len(self.keys[priority]) > 0 {
- glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
+ glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
keys = self.keys[priority]
break PRIORITIES
}
@@ -551,10 +551,10 @@ LOOP:
}
if sreq, err := self.newSyncRequest(req, priority); err == nil {
// extract key from req
- glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
+ glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
unsynced = append(unsynced, sreq)
} else {
- glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
+ glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
}
}
diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
index f5d124d29..e320cd327 100644
--- a/swarm/storage/dbstore.go
+++ b/swarm/storage/dbstore.go
@@ -252,12 +252,7 @@ func (s *DbStore) collectGarbage(ratio float32) {
// actual gc
for i := 0; i < gcnt; i++ {
if s.gcArray[i].value <= cutval {
- batch := new(leveldb.Batch)
- batch.Delete(s.gcArray[i].idxKey)
- batch.Delete(getDataKey(s.gcArray[i].idx))
- s.entryCnt--
- batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
- s.db.Write(batch)
+ s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
}
}
@@ -266,6 +261,52 @@ func (s *DbStore) collectGarbage(ratio float32) {
s.db.Put(keyGCPos, s.gcPos)
}
+func (s *DbStore) Cleanup() {
+ //Iterates over the database and checks that there are no faulty chunks
+ it := s.db.NewIterator()
+ startPosition := []byte{kpIndex}
+ it.Seek(startPosition)
+ var key []byte
+ var errorsFound, total int
+ for it.Valid() {
+ key = it.Key()
+ if (key == nil) || (key[0] != kpIndex) {
+ break
+ }
+ total++
+ var index dpaDBIndex
+ decodeIndex(it.Value(), &index)
+
+ data, err := s.db.Get(getDataKey(index.Idx))
+ if err != nil {
+ glog.V(logger.Warn).Infof("Chunk %x found but could not be accessed: %v", key[:], err)
+ s.delete(index.Idx, getIndexKey(key[1:]))
+ errorsFound++
+ } else {
+ hasher := s.hashfunc()
+ hasher.Write(data)
+ hash := hasher.Sum(nil)
+ if !bytes.Equal(hash, key[1:]) {
+ glog.V(logger.Warn).Infof("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:])
+ s.delete(index.Idx, getIndexKey(key[1:]))
+ errorsFound++
+ }
+ }
+ it.Next()
+ }
+ it.Release()
+ glog.V(logger.Warn).Infof("Found %v errors out of %v entries", errorsFound, total)
+}
+
+func (s *DbStore) delete(idx uint64, idxKey []byte) {
+ batch := new(leveldb.Batch)
+ batch.Delete(idxKey)
+ batch.Delete(getDataKey(idx))
+ s.entryCnt--
+ batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+ s.db.Write(batch)
+}
+
func (s *DbStore) Counter() uint64 {
s.lock.Lock()
defer s.lock.Unlock()
@@ -283,6 +324,7 @@ func (s *DbStore) Put(chunk *Chunk) {
if chunk.dbStored != nil {
close(chunk.dbStored)
}
+ glog.V(logger.Detail).Infof("Storing to DB: chunk already exists, only update access")
return // already exists, only update access
}
@@ -348,6 +390,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
var data []byte
data, err = s.db.Get(getDataKey(index.Idx))
if err != nil {
+ glog.V(logger.Detail).Infof("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err)
+ s.delete(index.Idx, getIndexKey(key))
return
}
@@ -355,9 +399,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
hasher.Write(data)
hash := hasher.Sum(nil)
if !bytes.Equal(hash, key) {
- s.db.Delete(getDataKey(index.Idx))
- err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:])
- return
+ s.delete(index.Idx, getIndexKey(key))
+ panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
}
chunk = &Chunk{
diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go
index e55abb45f..7903d33e7 100644
--- a/swarm/storage/memstore.go
+++ b/swarm/storage/memstore.go
@@ -20,6 +20,9 @@ package storage
import (
"sync"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
)
const (
@@ -284,7 +287,11 @@ func (s *MemStore) removeOldest() {
}
if node.entry.dbStored != nil {
+ glog.V(logger.Detail).Infof("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())
<-node.entry.dbStored
+ glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())
+ } else {
+ glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log())
}
if node.entry.SData != nil {
diff --git a/whisper/mailserver/mailserver.go b/whisper/mailserver/mailserver.go
index f7d6c3e5c..3e08a3b7e 100644
--- a/whisper/mailserver/mailserver.go
+++ b/whisper/mailserver/mailserver.go
@@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rlp"
@@ -101,11 +102,19 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
}
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
- ok, lower, upper, topic := s.validate(peer, request)
- if !ok {
+ if peer == nil {
+ glog.V(logger.Error).Info("Whisper peer is nil")
return
}
+ ok, lower, upper, topic := s.validateRequest(peer.ID(), request)
+ if ok {
+ s.processRequest(peer, lower, upper, topic)
+ }
+}
+
+func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, topic whisper.TopicType) []*whisper.Envelope {
+ ret := make([]*whisper.Envelope, 0)
var err error
var zero common.Hash
var empty whisper.TopicType
@@ -122,10 +131,15 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
}
if topic == empty || envelope.Topic == topic {
- err = s.w.SendP2PDirect(peer, &envelope)
- if err != nil {
- glog.V(logger.Error).Infof("Failed to send direct message to peer: %s", err)
- return
+ if peer == nil {
+ // used for test purposes
+ ret = append(ret, &envelope)
+ } else {
+ err = s.w.SendP2PDirect(peer, &envelope)
+ if err != nil {
+ glog.V(logger.Error).Infof("Failed to send direct message to peer: %s", err)
+ return nil
+ }
}
}
}
@@ -134,9 +148,11 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
if err != nil {
glog.V(logger.Error).Infof("Level DB iterator error: %s", err)
}
+
+ return ret
}
-func (s *WMailServer) validate(peer *whisper.Peer, request *whisper.Envelope) (bool, uint32, uint32, whisper.TopicType) {
+func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, whisper.TopicType) {
var topic whisper.TopicType
if s.pow > 0.0 && request.PoW() < s.pow {
return false, 0, 0, topic
@@ -154,7 +170,11 @@ func (s *WMailServer) validate(peer *whisper.Peer, request *whisper.Envelope) (b
return false, 0, 0, topic
}
- if bytes.Equal(peer.ID(), decrypted.Signature) {
+ src := crypto.FromECDSAPub(decrypted.Src)
+ if len(src)-len(peerID) == 1 {
+ src = src[1:]
+ }
+ if !bytes.Equal(peerID, src) {
glog.V(logger.Warn).Infof("Wrong signature of p2p request")
return false, 0, 0, topic
}
diff --git a/whisper/mailserver/server_test.go b/whisper/mailserver/server_test.go
new file mode 100644
index 000000000..24abf6c1a
--- /dev/null
+++ b/whisper/mailserver/server_test.go
@@ -0,0 +1,183 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
+package mailserver
+
+import (
+ "crypto/ecdsa"
+ "encoding/binary"
+ "io/ioutil"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
+)
+
+const powRequirement = 0.00001
+const keyName = "6d604bac5401ce9a6b995f1b45a4ab"
+
+var shh *whisper.Whisper
+var seed = time.Now().Unix()
+
+type ServerTestParams struct {
+ topic whisper.TopicType
+ low uint32
+ upp uint32
+ key *ecdsa.PrivateKey
+}
+
+func assert(statement bool, text string, t *testing.T) {
+ if !statement {
+ t.Fatal(text)
+ }
+}
+
+func TestDBKey(t *testing.T) {
+ var h common.Hash
+ i := uint32(time.Now().Unix())
+ k := NewDbKey(i, h)
+ assert(len(k.raw) == common.HashLength+4, "wrong DB key length", t)
+ assert(byte(i%0x100) == k.raw[3], "raw representation should be big endian", t)
+ assert(byte(i/0x1000000) == k.raw[0], "big endian expected", t)
+}
+
+func generateEnvelope(t *testing.T) *whisper.Envelope {
+ params := &whisper.MessageParams{
+ KeySym: []byte("test key"),
+ Topic: whisper.TopicType{},
+ Payload: []byte("test payload"),
+ PoW: powRequirement,
+ WorkTime: 2,
+ }
+
+ msg := whisper.NewSentMessage(params)
+ env, err := msg.Wrap(params)
+ if err != nil {
+ t.Fatalf("failed to wrap with seed %d: %s.", seed, err)
+ }
+ return env
+}
+
+func TestMailServer(t *testing.T) {
+ const password = "password_for_this_test"
+ const dbPath = "whisper-server-test"
+
+ _, err := ioutil.TempDir("", dbPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var server WMailServer
+ shh = whisper.NewWhisper(&server)
+ server.Init(shh, dbPath, password, powRequirement)
+ defer server.Close()
+
+ err = shh.AddSymKey(keyName, []byte(password))
+ if err != nil {
+ t.Fatalf("Failed to create symmetric key for mail request: %s", err)
+ }
+
+ rand.Seed(seed)
+ env := generateEnvelope(t)
+ server.Archive(env)
+ deliverTest(t, &server, env)
+}
+
+func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) {
+ testPeerID := shh.NewIdentity()
+ birth := env.Expiry - env.TTL
+ p := &ServerTestParams{
+ topic: env.Topic,
+ low: birth - 1,
+ upp: birth + 1,
+ key: testPeerID,
+ }
+ singleRequest(t, server, env, p, true)
+
+ p.low, p.upp = birth+1, 0xffffffff
+ singleRequest(t, server, env, p, false)
+
+ p.low, p.upp = 0, birth-1
+ singleRequest(t, server, env, p, false)
+
+ p.low = birth - 1
+ p.upp = birth + 1
+ p.topic[0]++
+ singleRequest(t, server, env, p, false)
+}
+
+func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *ServerTestParams, expect bool) {
+ request := createRequest(t, p)
+ src := crypto.FromECDSAPub(&p.key.PublicKey)
+ ok, lower, upper, topic := server.validateRequest(src, request)
+ if !ok {
+ t.Fatalf("request validation failed, seed: %d.", seed)
+ }
+ if lower != p.low {
+ t.Fatalf("request validation failed (lower bound), seed: %d.", seed)
+ }
+ if upper != p.upp {
+ t.Fatalf("request validation failed (upper bound), seed: %d.", seed)
+ }
+ if topic != p.topic {
+ t.Fatalf("request validation failed (topic), seed: %d.", seed)
+ }
+
+ var exist bool
+ mail := server.processRequest(nil, p.low, p.upp, p.topic)
+ for _, msg := range mail {
+ if msg.Hash() == env.Hash() {
+ exist = true
+ break
+ }
+ }
+
+ if exist != expect {
+ t.Fatalf("error: exist = %v, seed: %d.", exist, seed)
+ }
+
+ src[0]++
+ ok, lower, upper, topic = server.validateRequest(src, request)
+ if ok {
+ t.Fatalf("request validation false positive, seed: %d.", seed)
+ }
+}
+
+func createRequest(t *testing.T, p *ServerTestParams) *whisper.Envelope {
+ data := make([]byte, 8+whisper.TopicLength)
+ binary.BigEndian.PutUint32(data, p.low)
+ binary.BigEndian.PutUint32(data[4:], p.upp)
+ copy(data[8:], p.topic[:])
+
+ params := &whisper.MessageParams{
+ KeySym: shh.GetSymKey(keyName),
+ Topic: p.topic,
+ Payload: data,
+ PoW: powRequirement * 2,
+ WorkTime: 2,
+ Src: p.key,
+ }
+
+ msg := whisper.NewSentMessage(params)
+ env, err := msg.Wrap(params)
+ if err != nil {
+ t.Fatalf("failed to wrap with seed %d: %s.", seed, err)
+ }
+ return env
+}