aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/pss/pss_test.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/pss/pss_test.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloadgo-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/pss/pss_test.go')
-rw-r--r--swarm/pss/pss_test.go1683
1 files changed, 1683 insertions, 0 deletions
diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go
new file mode 100644
index 000000000..e28af275b
--- /dev/null
+++ b/swarm/pss/pss_test.go
@@ -0,0 +1,1683 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package pss
+
+import (
+ "bytes"
+ "context"
+ "crypto/ecdsa"
+ "encoding/binary"
+ "encoding/hex"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/metrics/influxdb"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
+)
+
+var (
+ initOnce = sync.Once{}
+ debugdebugflag = flag.Bool("vv", false, "veryverbose")
+ debugflag = flag.Bool("v", false, "verbose")
+ w *whisper.Whisper
+ wapi *whisper.PublicWhisperAPI
+ psslogmain log.Logger
+ pssprotocols map[string]*protoCtrl
+ useHandshake bool
+)
+
+func init() {
+ flag.Parse()
+ rand.Seed(time.Now().Unix())
+
+ adapters.RegisterServices(newServices(false))
+ initTest()
+}
+
+func initTest() {
+ initOnce.Do(
+ func() {
+ loglevel := log.LvlInfo
+ if *debugflag {
+ loglevel = log.LvlDebug
+ } else if *debugdebugflag {
+ loglevel = log.LvlTrace
+ }
+
+ psslogmain = log.New("psslog", "*")
+ hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
+ hf := log.LvlFilterHandler(loglevel, hs)
+ h := log.CallerFileHandler(hf)
+ log.Root().SetHandler(h)
+
+ w = whisper.New(&whisper.DefaultConfig)
+ wapi = whisper.NewPublicWhisperAPI(w)
+
+ pssprotocols = make(map[string]*protoCtrl)
+ },
+ )
+}
+
+// test that topic conversion functions give predictable results
+func TestTopic(t *testing.T) {
+
+ api := &API{}
+
+ topicstr := strings.Join([]string{PingProtocol.Name, strconv.Itoa(int(PingProtocol.Version))}, ":")
+
+ // bytestotopic is the authoritative topic conversion source
+ topicobj := BytesToTopic([]byte(topicstr))
+
+ // string to topic and bytes to topic must match
+ topicapiobj, _ := api.StringToTopic(topicstr)
+ if topicobj != topicapiobj {
+ t.Fatalf("bytes and string topic conversion mismatch; %s != %s", topicobj, topicapiobj)
+ }
+
+ // string representation of topichex
+ topichex := topicobj.String()
+
+ // protocoltopic wrapper on pingtopic should be same as topicstring
+ // check that it matches
+ pingtopichex := PingTopic.String()
+ if topichex != pingtopichex {
+ t.Fatalf("protocol topic conversion mismatch; %s != %s", topichex, pingtopichex)
+ }
+
+ // json marshal of topic
+ topicjsonout, err := topicobj.MarshalJSON()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if string(topicjsonout)[1:len(topicjsonout)-1] != topichex {
+ t.Fatalf("topic json marshal mismatch; %s != \"%s\"", topicjsonout, topichex)
+ }
+
+ // json unmarshal of topic
+ var topicjsonin Topic
+ topicjsonin.UnmarshalJSON(topicjsonout)
+ if topicjsonin != topicobj {
+ t.Fatalf("topic json unmarshal mismatch: %x != %x", topicjsonin, topicobj)
+ }
+}
+
+// test bit packing of message control flags
+func TestMsgParams(t *testing.T) {
+ var ctrl byte
+ ctrl |= pssControlRaw
+ p := newMsgParamsFromBytes([]byte{ctrl})
+ m := newPssMsg(p)
+ if !m.isRaw() || m.isSym() {
+ t.Fatal("expected raw=true and sym=false")
+ }
+ ctrl |= pssControlSym
+ p = newMsgParamsFromBytes([]byte{ctrl})
+ m = newPssMsg(p)
+ if !m.isRaw() || !m.isSym() {
+ t.Fatal("expected raw=true and sym=true")
+ }
+ ctrl &= 0xff &^ pssControlRaw
+ p = newMsgParamsFromBytes([]byte{ctrl})
+ m = newPssMsg(p)
+ if m.isRaw() || !m.isSym() {
+ t.Fatal("expected raw=false and sym=true")
+ }
+}
+
+// test if we can insert into cache, match items with cache and cache expiry
+func TestCache(t *testing.T) {
+ var err error
+ to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f")
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctx)
+ privkey, err := w.GetPrivateKey(keys)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ps := newTestPss(privkey, nil, nil)
+ pp := NewPssParams().WithPrivateKey(privkey)
+ data := []byte("foo")
+ datatwo := []byte("bar")
+ datathree := []byte("baz")
+ wparams := &whisper.MessageParams{
+ TTL: defaultWhisperTTL,
+ Src: privkey,
+ Dst: &privkey.PublicKey,
+ Topic: whisper.TopicType(PingTopic),
+ WorkTime: defaultWhisperWorkTime,
+ PoW: defaultWhisperPoW,
+ Payload: data,
+ }
+ woutmsg, err := whisper.NewSentMessage(wparams)
+ env, err := woutmsg.Wrap(wparams)
+ msg := &PssMsg{
+ Payload: env,
+ To: to,
+ }
+ wparams.Payload = datatwo
+ woutmsg, err = whisper.NewSentMessage(wparams)
+ envtwo, err := woutmsg.Wrap(wparams)
+ msgtwo := &PssMsg{
+ Payload: envtwo,
+ To: to,
+ }
+ wparams.Payload = datathree
+ woutmsg, err = whisper.NewSentMessage(wparams)
+ envthree, err := woutmsg.Wrap(wparams)
+ msgthree := &PssMsg{
+ Payload: envthree,
+ To: to,
+ }
+
+ digest := ps.digest(msg)
+ if err != nil {
+ t.Fatalf("could not store cache msgone: %v", err)
+ }
+ digesttwo := ps.digest(msgtwo)
+ if err != nil {
+ t.Fatalf("could not store cache msgtwo: %v", err)
+ }
+ digestthree := ps.digest(msgthree)
+ if err != nil {
+ t.Fatalf("could not store cache msgthree: %v", err)
+ }
+
+ if digest == digesttwo {
+ t.Fatalf("different msgs return same hash: %d", digesttwo)
+ }
+
+ // check the cache
+ err = ps.addFwdCache(msg)
+ if err != nil {
+ t.Fatalf("write to pss expire cache failed: %v", err)
+ }
+
+ if !ps.checkFwdCache(msg) {
+ t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg)
+ }
+
+ if ps.checkFwdCache(msgtwo) {
+ t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo)
+ }
+
+ time.Sleep(pp.CacheTTL + 1*time.Second)
+ err = ps.addFwdCache(msgthree)
+ if err != nil {
+ t.Fatalf("write to pss expire cache failed: %v", err)
+ }
+
+ if ps.checkFwdCache(msg) {
+ t.Fatalf("message %v should have expired from cache but checkCache returned true", msg)
+ }
+
+ if _, ok := ps.fwdCache[digestthree]; !ok {
+ t.Fatalf("unexpired message should be in the cache: %v", digestthree)
+ }
+
+ if _, ok := ps.fwdCache[digesttwo]; ok {
+ t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo)
+ }
+}
+
+// matching of address hints; whether a message could be or is for the node
+func TestAddressMatch(t *testing.T) {
+
+ localaddr := network.RandomAddr().Over()
+ copy(localaddr[:8], []byte("deadbeef"))
+ remoteaddr := []byte("feedbeef")
+ kadparams := network.NewKadParams()
+ kad := network.NewKademlia(localaddr, kadparams)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctx)
+ if err != nil {
+ t.Fatalf("Could not generate private key: %v", err)
+ }
+ privkey, err := w.GetPrivateKey(keys)
+ pssp := NewPssParams().WithPrivateKey(privkey)
+ ps, err := NewPss(kad, pssp)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+
+ pssmsg := &PssMsg{
+ To: remoteaddr,
+ Payload: &whisper.Envelope{},
+ }
+
+ // differ from first byte
+ if ps.isSelfRecipient(pssmsg) {
+ t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
+ }
+ if ps.isSelfPossibleRecipient(pssmsg) {
+ t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8])
+ }
+
+ // 8 first bytes same
+ copy(remoteaddr[:4], localaddr[:4])
+ if ps.isSelfRecipient(pssmsg) {
+ t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
+ }
+ if !ps.isSelfPossibleRecipient(pssmsg) {
+ t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
+ }
+
+ // all bytes same
+ pssmsg.To = localaddr
+ if !ps.isSelfRecipient(pssmsg) {
+ t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr)
+ }
+ if !ps.isSelfPossibleRecipient(pssmsg) {
+ t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
+ }
+}
+
+//
+func TestHandlerConditions(t *testing.T) {
+
+ t.Skip("Disabled due to probable faulty logic for outbox expectations")
+ // setup
+ privkey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+
+ addr := make([]byte, 32)
+ addr[0] = 0x01
+ ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams())
+
+ // message should pass
+ msg := &PssMsg{
+ To: addr,
+ Expire: uint32(time.Now().Add(time.Second * 60).Unix()),
+ Payload: &whisper.Envelope{
+ Topic: [4]byte{},
+ Data: []byte{0x66, 0x6f, 0x6f},
+ },
+ }
+ if err := ps.handlePssMsg(msg); err != nil {
+ t.Fatal(err.Error())
+ }
+ tmr := time.NewTimer(time.Millisecond * 100)
+ var outmsg *PssMsg
+ select {
+ case outmsg = <-ps.outbox:
+ case <-tmr.C:
+ default:
+ }
+ if outmsg != nil {
+ t.Fatalf("expected outbox empty after full address on msg, but had message %s", msg)
+ }
+
+ // message should pass and queue due to partial length
+ msg.To = addr[0:1]
+ msg.Payload.Data = []byte{0x78, 0x79, 0x80, 0x80, 0x79}
+ if err := ps.handlePssMsg(msg); err != nil {
+ t.Fatal(err.Error())
+ }
+ tmr.Reset(time.Millisecond * 100)
+ outmsg = nil
+ select {
+ case outmsg = <-ps.outbox:
+ case <-tmr.C:
+ }
+ if outmsg == nil {
+ t.Fatal("expected message in outbox on encrypt fail, but empty")
+ }
+ outmsg = nil
+ select {
+ case outmsg = <-ps.outbox:
+ default:
+ }
+ if outmsg != nil {
+ t.Fatalf("expected only one queued message but also had message %v", msg)
+ }
+
+ // full address mismatch should put message in queue
+ msg.To[0] = 0xff
+ if err := ps.handlePssMsg(msg); err != nil {
+ t.Fatal(err.Error())
+ }
+ tmr.Reset(time.Millisecond * 10)
+ outmsg = nil
+ select {
+ case outmsg = <-ps.outbox:
+ case <-tmr.C:
+ }
+ if outmsg == nil {
+ t.Fatal("expected message in outbox on address mismatch, but empty")
+ }
+ outmsg = nil
+ select {
+ case outmsg = <-ps.outbox:
+ default:
+ }
+ if outmsg != nil {
+ t.Fatalf("expected only one queued message but also had message %v", msg)
+ }
+
+ // expired message should be dropped
+ msg.Expire = uint32(time.Now().Add(-time.Second).Unix())
+ if err := ps.handlePssMsg(msg); err != nil {
+ t.Fatal(err.Error())
+ }
+ tmr.Reset(time.Millisecond * 10)
+ outmsg = nil
+ select {
+ case outmsg = <-ps.outbox:
+ case <-tmr.C:
+ default:
+ }
+ if outmsg != nil {
+ t.Fatalf("expected empty queue but have message %v", msg)
+ }
+
+ // invalid message should return error
+ fckedupmsg := &struct {
+ pssMsg *PssMsg
+ }{
+ pssMsg: &PssMsg{},
+ }
+ if err := ps.handlePssMsg(fckedupmsg); err == nil {
+ t.Fatalf("expected error from processMsg but error nil")
+ }
+
+ // outbox full should return error
+ msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
+ for i := 0; i < defaultOutboxCapacity; i++ {
+ ps.outbox <- msg
+ }
+ msg.Payload.Data = []byte{0x62, 0x61, 0x72}
+ err = ps.handlePssMsg(msg)
+ if err == nil {
+ t.Fatal("expected error when mailbox full, but was nil")
+ }
+}
+
+// set and generate pubkeys and symkeys
+func TestKeys(t *testing.T) {
+ // make our key and init pss with it
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ ourkeys, err := wapi.NewKeyPair(ctx)
+ if err != nil {
+ t.Fatalf("create 'our' key fail")
+ }
+ ctx, cancel2 := context.WithTimeout(context.Background(), time.Second)
+ defer cancel2()
+ theirkeys, err := wapi.NewKeyPair(ctx)
+ if err != nil {
+ t.Fatalf("create 'their' key fail")
+ }
+ ourprivkey, err := w.GetPrivateKey(ourkeys)
+ if err != nil {
+ t.Fatalf("failed to retrieve 'our' private key")
+ }
+ theirprivkey, err := w.GetPrivateKey(theirkeys)
+ if err != nil {
+ t.Fatalf("failed to retrieve 'their' private key")
+ }
+ ps := newTestPss(ourprivkey, nil, nil)
+
+ // set up peer with mock address, mapped to mocked publicaddress and with mocked symkey
+ addr := make(PssAddress, 32)
+ copy(addr, network.RandomAddr().Over())
+ outkey := network.RandomAddr().Over()
+ topicobj := BytesToTopic([]byte("foo:42"))
+ ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, &addr)
+ outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, &addr, false)
+ if err != nil {
+ t.Fatalf("failed to set 'our' outgoing symmetric key")
+ }
+
+ // make a symmetric key that we will send to peer for encrypting messages to us
+ inkeyid, err := ps.generateSymmetricKey(topicobj, &addr, true)
+ if err != nil {
+ t.Fatalf("failed to set 'our' incoming symmetric key")
+ }
+
+ // get the key back from whisper, check that it's still the same
+ outkeyback, err := ps.w.GetSymKey(outkeyid)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ inkey, err := ps.w.GetSymKey(inkeyid)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ if !bytes.Equal(outkeyback, outkey) {
+ t.Fatalf("passed outgoing symkey doesnt equal stored: %x / %x", outkey, outkeyback)
+ }
+
+ t.Logf("symout: %v", outkeyback)
+ t.Logf("symin: %v", inkey)
+
+ // check that the key is stored in the peerpool
+ psp := ps.symKeyPool[inkeyid][topicobj]
+ if psp.address != &addr {
+ t.Fatalf("inkey address does not match; %p != %p", psp.address, &addr)
+ }
+}
+
+func TestGetPublickeyEntries(t *testing.T) {
+
+ privkey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ ps := newTestPss(privkey, nil, nil)
+
+ peeraddr := network.RandomAddr().Over()
+ topicaddr := make(map[Topic]PssAddress)
+ topicaddr[Topic{0x13}] = peeraddr
+ topicaddr[Topic{0x2a}] = peeraddr[:16]
+ topicaddr[Topic{0x02, 0x9a}] = []byte{}
+
+ remoteprivkey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ remotepubkeybytes := crypto.FromECDSAPub(&remoteprivkey.PublicKey)
+ remotepubkeyhex := common.ToHex(remotepubkeybytes)
+
+ pssapi := NewAPI(ps)
+
+ for to, a := range topicaddr {
+ err = pssapi.SetPeerPublicKey(remotepubkeybytes, to, a)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ intopic, err := pssapi.GetPeerTopics(remotepubkeyhex)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+OUTER:
+ for _, tnew := range intopic {
+ for torig, addr := range topicaddr {
+ if bytes.Equal(torig[:], tnew[:]) {
+ inaddr, err := pssapi.GetPeerAddress(remotepubkeyhex, torig)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(addr, inaddr) {
+ t.Fatalf("Address mismatch for topic %x; got %x, expected %x", torig, inaddr, addr)
+ }
+ delete(topicaddr, torig)
+ continue OUTER
+ }
+ }
+ t.Fatalf("received topic %x did not match any existing topics", tnew)
+ }
+
+ if len(topicaddr) != 0 {
+ t.Fatalf("%d topics were not matched", len(topicaddr))
+ }
+}
+
+type pssTestPeer struct {
+ *protocols.Peer
+ addr []byte
+}
+
+func (t *pssTestPeer) Address() []byte {
+ return t.addr
+}
+
+func (t *pssTestPeer) Update(addr network.OverlayAddr) network.OverlayAddr {
+ return addr
+}
+
+func (t *pssTestPeer) Off() network.OverlayAddr {
+ return &pssTestPeer{}
+}
+
+// forwarding should skip peers that do not have matching pss capabilities
+func TestMismatch(t *testing.T) {
+
+ // create privkey for forwarder node
+ privkey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // initialize overlay
+ baseaddr := network.RandomAddr()
+ kad := network.NewKademlia((baseaddr).Over(), network.NewKadParams())
+ rw := &p2p.MsgPipeRW{}
+
+ // one peer has a mismatching version of pss
+ wrongpssaddr := network.RandomAddr()
+ wrongpsscap := p2p.Cap{
+ Name: pssProtocolName,
+ Version: 0,
+ }
+ nid, _ := discover.HexID("0x01")
+ wrongpsspeer := &pssTestPeer{
+ Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(wrongpssaddr.Over()), []p2p.Cap{wrongpsscap}), rw, nil),
+ addr: wrongpssaddr.Over(),
+ }
+
+ // one peer doesn't even have pss (boo!)
+ nopssaddr := network.RandomAddr()
+ nopsscap := p2p.Cap{
+ Name: "nopss",
+ Version: 1,
+ }
+ nid, _ = discover.HexID("0x02")
+ nopsspeer := &pssTestPeer{
+ Peer: protocols.NewPeer(p2p.NewPeer(nid, common.ToHex(nopssaddr.Over()), []p2p.Cap{nopsscap}), rw, nil),
+ addr: nopssaddr.Over(),
+ }
+
+ // add peers to kademlia and activate them
+ // it's safe so don't check errors
+ kad.Register([]network.OverlayAddr{wrongpsspeer})
+ kad.On(wrongpsspeer)
+ kad.Register([]network.OverlayAddr{nopsspeer})
+ kad.On(nopsspeer)
+
+ // create pss
+ pssmsg := &PssMsg{
+ To: []byte{},
+ Expire: uint32(time.Now().Add(time.Second).Unix()),
+ Payload: &whisper.Envelope{},
+ }
+ ps := newTestPss(privkey, kad, nil)
+
+ // run the forward
+ // it is enough that it completes; trying to send to incapable peers would create segfault
+ ps.forward(pssmsg)
+
+}
+
+func TestSendRaw(t *testing.T) {
+ t.Run("32", testSendRaw)
+ t.Run("8", testSendRaw)
+ t.Run("0", testSendRaw)
+}
+
+func testSendRaw(t *testing.T) {
+
+ var addrsize int64
+ var err error
+
+ paramstring := strings.Split(t.Name(), "/")
+
+ addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
+ log.Info("raw send test", "addrsize", addrsize)
+
+ clients, err := setupNetwork(2, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ topic := "0xdeadbeef"
+
+ var loaddrhex string
+ err = clients[0].Call(&loaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
+ }
+ loaddrhex = loaddrhex[:2+(addrsize*2)]
+ var roaddrhex string
+ err = clients[1].Call(&roaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
+ }
+ roaddrhex = roaddrhex[:2+(addrsize*2)]
+
+ time.Sleep(time.Millisecond * 500)
+
+ // at this point we've verified that symkeys are saved and match on each peer
+ // now try sending symmetrically encrypted message, both directions
+ lmsgC := make(chan APIMsg)
+ lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer lcancel()
+ lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ log.Trace("lsub", "id", lsub)
+ defer lsub.Unsubscribe()
+ rmsgC := make(chan APIMsg)
+ rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer rcancel()
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ log.Trace("rsub", "id", rsub)
+ defer rsub.Unsubscribe()
+
+ // send and verify delivery
+ lmsg := []byte("plugh")
+ err = clients[1].Call(nil, "pss_sendRaw", loaddrhex, topic, lmsg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case recvmsg := <-lmsgC:
+ if !bytes.Equal(recvmsg.Msg, lmsg) {
+ t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg)
+ }
+ case cerr := <-lctx.Done():
+ t.Fatalf("test message (left) timed out: %v", cerr)
+ }
+ rmsg := []byte("xyzzy")
+ err = clients[0].Call(nil, "pss_sendRaw", roaddrhex, topic, rmsg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case recvmsg := <-rmsgC:
+ if !bytes.Equal(recvmsg.Msg, rmsg) {
+ t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
+ }
+ case cerr := <-rctx.Done():
+ t.Fatalf("test message (right) timed out: %v", cerr)
+ }
+}
+
+// send symmetrically encrypted message between two directly connected peers
+func TestSendSym(t *testing.T) {
+ t.Run("32", testSendSym)
+ t.Run("8", testSendSym)
+ t.Run("0", testSendSym)
+}
+
+func testSendSym(t *testing.T) {
+
+ // address hint size
+ var addrsize int64
+ var err error
+ paramstring := strings.Split(t.Name(), "/")
+ addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
+ log.Info("sym send test", "addrsize", addrsize)
+
+ clients, err := setupNetwork(2, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var topic string
+ err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var loaddrhex string
+ err = clients[0].Call(&loaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
+ }
+ loaddrhex = loaddrhex[:2+(addrsize*2)]
+ var roaddrhex string
+ err = clients[1].Call(&roaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
+ }
+ roaddrhex = roaddrhex[:2+(addrsize*2)]
+
+ // retrieve public key from pss instance
+ // set this public key reciprocally
+ var lpubkeyhex string
+ err = clients[0].Call(&lpubkeyhex, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get node 1 pubkey fail: %v", err)
+ }
+ var rpubkeyhex string
+ err = clients[1].Call(&rpubkeyhex, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get node 2 pubkey fail: %v", err)
+ }
+
+ time.Sleep(time.Millisecond * 500)
+
+ // at this point we've verified that symkeys are saved and match on each peer
+ // now try sending symmetrically encrypted message, both directions
+ lmsgC := make(chan APIMsg)
+ lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer lcancel()
+ lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ log.Trace("lsub", "id", lsub)
+ defer lsub.Unsubscribe()
+ rmsgC := make(chan APIMsg)
+ rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer rcancel()
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ log.Trace("rsub", "id", rsub)
+ defer rsub.Unsubscribe()
+
+ lrecvkey := network.RandomAddr().Over()
+ rrecvkey := network.RandomAddr().Over()
+
+ var lkeyids [2]string
+ var rkeyids [2]string
+
+ // manually set reciprocal symkeys
+ err = clients[0].Call(&lkeyids, "psstest_setSymKeys", rpubkeyhex, lrecvkey, rrecvkey, defaultSymKeySendLimit, topic, roaddrhex)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = clients[1].Call(&rkeyids, "psstest_setSymKeys", lpubkeyhex, rrecvkey, lrecvkey, defaultSymKeySendLimit, topic, loaddrhex)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // send and verify delivery
+ lmsg := []byte("plugh")
+ err = clients[1].Call(nil, "pss_sendSym", rkeyids[1], topic, hexutil.Encode(lmsg))
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case recvmsg := <-lmsgC:
+ if !bytes.Equal(recvmsg.Msg, lmsg) {
+ t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg)
+ }
+ case cerr := <-lctx.Done():
+ t.Fatalf("test message timed out: %v", cerr)
+ }
+ rmsg := []byte("xyzzy")
+ err = clients[0].Call(nil, "pss_sendSym", lkeyids[1], topic, hexutil.Encode(rmsg))
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case recvmsg := <-rmsgC:
+ if !bytes.Equal(recvmsg.Msg, rmsg) {
+ t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
+ }
+ case cerr := <-rctx.Done():
+ t.Fatalf("test message timed out: %v", cerr)
+ }
+}
+
+// send asymmetrically encrypted message between two directly connected peers
+func TestSendAsym(t *testing.T) {
+ t.Run("32", testSendAsym)
+ t.Run("8", testSendAsym)
+ t.Run("0", testSendAsym)
+}
+
+func testSendAsym(t *testing.T) {
+
+ // address hint size
+ var addrsize int64
+ var err error
+ paramstring := strings.Split(t.Name(), "/")
+ addrsize, _ = strconv.ParseInt(paramstring[1], 10, 0)
+ log.Info("asym send test", "addrsize", addrsize)
+
+ clients, err := setupNetwork(2, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var topic string
+ err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ time.Sleep(time.Millisecond * 250)
+
+ var loaddrhex string
+ err = clients[0].Call(&loaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
+ }
+ loaddrhex = loaddrhex[:2+(addrsize*2)]
+ var roaddrhex string
+ err = clients[1].Call(&roaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
+ }
+ roaddrhex = roaddrhex[:2+(addrsize*2)]
+
+ // retrieve public key from pss instance
+ // set this public key reciprocally
+ var lpubkey string
+ err = clients[0].Call(&lpubkey, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get node 1 pubkey fail: %v", err)
+ }
+ var rpubkey string
+ err = clients[1].Call(&rpubkey, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get node 2 pubkey fail: %v", err)
+ }
+
+ time.Sleep(time.Millisecond * 500) // replace with hive healthy code
+
+ lmsgC := make(chan APIMsg)
+ lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer lcancel()
+ lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ log.Trace("lsub", "id", lsub)
+ defer lsub.Unsubscribe()
+ rmsgC := make(chan APIMsg)
+ rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer rcancel()
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ log.Trace("rsub", "id", rsub)
+ defer rsub.Unsubscribe()
+
+ // store reciprocal public keys
+ err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, roaddrhex)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = clients[1].Call(nil, "pss_setPeerPublicKey", lpubkey, topic, loaddrhex)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // send and verify delivery
+ rmsg := []byte("xyzzy")
+ err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case recvmsg := <-rmsgC:
+ if !bytes.Equal(recvmsg.Msg, rmsg) {
+ t.Fatalf("node 2 received payload mismatch: expected %v, got %v", rmsg, recvmsg.Msg)
+ }
+ case cerr := <-rctx.Done():
+ t.Fatalf("test message timed out: %v", cerr)
+ }
+ lmsg := []byte("plugh")
+ err = clients[1].Call(nil, "pss_sendAsym", lpubkey, topic, hexutil.Encode(lmsg))
+ if err != nil {
+ t.Fatal(err)
+ }
+ select {
+ case recvmsg := <-lmsgC:
+ if !bytes.Equal(recvmsg.Msg, lmsg) {
+ t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg.Msg)
+ }
+ case cerr := <-lctx.Done():
+ t.Fatalf("test message timed out: %v", cerr)
+ }
+}
+
+type Job struct {
+ Msg []byte
+ SendNode discover.NodeID
+ RecvNode discover.NodeID
+}
+
+func worker(id int, jobs <-chan Job, rpcs map[discover.NodeID]*rpc.Client, pubkeys map[discover.NodeID]string, topic string) {
+ for j := range jobs {
+ rpcs[j.SendNode].Call(nil, "pss_sendAsym", pubkeys[j.RecvNode], topic, hexutil.Encode(j.Msg))
+ }
+}
+
+// params in run name:
+// nodes/msgs/addrbytes/adaptertype
+// if adaptertype is exec uses execadapter, simadapter otherwise
+func TestNetwork2000(t *testing.T) {
+ //enableMetrics()
+
+ t.Run("3/2000/4/sim", testNetwork)
+ t.Run("4/2000/4/sim", testNetwork)
+ t.Run("8/2000/4/sim", testNetwork)
+ t.Run("16/2000/4/sim", testNetwork)
+}
+
+func TestNetwork5000(t *testing.T) {
+ //enableMetrics()
+
+ t.Run("3/5000/4/sim", testNetwork)
+ t.Run("4/5000/4/sim", testNetwork)
+ t.Run("8/5000/4/sim", testNetwork)
+ t.Run("16/5000/4/sim", testNetwork)
+}
+
+func TestNetwork10000(t *testing.T) {
+ //enableMetrics()
+
+ t.Run("3/10000/4/sim", testNetwork)
+ t.Run("4/10000/4/sim", testNetwork)
+ t.Run("8/10000/4/sim", testNetwork)
+}
+
+func testNetwork(t *testing.T) {
+ type msgnotifyC struct {
+ id discover.NodeID
+ msgIdx int
+ }
+
+ paramstring := strings.Split(t.Name(), "/")
+ nodecount, _ := strconv.ParseInt(paramstring[1], 10, 0)
+ msgcount, _ := strconv.ParseInt(paramstring[2], 10, 0)
+ addrsize, _ := strconv.ParseInt(paramstring[3], 10, 0)
+ adapter := paramstring[4]
+
+ log.Info("network test", "nodecount", nodecount, "msgcount", msgcount, "addrhintsize", addrsize)
+
+ nodes := make([]discover.NodeID, nodecount)
+ bzzaddrs := make(map[discover.NodeID]string, nodecount)
+ rpcs := make(map[discover.NodeID]*rpc.Client, nodecount)
+ pubkeys := make(map[discover.NodeID]string, nodecount)
+
+ sentmsgs := make([][]byte, msgcount)
+ recvmsgs := make([]bool, msgcount)
+ nodemsgcount := make(map[discover.NodeID]int, nodecount)
+
+ trigger := make(chan discover.NodeID)
+
+ var a adapters.NodeAdapter
+ if adapter == "exec" {
+ dirname, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ a = adapters.NewExecAdapter(dirname)
+ } else if adapter == "tcp" {
+ a = adapters.NewTCPAdapter(newServices(false))
+ } else if adapter == "sim" {
+ a = adapters.NewSimAdapter(newServices(false))
+ }
+ net := simulations.NewNetwork(a, &simulations.NetworkConfig{
+ ID: "0",
+ })
+ defer net.Shutdown()
+
+ f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodecount))
+ if err != nil {
+ t.Fatal(err)
+ }
+ jsonbyte, err := ioutil.ReadAll(f)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var snap simulations.Snapshot
+ err = json.Unmarshal(jsonbyte, &snap)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = net.Load(&snap)
+ if err != nil {
+ //TODO: Fix p2p simulation framework to not crash when loading 32-nodes
+ //t.Fatal(err)
+ }
+
+ time.Sleep(1 * time.Second)
+
+ triggerChecks := func(trigger chan discover.NodeID, id discover.NodeID, rpcclient *rpc.Client, topic string) error {
+ msgC := make(chan APIMsg)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go func() {
+ defer sub.Unsubscribe()
+ for {
+ select {
+ case recvmsg := <-msgC:
+ idx, _ := binary.Uvarint(recvmsg.Msg)
+ if !recvmsgs[idx] {
+ log.Debug("msg recv", "idx", idx, "id", id)
+ recvmsgs[idx] = true
+ trigger <- id
+ }
+ case <-sub.Err():
+ return
+ }
+ }
+ }()
+ return nil
+ }
+
+ var topic string
+ for i, nod := range net.GetNodes() {
+ nodes[i] = nod.ID()
+ rpcs[nodes[i]], err = nod.Client()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if topic == "" {
+ err = rpcs[nodes[i]].Call(&topic, "pss_stringToTopic", "foo:42")
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ var pubkey string
+ err = rpcs[nodes[i]].Call(&pubkey, "pss_getPublicKey")
+ if err != nil {
+ t.Fatal(err)
+ }
+ pubkeys[nod.ID()] = pubkey
+ var addrhex string
+ err = rpcs[nodes[i]].Call(&addrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatal(err)
+ }
+ bzzaddrs[nodes[i]] = addrhex
+ err = triggerChecks(trigger, nodes[i], rpcs[nodes[i]], topic)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ time.Sleep(1 * time.Second)
+
+ // setup workers
+ jobs := make(chan Job, 10)
+ for w := 1; w <= 10; w++ {
+ go worker(w, jobs, rpcs, pubkeys, topic)
+ }
+
+ time.Sleep(1 * time.Second)
+
+ for i := 0; i < int(msgcount); i++ {
+ sendnodeidx := rand.Intn(int(nodecount))
+ recvnodeidx := rand.Intn(int(nodecount - 1))
+ if recvnodeidx >= sendnodeidx {
+ recvnodeidx++
+ }
+ nodemsgcount[nodes[recvnodeidx]]++
+ sentmsgs[i] = make([]byte, 8)
+ c := binary.PutUvarint(sentmsgs[i], uint64(i))
+ if c == 0 {
+ t.Fatal("0 byte message")
+ }
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = rpcs[nodes[sendnodeidx]].Call(nil, "pss_setPeerPublicKey", pubkeys[nodes[recvnodeidx]], topic, bzzaddrs[nodes[recvnodeidx]])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ jobs <- Job{
+ Msg: sentmsgs[i],
+ SendNode: nodes[sendnodeidx],
+ RecvNode: nodes[recvnodeidx],
+ }
+ }
+
+ finalmsgcount := 0
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+outer:
+ for i := 0; i < int(msgcount); i++ {
+ select {
+ case id := <-trigger:
+ nodemsgcount[id]--
+ finalmsgcount++
+ case <-ctx.Done():
+ log.Warn("timeout")
+ break outer
+ }
+ }
+
+ for i, msg := range recvmsgs {
+ if !msg {
+ log.Debug("missing message", "idx", i)
+ }
+ }
+ t.Logf("%d of %d messages received", finalmsgcount, msgcount)
+
+ if finalmsgcount != int(msgcount) {
+ t.Fatalf("%d messages were not received", int(msgcount)-finalmsgcount)
+ }
+
+}
+
+// check that in a network of a -> b -> c -> a
+// a doesn't receive a sent message twice
+func TestDeduplication(t *testing.T) {
+ var err error
+
+ clients, err := setupNetwork(3, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var addrsize = 32
+ var loaddrhex string
+ err = clients[0].Call(&loaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
+ }
+ loaddrhex = loaddrhex[:2+(addrsize*2)]
+ var roaddrhex string
+ err = clients[1].Call(&roaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
+ }
+ roaddrhex = roaddrhex[:2+(addrsize*2)]
+ var xoaddrhex string
+ err = clients[2].Call(&xoaddrhex, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 3 baseaddr fail: %v", err)
+ }
+ xoaddrhex = xoaddrhex[:2+(addrsize*2)]
+
+ log.Info("peer", "l", loaddrhex, "r", roaddrhex, "x", xoaddrhex)
+
+ var topic string
+ err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ time.Sleep(time.Millisecond * 250)
+
+ // retrieve public key from pss instance
+ // set this public key reciprocally
+ var rpubkey string
+ err = clients[1].Call(&rpubkey, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get receivenode pubkey fail: %v", err)
+ }
+
+ time.Sleep(time.Millisecond * 500) // replace with hive healthy code
+
+ rmsgC := make(chan APIMsg)
+ rctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
+ defer cancel()
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ log.Trace("rsub", "id", rsub)
+ defer rsub.Unsubscribe()
+
+ // store public key for recipient
+ // zero-length address means forward to all
+ // we have just two peers, they will be in proxbin, and will both receive
+ err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, "0x")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // send and verify delivery
+ rmsg := []byte("xyzzy")
+ err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var receivedok bool
+OUTER:
+ for {
+ select {
+ case <-rmsgC:
+ if receivedok {
+ t.Fatalf("duplicate message received")
+ }
+ receivedok = true
+ case <-rctx.Done():
+ break OUTER
+ }
+ }
+ if !receivedok {
+ t.Fatalf("message did not arrive")
+ }
+}
+
+// symmetric send performance with varying message sizes
+func BenchmarkSymkeySend(b *testing.B) {
+ b.Run(fmt.Sprintf("%d", 256), benchmarkSymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024), benchmarkSymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024*1024), benchmarkSymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024*1024*10), benchmarkSymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024*1024*100), benchmarkSymKeySend)
+}
+
+func benchmarkSymKeySend(b *testing.B) {
+ msgsizestring := strings.Split(b.Name(), "/")
+ if len(msgsizestring) != 2 {
+ b.Fatalf("benchmark called without msgsize param")
+ }
+ msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
+ if err != nil {
+ b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctx)
+ privkey, err := w.GetPrivateKey(keys)
+ ps := newTestPss(privkey, nil, nil)
+ msg := make([]byte, msgsize)
+ rand.Read(msg)
+ topic := BytesToTopic([]byte("foo"))
+ to := make(PssAddress, 32)
+ copy(to[:], network.RandomAddr().Over())
+ symkeyid, err := ps.generateSymmetricKey(topic, &to, true)
+ if err != nil {
+ b.Fatalf("could not generate symkey: %v", err)
+ }
+ symkey, err := ps.w.GetSymKey(symkeyid)
+ if err != nil {
+ b.Fatalf("could not retrieve symkey: %v", err)
+ }
+ ps.SetSymmetricKey(symkey, topic, &to, false)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ ps.SendSym(symkeyid, topic, msg)
+ }
+}
+
+// asymmetric send performance with varying message sizes
+func BenchmarkAsymkeySend(b *testing.B) {
+ b.Run(fmt.Sprintf("%d", 256), benchmarkAsymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024), benchmarkAsymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024*1024), benchmarkAsymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024*1024*10), benchmarkAsymKeySend)
+ b.Run(fmt.Sprintf("%d", 1024*1024*100), benchmarkAsymKeySend)
+}
+
+func benchmarkAsymKeySend(b *testing.B) {
+ msgsizestring := strings.Split(b.Name(), "/")
+ if len(msgsizestring) != 2 {
+ b.Fatalf("benchmark called without msgsize param")
+ }
+ msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
+ if err != nil {
+ b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctx)
+ privkey, err := w.GetPrivateKey(keys)
+ ps := newTestPss(privkey, nil, nil)
+ msg := make([]byte, msgsize)
+ rand.Read(msg)
+ topic := BytesToTopic([]byte("foo"))
+ to := make(PssAddress, 32)
+ copy(to[:], network.RandomAddr().Over())
+ ps.SetPeerPublicKey(&privkey.PublicKey, topic, &to)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ ps.SendAsym(common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey)), topic, msg)
+ }
+}
+func BenchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
+ for i := 100; i < 100000; i = i * 10 {
+ for j := 32; j < 10000; j = j * 8 {
+ b.Run(fmt.Sprintf("%d/%d", i, j), benchmarkSymkeyBruteforceChangeaddr)
+ }
+ //b.Run(fmt.Sprintf("%d", i), benchmarkSymkeyBruteforceChangeaddr)
+ }
+}
+
+// decrypt performance using symkey cache, worst case
+// (decrypt key always last in cache)
+func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
+ keycountstring := strings.Split(b.Name(), "/")
+ cachesize := int64(0)
+ var ps *Pss
+ if len(keycountstring) < 2 {
+ b.Fatalf("benchmark called without count param")
+ }
+ keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
+ if err != nil {
+ b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
+ }
+ if len(keycountstring) == 3 {
+ cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
+ if err != nil {
+ b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
+ }
+ }
+ pssmsgs := make([]*PssMsg, 0, keycount)
+ var keyid string
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctx)
+ privkey, err := w.GetPrivateKey(keys)
+ if cachesize > 0 {
+ ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
+ } else {
+ ps = newTestPss(privkey, nil, nil)
+ }
+ topic := BytesToTopic([]byte("foo"))
+ for i := 0; i < int(keycount); i++ {
+ to := make(PssAddress, 32)
+ copy(to[:], network.RandomAddr().Over())
+ keyid, err = ps.generateSymmetricKey(topic, &to, true)
+ if err != nil {
+ b.Fatalf("cant generate symkey #%d: %v", i, err)
+ }
+ symkey, err := ps.w.GetSymKey(keyid)
+ if err != nil {
+ b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
+ }
+ wparams := &whisper.MessageParams{
+ TTL: defaultWhisperTTL,
+ KeySym: symkey,
+ Topic: whisper.TopicType(topic),
+ WorkTime: defaultWhisperWorkTime,
+ PoW: defaultWhisperPoW,
+ Payload: []byte("xyzzy"),
+ Padding: []byte("1234567890abcdef"),
+ }
+ woutmsg, err := whisper.NewSentMessage(wparams)
+ if err != nil {
+ b.Fatalf("could not create whisper message: %v", err)
+ }
+ env, err := woutmsg.Wrap(wparams)
+ if err != nil {
+ b.Fatalf("could not generate whisper envelope: %v", err)
+ }
+ ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
+ return nil
+ })
+ pssmsgs = append(pssmsgs, &PssMsg{
+ To: to,
+ Payload: env,
+ })
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]); err != nil {
+ b.Fatalf("pss processing failed: %v", err)
+ }
+ }
+}
+
+func BenchmarkSymkeyBruteforceSameaddr(b *testing.B) {
+ for i := 100; i < 100000; i = i * 10 {
+ for j := 32; j < 10000; j = j * 8 {
+ b.Run(fmt.Sprintf("%d/%d", i, j), benchmarkSymkeyBruteforceSameaddr)
+ }
+ }
+}
+
+// decrypt performance using symkey cache, best case
+// (decrypt key always first in cache)
+func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
+ var keyid string
+ var ps *Pss
+ cachesize := int64(0)
+ keycountstring := strings.Split(b.Name(), "/")
+ if len(keycountstring) < 2 {
+ b.Fatalf("benchmark called without count param")
+ }
+ keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
+ if err != nil {
+ b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
+ }
+ if len(keycountstring) == 3 {
+ cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
+ if err != nil {
+ b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
+ }
+ }
+ addr := make([]PssAddress, keycount)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctx)
+ privkey, err := w.GetPrivateKey(keys)
+ if cachesize > 0 {
+ ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
+ } else {
+ ps = newTestPss(privkey, nil, nil)
+ }
+ topic := BytesToTopic([]byte("foo"))
+ for i := 0; i < int(keycount); i++ {
+ copy(addr[i], network.RandomAddr().Over())
+ keyid, err = ps.generateSymmetricKey(topic, &addr[i], true)
+ if err != nil {
+ b.Fatalf("cant generate symkey #%d: %v", i, err)
+ }
+
+ }
+ symkey, err := ps.w.GetSymKey(keyid)
+ if err != nil {
+ b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
+ }
+ wparams := &whisper.MessageParams{
+ TTL: defaultWhisperTTL,
+ KeySym: symkey,
+ Topic: whisper.TopicType(topic),
+ WorkTime: defaultWhisperWorkTime,
+ PoW: defaultWhisperPoW,
+ Payload: []byte("xyzzy"),
+ Padding: []byte("1234567890abcdef"),
+ }
+ woutmsg, err := whisper.NewSentMessage(wparams)
+ if err != nil {
+ b.Fatalf("could not create whisper message: %v", err)
+ }
+ env, err := woutmsg.Wrap(wparams)
+ if err != nil {
+ b.Fatalf("could not generate whisper envelope: %v", err)
+ }
+ ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
+ return nil
+ })
+ pssmsg := &PssMsg{
+ To: addr[len(addr)-1][:],
+ Payload: env,
+ }
+ for i := 0; i < b.N; i++ {
+ if err := ps.process(pssmsg); err != nil {
+ b.Fatalf("pss processing failed: %v", err)
+ }
+ }
+}
+
+// setup simulated network with bzz/discovery and pss services.
+// connects nodes in a circle
+// if allowRaw is set, omission of builtin pss encryption is enabled (see PssParams)
+func setupNetwork(numnodes int, allowRaw bool) (clients []*rpc.Client, err error) {
+ nodes := make([]*simulations.Node, numnodes)
+ clients = make([]*rpc.Client, numnodes)
+ if numnodes < 2 {
+ return nil, fmt.Errorf("Minimum two nodes in network")
+ }
+ adapter := adapters.NewSimAdapter(newServices(allowRaw))
+ net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
+ ID: "0",
+ DefaultService: "bzz",
+ })
+ for i := 0; i < numnodes; i++ {
+ nodeconf := adapters.RandomNodeConfig()
+ nodeconf.Services = []string{"bzz", pssProtocolName}
+ nodes[i], err = net.NewNodeWithConfig(nodeconf)
+ if err != nil {
+ return nil, fmt.Errorf("error creating node 1: %v", err)
+ }
+ err = net.Start(nodes[i].ID())
+ if err != nil {
+ return nil, fmt.Errorf("error starting node 1: %v", err)
+ }
+ if i > 0 {
+ err = net.Connect(nodes[i].ID(), nodes[i-1].ID())
+ if err != nil {
+ return nil, fmt.Errorf("error connecting nodes: %v", err)
+ }
+ }
+ clients[i], err = nodes[i].Client()
+ if err != nil {
+ return nil, fmt.Errorf("create node 1 rpc client fail: %v", err)
+ }
+ }
+ if numnodes > 2 {
+ err = net.Connect(nodes[0].ID(), nodes[len(nodes)-1].ID())
+ if err != nil {
+ return nil, fmt.Errorf("error connecting first and last nodes")
+ }
+ }
+ return clients, nil
+}
+
+func newServices(allowRaw bool) adapters.Services {
+ stateStore := state.NewInmemoryStore()
+ kademlias := make(map[discover.NodeID]*network.Kademlia)
+ kademlia := func(id discover.NodeID) *network.Kademlia {
+ if k, ok := kademlias[id]; ok {
+ return k
+ }
+ addr := network.NewAddrFromNodeID(id)
+ params := network.NewKadParams()
+ params.MinProxBinSize = 2
+ params.MaxBinSize = 3
+ params.MinBinSize = 1
+ params.MaxRetries = 1000
+ params.RetryExponent = 2
+ params.RetryInterval = 1000000
+ kademlias[id] = network.NewKademlia(addr.Over(), params)
+ return kademlias[id]
+ }
+ return adapters.Services{
+ pssProtocolName: func(ctx *adapters.ServiceContext) (node.Service, error) {
+ // execadapter does not exec init()
+ initTest()
+
+ ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctxlocal)
+ privkey, err := w.GetPrivateKey(keys)
+ pssp := NewPssParams().WithPrivateKey(privkey)
+ pssp.AllowRaw = allowRaw
+ pskad := kademlia(ctx.Config.ID)
+ ps, err := NewPss(pskad, pssp)
+ if err != nil {
+ return nil, err
+ }
+
+ ping := &Ping{
+ OutC: make(chan bool),
+ Pong: true,
+ }
+ p2pp := NewPingProtocol(ping)
+ pp, err := RegisterProtocol(ps, &PingTopic, PingProtocol, p2pp, &ProtocolParams{Asymmetric: true})
+ if err != nil {
+ return nil, err
+ }
+ if useHandshake {
+ SetHandshakeController(ps, NewHandshakeParams())
+ }
+ ps.Register(&PingTopic, pp.Handle)
+ ps.addAPI(rpc.API{
+ Namespace: "psstest",
+ Version: "0.3",
+ Service: NewAPITest(ps),
+ Public: false,
+ })
+ if err != nil {
+ log.Error("Couldnt register pss protocol", "err", err)
+ os.Exit(1)
+ }
+ pssprotocols[ctx.Config.ID.String()] = &protoCtrl{
+ C: ping.OutC,
+ protocol: pp,
+ run: p2pp.Run,
+ }
+ return ps, nil
+ },
+ "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
+ addr := network.NewAddrFromNodeID(ctx.Config.ID)
+ hp := network.NewHiveParams()
+ hp.Discovery = false
+ config := &network.BzzConfig{
+ OverlayAddr: addr.Over(),
+ UnderlayAddr: addr.Under(),
+ HiveParams: hp,
+ }
+ return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
+ },
+ }
+}
+
+func newTestPss(privkey *ecdsa.PrivateKey, overlay network.Overlay, ppextra *PssParams) *Pss {
+
+ var nid discover.NodeID
+ copy(nid[:], crypto.FromECDSAPub(&privkey.PublicKey))
+ addr := network.NewAddrFromNodeID(nid)
+
+ // set up routing if kademlia is not passed to us
+ if overlay == nil {
+ kp := network.NewKadParams()
+ kp.MinProxBinSize = 3
+ overlay = network.NewKademlia(addr.Over(), kp)
+ }
+
+ // create pss
+ pp := NewPssParams().WithPrivateKey(privkey)
+ if ppextra != nil {
+ pp.SymKeyCacheCapacity = ppextra.SymKeyCacheCapacity
+ }
+ ps, err := NewPss(overlay, pp)
+ if err != nil {
+ return nil
+ }
+ ps.Start(nil)
+
+ return ps
+}
+
+// API calls for test/development use
+type APITest struct {
+ *Pss
+}
+
+func NewAPITest(ps *Pss) *APITest {
+ return &APITest{Pss: ps}
+}
+
+func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to PssAddress) ([2]string, error) {
+ recvsymkeyid, err := apitest.SetSymmetricKey(recvsymkey, topic, &to, true)
+ if err != nil {
+ return [2]string{}, err
+ }
+ sendsymkeyid, err := apitest.SetSymmetricKey(sendsymkey, topic, &to, false)
+ if err != nil {
+ return [2]string{}, err
+ }
+ return [2]string{recvsymkeyid, sendsymkeyid}, nil
+}
+
+func (apitest *APITest) Clean() (int, error) {
+ return apitest.Pss.cleanKeys(), nil
+}
+
+// enableMetrics is starting InfluxDB reporter so that we collect stats when running tests locally
+func enableMetrics() {
+ metrics.Enabled = true
+ go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 1*time.Second, "http://localhost:8086", "metrics", "admin", "admin", "swarm.", map[string]string{
+ "host": "test",
+ })
+}