diff options
-rw-r--r-- | cmd/swarm/cleandb.go | 39 | ||||
-rw-r--r-- | cmd/swarm/main.go | 9 | ||||
-rw-r--r-- | cmd/wnode/main.go | 7 | ||||
-rw-r--r-- | swarm/network/depo.go | 24 | ||||
-rw-r--r-- | swarm/network/syncer.go | 6 | ||||
-rw-r--r-- | swarm/storage/dbstore.go | 61 | ||||
-rw-r--r-- | swarm/storage/memstore.go | 7 | ||||
-rw-r--r-- | whisper/mailserver/mailserver.go | 36 | ||||
-rw-r--r-- | whisper/mailserver/server_test.go | 183 |
9 files changed, 342 insertions, 30 deletions
diff --git a/cmd/swarm/cleandb.go b/cmd/swarm/cleandb.go new file mode 100644 index 000000000..81636ada5 --- /dev/null +++ b/cmd/swarm/cleandb.go @@ -0,0 +1,39 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "log" + + "github.com/ethereum/go-ethereum/swarm/storage" + "gopkg.in/urfave/cli.v1" +) + +func cleandb(ctx *cli.Context) { + args := ctx.Args() + if len(args) != 1 { + log.Fatal("need path to chunks database as the first and only argument") + } + + chunkDbPath := args[0] + hash := storage.MakeHashFunc("SHA3") + dbStore, err := storage.NewDbStore(chunkDbPath, hash, 10000000, 0) + if err != nil { + log.Fatalf("cannot initialise dbstore: %v", err) + } + dbStore.Cleanup() +} diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go index f9d661bb3..14adc3b10 100644 --- a/cmd/swarm/main.go +++ b/cmd/swarm/main.go @@ -192,6 +192,15 @@ Removes a path from the manifest }, }, }, + { + Action: cleandb, + Name: "cleandb", + Usage: "Cleans database of corrupted entries", + ArgsUsage: " ", + Description: ` +Cleans database of corrupted entries. +`, + }, } app.Flags = []cli.Flag{ diff --git a/cmd/wnode/main.go b/cmd/wnode/main.go index cbf093aa7..55565eab2 100644 --- a/cmd/wnode/main.go +++ b/cmd/wnode/main.go @@ -209,10 +209,15 @@ func initialize() { nodeid = shh.NewIdentity() } + maxPeers := 80 + if *bootstrapMode { + maxPeers = 800 + } + server = &p2p.Server{ Config: p2p.Config{ PrivateKey: nodeid, - MaxPeers: 128, + MaxPeers: maxPeers, Name: common.MakeName("whisper-go", "5.0"), Protocols: shh.Protocols(), ListenAddr: *argIP, diff --git a/swarm/network/depo.go b/swarm/network/depo.go index 79987cc6b..454a57270 100644 --- a/swarm/network/depo.go +++ b/swarm/network/depo.go @@ -99,6 +99,7 @@ func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) // if key found locally, return. otherwise // remote is untrusted, so hash is verified and chunk passed on to NetStore func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { + var islocal bool req.from = p chunk, err := self.localStore.Get(req.Key) switch { @@ -110,27 +111,32 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { case chunk.SData == nil: // found chunk in memory store, needs the data, validate now - hasher := self.hashfunc() - hasher.Write(req.SData) - if !bytes.Equal(hasher.Sum(nil), req.Key) { - // data does not validate, ignore - // TODO: peer should be penalised/dropped? - glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req) - return - } glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req) default: // data is found, store request ignored // this should update access count? glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req) + islocal = true + //return + } + + hasher := self.hashfunc() + hasher.Write(req.SData) + if !bytes.Equal(hasher.Sum(nil), req.Key) { + // data does not validate, ignore + // TODO: peer should be penalised/dropped? + glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req) return } + if islocal { + return + } // update chunk with size and data chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data) chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8])) - glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p) + glog.V(logger.Detail).Infof("delivery of %v from %v", chunk, p) chunk.Source = p self.netStore.Put(chunk) } diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go index e871666bd..b6b1ea3b6 100644 --- a/swarm/network/syncer.go +++ b/swarm/network/syncer.go @@ -438,7 +438,7 @@ LOOP: for priority = High; priority >= 0; priority-- { // the first priority channel that is non-empty will be assigned to keys if len(self.keys[priority]) > 0 { - glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) + glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) keys = self.keys[priority] break PRIORITIES } @@ -551,10 +551,10 @@ LOOP: } if sreq, err := self.newSyncRequest(req, priority); err == nil { // extract key from req - glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) unsynced = append(unsynced, sreq) } else { - glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) + glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) } } diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go index f5d124d29..e320cd327 100644 --- a/swarm/storage/dbstore.go +++ b/swarm/storage/dbstore.go @@ -252,12 +252,7 @@ func (s *DbStore) collectGarbage(ratio float32) { // actual gc for i := 0; i < gcnt; i++ { if s.gcArray[i].value <= cutval { - batch := new(leveldb.Batch) - batch.Delete(s.gcArray[i].idxKey) - batch.Delete(getDataKey(s.gcArray[i].idx)) - s.entryCnt-- - batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) - s.db.Write(batch) + s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey) } } @@ -266,6 +261,52 @@ func (s *DbStore) collectGarbage(ratio float32) { s.db.Put(keyGCPos, s.gcPos) } +func (s *DbStore) Cleanup() { + //Iterates over the database and checks that there are no faulty chunks + it := s.db.NewIterator() + startPosition := []byte{kpIndex} + it.Seek(startPosition) + var key []byte + var errorsFound, total int + for it.Valid() { + key = it.Key() + if (key == nil) || (key[0] != kpIndex) { + break + } + total++ + var index dpaDBIndex + decodeIndex(it.Value(), &index) + + data, err := s.db.Get(getDataKey(index.Idx)) + if err != nil { + glog.V(logger.Warn).Infof("Chunk %x found but could not be accessed: %v", key[:], err) + s.delete(index.Idx, getIndexKey(key[1:])) + errorsFound++ + } else { + hasher := s.hashfunc() + hasher.Write(data) + hash := hasher.Sum(nil) + if !bytes.Equal(hash, key[1:]) { + glog.V(logger.Warn).Infof("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]) + s.delete(index.Idx, getIndexKey(key[1:])) + errorsFound++ + } + } + it.Next() + } + it.Release() + glog.V(logger.Warn).Infof("Found %v errors out of %v entries", errorsFound, total) +} + +func (s *DbStore) delete(idx uint64, idxKey []byte) { + batch := new(leveldb.Batch) + batch.Delete(idxKey) + batch.Delete(getDataKey(idx)) + s.entryCnt-- + batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) + s.db.Write(batch) +} + func (s *DbStore) Counter() uint64 { s.lock.Lock() defer s.lock.Unlock() @@ -283,6 +324,7 @@ func (s *DbStore) Put(chunk *Chunk) { if chunk.dbStored != nil { close(chunk.dbStored) } + glog.V(logger.Detail).Infof("Storing to DB: chunk already exists, only update access") return // already exists, only update access } @@ -348,6 +390,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) { var data []byte data, err = s.db.Get(getDataKey(index.Idx)) if err != nil { + glog.V(logger.Detail).Infof("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err) + s.delete(index.Idx, getIndexKey(key)) return } @@ -355,9 +399,8 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) { hasher.Write(data) hash := hasher.Sum(nil) if !bytes.Equal(hash, key) { - s.db.Delete(getDataKey(index.Idx)) - err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:]) - return + s.delete(index.Idx, getIndexKey(key)) + panic("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'") } chunk = &Chunk{ diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index e55abb45f..7903d33e7 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -20,6 +20,9 @@ package storage import ( "sync" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) const ( @@ -284,7 +287,11 @@ func (s *MemStore) removeOldest() { } if node.entry.dbStored != nil { + glog.V(logger.Detail).Infof("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log()) <-node.entry.dbStored + glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log()) + } else { + glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log()) } if node.entry.SData != nil { diff --git a/whisper/mailserver/mailserver.go b/whisper/mailserver/mailserver.go index f7d6c3e5c..3e08a3b7e 100644 --- a/whisper/mailserver/mailserver.go +++ b/whisper/mailserver/mailserver.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rlp" @@ -101,11 +102,19 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { } func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { - ok, lower, upper, topic := s.validate(peer, request) - if !ok { + if peer == nil { + glog.V(logger.Error).Info("Whisper peer is nil") return } + ok, lower, upper, topic := s.validateRequest(peer.ID(), request) + if ok { + s.processRequest(peer, lower, upper, topic) + } +} + +func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, topic whisper.TopicType) []*whisper.Envelope { + ret := make([]*whisper.Envelope, 0) var err error var zero common.Hash var empty whisper.TopicType @@ -122,10 +131,15 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) } if topic == empty || envelope.Topic == topic { - err = s.w.SendP2PDirect(peer, &envelope) - if err != nil { - glog.V(logger.Error).Infof("Failed to send direct message to peer: %s", err) - return + if peer == nil { + // used for test purposes + ret = append(ret, &envelope) + } else { + err = s.w.SendP2PDirect(peer, &envelope) + if err != nil { + glog.V(logger.Error).Infof("Failed to send direct message to peer: %s", err) + return nil + } } } } @@ -134,9 +148,11 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) if err != nil { glog.V(logger.Error).Infof("Level DB iterator error: %s", err) } + + return ret } -func (s *WMailServer) validate(peer *whisper.Peer, request *whisper.Envelope) (bool, uint32, uint32, whisper.TopicType) { +func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, whisper.TopicType) { var topic whisper.TopicType if s.pow > 0.0 && request.PoW() < s.pow { return false, 0, 0, topic @@ -154,7 +170,11 @@ func (s *WMailServer) validate(peer *whisper.Peer, request *whisper.Envelope) (b return false, 0, 0, topic } - if bytes.Equal(peer.ID(), decrypted.Signature) { + src := crypto.FromECDSAPub(decrypted.Src) + if len(src)-len(peerID) == 1 { + src = src[1:] + } + if !bytes.Equal(peerID, src) { glog.V(logger.Warn).Infof("Wrong signature of p2p request") return false, 0, 0, topic } diff --git a/whisper/mailserver/server_test.go b/whisper/mailserver/server_test.go new file mode 100644 index 000000000..24abf6c1a --- /dev/null +++ b/whisper/mailserver/server_test.go @@ -0,0 +1,183 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. + +package mailserver + +import ( + "crypto/ecdsa" + "encoding/binary" + "io/ioutil" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" +) + +const powRequirement = 0.00001 +const keyName = "6d604bac5401ce9a6b995f1b45a4ab" + +var shh *whisper.Whisper +var seed = time.Now().Unix() + +type ServerTestParams struct { + topic whisper.TopicType + low uint32 + upp uint32 + key *ecdsa.PrivateKey +} + +func assert(statement bool, text string, t *testing.T) { + if !statement { + t.Fatal(text) + } +} + +func TestDBKey(t *testing.T) { + var h common.Hash + i := uint32(time.Now().Unix()) + k := NewDbKey(i, h) + assert(len(k.raw) == common.HashLength+4, "wrong DB key length", t) + assert(byte(i%0x100) == k.raw[3], "raw representation should be big endian", t) + assert(byte(i/0x1000000) == k.raw[0], "big endian expected", t) +} + +func generateEnvelope(t *testing.T) *whisper.Envelope { + params := &whisper.MessageParams{ + KeySym: []byte("test key"), + Topic: whisper.TopicType{}, + Payload: []byte("test payload"), + PoW: powRequirement, + WorkTime: 2, + } + + msg := whisper.NewSentMessage(params) + env, err := msg.Wrap(params) + if err != nil { + t.Fatalf("failed to wrap with seed %d: %s.", seed, err) + } + return env +} + +func TestMailServer(t *testing.T) { + const password = "password_for_this_test" + const dbPath = "whisper-server-test" + + _, err := ioutil.TempDir("", dbPath) + if err != nil { + t.Fatal(err) + } + + var server WMailServer + shh = whisper.NewWhisper(&server) + server.Init(shh, dbPath, password, powRequirement) + defer server.Close() + + err = shh.AddSymKey(keyName, []byte(password)) + if err != nil { + t.Fatalf("Failed to create symmetric key for mail request: %s", err) + } + + rand.Seed(seed) + env := generateEnvelope(t) + server.Archive(env) + deliverTest(t, &server, env) +} + +func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) { + testPeerID := shh.NewIdentity() + birth := env.Expiry - env.TTL + p := &ServerTestParams{ + topic: env.Topic, + low: birth - 1, + upp: birth + 1, + key: testPeerID, + } + singleRequest(t, server, env, p, true) + + p.low, p.upp = birth+1, 0xffffffff + singleRequest(t, server, env, p, false) + + p.low, p.upp = 0, birth-1 + singleRequest(t, server, env, p, false) + + p.low = birth - 1 + p.upp = birth + 1 + p.topic[0]++ + singleRequest(t, server, env, p, false) +} + +func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *ServerTestParams, expect bool) { + request := createRequest(t, p) + src := crypto.FromECDSAPub(&p.key.PublicKey) + ok, lower, upper, topic := server.validateRequest(src, request) + if !ok { + t.Fatalf("request validation failed, seed: %d.", seed) + } + if lower != p.low { + t.Fatalf("request validation failed (lower bound), seed: %d.", seed) + } + if upper != p.upp { + t.Fatalf("request validation failed (upper bound), seed: %d.", seed) + } + if topic != p.topic { + t.Fatalf("request validation failed (topic), seed: %d.", seed) + } + + var exist bool + mail := server.processRequest(nil, p.low, p.upp, p.topic) + for _, msg := range mail { + if msg.Hash() == env.Hash() { + exist = true + break + } + } + + if exist != expect { + t.Fatalf("error: exist = %v, seed: %d.", exist, seed) + } + + src[0]++ + ok, lower, upper, topic = server.validateRequest(src, request) + if ok { + t.Fatalf("request validation false positive, seed: %d.", seed) + } +} + +func createRequest(t *testing.T, p *ServerTestParams) *whisper.Envelope { + data := make([]byte, 8+whisper.TopicLength) + binary.BigEndian.PutUint32(data, p.low) + binary.BigEndian.PutUint32(data[4:], p.upp) + copy(data[8:], p.topic[:]) + + params := &whisper.MessageParams{ + KeySym: shh.GetSymKey(keyName), + Topic: p.topic, + Payload: data, + PoW: powRequirement * 2, + WorkTime: 2, + Src: p.key, + } + + msg := whisper.NewSentMessage(params) + env, err := msg.Wrap(params) + if err != nil { + t.Fatalf("failed to wrap with seed %d: %s.", seed, err) + } + return env +} |