aboutsummaryrefslogblamecommitdiffstats
path: root/swarm/pss/pss_test.go
blob: 5f7a914eee8681ec1a993225082d64f1785f31a1 (plain) (tree)



































                                                                                  
















                                                                             


     










                                                                                              












                                                     

                                                                                    
                                                                          
























































































                                                                                                           
                       









































































































                                                                                                                  
                               





                                                                                    
                                                      







                                                                                                    
                                                       







                                                                                                     
                                                       

                                                                                                     
 

 



































































































                                                                                                   
                                                                          















                                                                      
                                                                         

































































                                                                                                                                                        

                                                                         
                       
                                

                                











































































































































                                                                                                                        










                                                                                                    
                       

                              





                                                                    
         
                                                                    















                                                                                                    
                                                                    





















                                                                                         
                                                                    





















                                                                                         
                                                                    


















                                                                         
                                                                           








                                                                        
                                                  




























                                                                              
                       





                                                                                               

                                                                          




                                                                                       
                                                                     





















                                                                                                   

                                                                                     


         
                                                                                    






                                            
                       


















































                                                                                                                           
                                                                          
                                               






                                            
                         









                                                                             
                             



                                                                                                                                 






                                                
                            



                                                                                                                           


                                                  
                                          
                            
                                       








                                                                    
                       






                                                                                                  










                                                                                                                                                              
                       

























































                                                                                              



























                                                                                                     
                                                                   










































                                                                                     
                                                                                            




                                                                                  
                                                                                            




                                     
                                                                                         











                                                                                                        
                                                                                         








































































                                                                                                            
                                                                                             




                                                                                  
                                                                                             











































































































                                                                                                                                       
                                                                                             




                                                                                  
                                                                                             











































                                                                                                            

                         

 
                                                                                                                




                                                                                                             



                                           





                                                                


                                                                                    








                                           


                                                                                    








                                           


                                                                                    





                                           







                                                                                                        



                                                         


                                            
                                                         
 
                                      






































                                                                                       
                                                                                                              


                                                                                       
                                                                                                 










































































































































































                                                                                                                                                  
                                                                                             


























































                                                                                                       
                       




                                                
                                                                 






                                                              
                                                    





























                                                                                                       
                       




                                                
                                                          











































                                                                                                            
                       



                                                        
                                                                     























                                                                                

                                             







                                                  
                                                                                                          










































                                                                                                            
                       


                                                          
                                                                          

























                                                                        

                                     





                                              
                                                                        



















































                                                                                         

                                                         


                                               
                                                
                                            




                                              
                                                                  






























                                                                                                                          





                                                         

















                                                                                      
                                                                  











                                                                                                         
                                                                                            
                                                     
                                                         
                       
                                            
                                        
                                                     






                                                                    
                                  
















                                     


                                                                                                                                                           


                                       
                                                                                              
















                                                                                                                                                               
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pss

import (
    "bytes"
    "context"
    "crypto/ecdsa"
    "encoding/binary"
    "encoding/hex"
    "encoding/json"
    "flag"
    "fmt"
    "io/ioutil"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "sync"
    "testing"
    "time"

    "github.com/tangerine-network/go-tangerine/common"
    "github.com/tangerine-network/go-tangerine/common/hexutil"
    "github.com/tangerine-network/go-tangerine/crypto"
    "github.com/tangerine-network/go-tangerine/log"
    "github.com/tangerine-network/go-tangerine/metrics"
    "github.com/tangerine-network/go-tangerine/metrics/influxdb"
    "github.com/tangerine-network/go-tangerine/node"
    "github.com/tangerine-network/go-tangerine/p2p"
    "github.com/tangerine-network/go-tangerine/p2p/enode"
    "github.com/tangerine-network/go-tangerine/p2p/protocols"
    "github.com/tangerine-network/go-tangerine/p2p/simulations"
    "github.com/tangerine-network/go-tangerine/p2p/simulations/adapters"
    "github.com/tangerine-network/go-tangerine/rpc"
    "github.com/tangerine-network/go-tangerine/swarm/network"
    "github.com/tangerine-network/go-tangerine/swarm/pot"
    "github.com/tangerine-network/go-tangerine/swarm/state"
    whisper "github.com/tangerine-network/go-tangerine/whisper/whisperv6"
)

// package-level state shared by the tests in this file
var (
    // guards the one-time setup performed in initTest
    initOnce        = sync.Once{}
    // -loglevel flag; used in initTest as the level of the root log filter
    loglevel        = flag.Int("loglevel", 2, "logging verbosity")
    // -longrunning flag; opt-in switch for the long-running tests
    longrunning     = flag.Bool("longrunning", false, "do run long-running tests")
    // whisper instance created in initTest
    w               *whisper.Whisper
    // public whisper api wrapping w, created in initTest
    wapi            *whisper.PublicWhisperAPI
    // logger created in initTest with the context pair "psslog", "*"
    psslogmain      log.Logger
    // protocol controllers keyed by string; the map is allocated in initTest
    pssprotocols    map[string]*protoCtrl
    // NOTE(review): presumably toggles handshake usage in the simulated services; not referenced in the visible part of the file - confirm
    useHandshake    bool
    // handler that ignores its arguments and always reports success
    noopHandlerFunc = func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
        return nil
    }
)

// init parses the command line flags, seeds math/rand from the wall clock,
// registers the services returned by newServices(false) with the simulation adapters
// and finally runs the one-time test setup in initTest
// NOTE(review): from go 1.13 on the testing flags are registered after package initialization,
// so calling flag.Parse here may reject the -test.* flags - confirm against the toolchain in use
func init() {
    flag.Parse()
    // second-granularity seed; runs started within the same second share the random sequence
    rand.Seed(time.Now().Unix())

    adapters.RegisterServices(newServices(false))
    initTest()
}

func initTest() {
    initOnce.Do(
        func() {
            psslogmain = log.New("psslog", "*")
            hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
            hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs)
            h := log.CallerFileHandler(hf)
            log.Root().SetHandler(h)

            w = whisper.New(&whisper.DefaultConfig)
            wapi = whisper.NewPublicWhisperAPI(w)

            pssprotocols = make(map[string]*protoCtrl)
        },
    )
}

// test that topic conversion functions give predictable results
//
// the topic derived from the raw bytes of "<ping protocol name>:<version>" is the reference value;
// the string conversion of the api, the predefined PingTopic and the json round trip
// must all agree with it
func TestTopic(t *testing.T) {

    api := &API{}

    topicstr := strings.Join([]string{PingProtocol.Name, strconv.Itoa(int(PingProtocol.Version))}, ":")

    // bytestotopic is the authoritative topic conversion source
    topicobj := BytesToTopic([]byte(topicstr))

    // string to topic and bytes to topic must match
    topicapiobj, err := api.StringToTopic(topicstr)
    if err != nil {
        t.Fatal(err)
    }
    if topicobj != topicapiobj {
        t.Fatalf("bytes and string topic conversion mismatch; %s != %s", topicobj, topicapiobj)
    }

    // string representation of topichex
    topichex := topicobj.String()

    // protocoltopic wrapper on pingtopic should be same as topicstring
    // check that it matches
    pingtopichex := PingTopic.String()
    if topichex != pingtopichex {
        t.Fatalf("protocol topic conversion mismatch; %s != %s", topichex, pingtopichex)
    }

    // json marshal of topic
    topicjsonout, err := topicobj.MarshalJSON()
    if err != nil {
        t.Fatal(err)
    }
    // the marshalled value is compared with its first and last byte (the json quotes) stripped,
    // so it must be at least two bytes long for the slice expression to be valid
    if len(topicjsonout) < 2 {
        t.Fatalf("topic json marshal too short: %q", topicjsonout)
    }
    if string(topicjsonout)[1:len(topicjsonout)-1] != topichex {
        t.Fatalf("topic json marshal mismatch; %s != \"%s\"", topicjsonout, topichex)
    }

    // json unmarshal of topic
    var topicjsonin Topic
    if err := topicjsonin.UnmarshalJSON(topicjsonout); err != nil {
        t.Fatal(err)
    }
    if topicjsonin != topicobj {
        t.Fatalf("topic json unmarshal mismatch: %x != %x", topicjsonin, topicobj)
    }
}

// test bit packing of message control flags
//
// every case turns one control byte into message params and then into a message,
// and compares the raw and sym flags reported by the message with the expected ones
func TestMsgParams(t *testing.T) {
    cases := []struct {
        ctrl    byte
        raw     bool
        sym     bool
        failMsg string
    }{
        // only the raw bit set
        {pssControlRaw, true, false, "expected raw=true and sym=false"},
        // raw and sym bits set
        {pssControlRaw | pssControlSym, true, true, "expected raw=true and sym=true"},
        // raw bit masked out again, sym stays
        {(pssControlRaw | pssControlSym) & (0xff &^ pssControlRaw), false, true, "expected raw=false and sym=true"},
    }
    for _, c := range cases {
        m := newPssMsg(newMsgParamsFromBytes([]byte{c.ctrl}))
        if m.isRaw() != c.raw || m.isSym() != c.sym {
            t.Fatal(c.failMsg)
        }
    }
}

// test if we can insert into cache, match items with cache and cache expiry
//
// three messages with different payloads are built for the same recipient address.
// the first one is added to the forward cache and must be found there while the second must not.
// after sleeping past the cache ttl the third one is added; the first must then
// no longer be reported as cached while the third must be present in the cache map
func TestCache(t *testing.T) {
    to, err := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f")
    if err != nil {
        t.Fatal(err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    keys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        t.Fatal(err)
    }
    privkey, err := w.GetPrivateKey(keys)
    if err != nil {
        t.Fatal(err)
    }
    ps := newTestPss(privkey, nil, nil)
    defer ps.Stop()
    // only used to look up the cache ttl to sleep for below
    pp := NewPssParams().WithPrivateKey(privkey)

    // whisper parameters shared by all messages; only the payload differs
    wparams := &whisper.MessageParams{
        TTL:      defaultWhisperTTL,
        Src:      privkey,
        Dst:      &privkey.PublicKey,
        Topic:    whisper.TopicType(PingTopic),
        WorkTime: defaultWhisperWorkTime,
        PoW:      defaultWhisperPoW,
    }
    // newMsg wraps payload in a whisper envelope and puts it in a pss message
    // addressed to the common recipient, failing the test on any whisper error
    newMsg := func(payload []byte) *PssMsg {
        wparams.Payload = payload
        woutmsg, err := whisper.NewSentMessage(wparams)
        if err != nil {
            t.Fatalf("could not create whisper message: %v", err)
        }
        env, err := woutmsg.Wrap(wparams)
        if err != nil {
            t.Fatalf("could not wrap whisper message: %v", err)
        }
        return &PssMsg{
            Payload: env,
            To:      to,
        }
    }
    msg := newMsg([]byte("foo"))
    msgtwo := newMsg([]byte("bar"))
    msgthree := newMsg([]byte("baz"))

    digest := ps.digest(msg)
    digesttwo := ps.digest(msgtwo)
    digestthree := ps.digest(msgthree)

    if digest == digesttwo {
        t.Fatalf("different msgs return same hash: %d", digesttwo)
    }

    // check the cache
    if err := ps.addFwdCache(msg); err != nil {
        t.Fatalf("write to pss expire cache failed: %v", err)
    }

    if !ps.checkFwdCache(msg) {
        t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg)
    }

    if ps.checkFwdCache(msgtwo) {
        t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo)
    }

    // wait until the entry for the first message is past its ttl
    time.Sleep(pp.CacheTTL + 1*time.Second)
    if err := ps.addFwdCache(msgthree); err != nil {
        t.Fatalf("write to pss expire cache failed: %v", err)
    }

    if ps.checkFwdCache(msg) {
        t.Fatalf("message %v should have expired from cache but checkCache returned true", msg)
    }

    if _, ok := ps.fwdCache[digestthree]; !ok {
        t.Fatalf("unexpired message should be in the cache: %v", digestthree)
    }

    // NOTE(review): msgtwo is never added to the cache in this test, so this check cannot fail;
    // presumably the digest of the first message was meant here - confirm before changing,
    // since that depends on when expired entries are actually removed
    if _, ok := ps.fwdCache[digesttwo]; ok {
        t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo)
    }
}

// matching of address hints; whether a message could be or is for the node
//
// the local address starts with the bytes "deadbeef". a message is addressed first to
// "feedbeef" (differs from the first byte), then to "deadbeef" (equal to the first
// eight bytes of the local address) and finally to the complete local address
func TestAddressMatch(t *testing.T) {

    localaddr := network.RandomAddr().Over()
    copy(localaddr[:8], []byte("deadbeef"))
    remoteaddr := []byte("feedbeef")
    kadparams := network.NewKadParams()
    kad := network.NewKademlia(localaddr, kadparams)
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    keys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        t.Fatalf("Could not generate private key: %v", err)
    }
    privkey, err := w.GetPrivateKey(keys)
    if err != nil {
        t.Fatalf("Could not retrieve private key: %v", err)
    }
    pssp := NewPssParams().WithPrivateKey(privkey)
    ps, err := NewPss(kad, pssp)
    if err != nil {
        t.Fatal(err.Error())
    }

    // the message shares the backing array of remoteaddr,
    // so the copy into remoteaddr further down changes the message address too
    pssmsg := &PssMsg{
        To: remoteaddr,
    }

    // differ from first byte
    if ps.isSelfRecipient(pssmsg) {
        t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
    }
    if ps.isSelfPossibleRecipient(pssmsg, false) {
        t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8])
    }

    // 8 first bytes same
    // (overwriting the first four bytes turns "feedbeef" into "deadbeef")
    copy(remoteaddr[:4], localaddr[:4])
    if ps.isSelfRecipient(pssmsg) {
        t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
    }
    if !ps.isSelfPossibleRecipient(pssmsg, false) {
        t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
    }

    // all bytes same
    pssmsg.To = localaddr
    if !ps.isSelfRecipient(pssmsg) {
        t.Fatalf("isSelfRecipient false but %x == %x", pssmsg.To, localaddr)
    }
    if !ps.isSelfPossibleRecipient(pssmsg, false) {
        t.Fatalf("isSelfPossibleRecipient false but %x == %x", pssmsg.To[:8], localaddr[:8])
    }

}

// test that message is handled by sender if a prox handler exists and sender is in prox of message
//
// a raw message addressed far away from the sender must not reach the local handler within a second,
// while raw, symmetric and asymmetric messages addressed close to the sender must each reach it
func TestProxShortCircuit(t *testing.T) {

    // sender node address
    localAddr := network.RandomAddr().Over()
    localPotAddr := pot.NewAddressFromBytes(localAddr)

    // set up kademlia
    kadParams := network.NewKadParams()
    kad := network.NewKademlia(localAddr, kadParams)
    peerCount := kad.MinBinSize + 1

    // set up pss
    privKey, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err)
    }
    pssp := NewPssParams().WithPrivateKey(privKey)
    ps, err := NewPss(kad, pssp)
    if err != nil {
        t.Fatal(err.Error())
    }

    // message addresses; one at proximity order peerCount and one at proximity order 0 relative to the sender
    proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes()
    distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes()

    // create kademlia peers, so we have peers both inside and outside minproxlimit
    for i := 0; i < peerCount; i++ {
        rw := &p2p.MsgPipeRW{}
        ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{})
        protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
        peerAddr := pot.RandomAddressAt(localPotAddr, i)
        bzzPeer := &network.BzzPeer{
            Peer: protoPeer,
            BzzAddr: &network.BzzAddr{
                OAddr: peerAddr.Bytes(),
                UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
            },
        }
        kad.On(network.NewPeer(bzzPeer, kad))
    }

    // register it marking prox capability
    delivered := make(chan struct{})
    rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
        log.Trace("in allowraw handler")
        delivered <- struct{}{}
        return nil
    }
    topic := BytesToTopic([]byte{0x2a})
    hndlrProxDereg := ps.Register(&topic, &handler{
        f: rawHandlerFunc,
        caps: &handlerCaps{
            raw:  true,
            prox: true,
        },
    })
    defer hndlrProxDereg()

    errC := make(chan error)
    // sendAsync runs a send call in its own goroutine; only a failed send is reported, on errC
    sendAsync := func(send func() error) {
        go func() {
            if err := send(); err != nil {
                errC <- err
            }
        }()
    }
    // expectDelivery waits up to a second for the handler to fire;
    // a send error or the timeout fails the test
    expectDelivery := func(timeoutMsg string) {
        t.Helper()
        ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
        defer cancel()
        select {
        case <-delivered:
        case err := <-errC:
            t.Fatal(err)
        case <-ctx.Done():
            t.Fatal(timeoutMsg)
        }
    }

    // send message too far away for sender to be in prox
    // reception of this message should time out
    sendAsync(func() error {
        return ps.SendRaw(distantMessageAddress, topic, []byte("foo"))
    })
    ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
    defer cancel()
    select {
    case <-delivered:
        t.Fatal("raw distant message delivered")
    case err := <-errC:
        t.Fatal(err)
    case <-ctx.Done():
    }

    // send message that should be within sender prox
    // this message should be delivered
    sendAsync(func() error {
        return ps.SendRaw(proxMessageAddress, topic, []byte("bar"))
    })
    expectDelivery("raw timeout")

    // try the same prox message with sym and asym send
    proxAddrPss := PssAddress(proxMessageAddress)
    symKeyID, err := ps.GenerateSymmetricKey(topic, proxAddrPss, true)
    if err != nil {
        t.Fatal(err)
    }
    sendAsync(func() error {
        return ps.SendSym(symKeyID, topic, []byte("baz"))
    })
    expectDelivery("sym timeout")

    // the node's own public key is registered for the prox address,
    // and its hex encoding is the key id passed to the asymmetric send
    err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, proxAddrPss)
    if err != nil {
        t.Fatal(err)
    }
    pubKeyID := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey))
    sendAsync(func() error {
        return ps.SendAsym(pubKeyID, topic, []byte("xyzzy"))
    })
    expectDelivery("asym timeout")
}

// verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it
// note that in these tests we use the raw capability on handlers for convenience
//
// the test runs in four stages:
// 1. isSelfPossibleRecipient is checked directly for addresses at a set of distances from the local address
// 2. with a prox capable handler registered, messages at the same distances are passed to handlePssMsg
// 3. the same is repeated after a second handler without prox capability is added to the topic
// 4. after the prox capable handler is deregistered, none of the messages may reach a handler
func TestAddressMatchProx(t *testing.T) {

    // recipient node address
    localAddr := network.RandomAddr().Over()
    localPotAddr := pot.NewAddressFromBytes(localAddr)

    // set up kademlia
    kadparams := network.NewKadParams()
    kad := network.NewKademlia(localAddr, kadparams)
    nnPeerCount := kad.MinBinSize
    peerCount := nnPeerCount + 2

    // set up pss
    privKey, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err)
    }
    pssp := NewPssParams().WithPrivateKey(privKey)
    ps, err := NewPss(kad, pssp)
    if err != nil {
        t.Fatal(err.Error())
    }

    // create kademlia peers, so we have peers both inside and outside minproxlimit
    for i := 0; i < peerCount; i++ {
        rw := &p2p.MsgPipeRW{}
        ptpPeer := p2p.NewPeer(enode.ID{}, "362436 call me anytime", []p2p.Cap{})
        protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
        peerAddr := pot.RandomAddressAt(localPotAddr, i)
        bzzPeer := &network.BzzPeer{
            Peer: protoPeer,
            BzzAddr: &network.BzzAddr{
                OAddr: peerAddr.Bytes(),
                UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
            },
        }
        kad.On(network.NewPeer(bzzPeer, kad))
    }

    // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers
    // meanwhile test regression for kademlia since we are compiling the test parameters from different packages
    var proxes int
    var conns int
    depth := kad.NeighbourhoodDepth()
    kad.EachConn(nil, peerCount, func(p *network.Peer, po int) bool {
        conns++
        if po >= depth {
            proxes++
        }
        return true
    })
    if proxes != nnPeerCount {
        t.Fatalf("expected %d proxpeers, have %d", nnPeerCount, proxes)
    }
    if conns != peerCount {
        t.Fatalf("expected %d peers total, have %d", peerCount, conns)
    }

    // remote address distances from localAddr to try and the expected outcomes if we use prox handler
    remoteDistances := []int{
        255,
        nnPeerCount + 1,
        nnPeerCount,
        nnPeerCount - 1,
        0,
    }
    expects := []bool{
        true,
        true,
        true,
        false,
        false,
    }

    // first the unit test on the method that calculates possible recipient using prox
    for i, distance := range remoteDistances {
        pssMsg := newPssMsg(&msgParams{})
        pssMsg.To = make([]byte, len(localAddr))
        copy(pssMsg.To, localAddr)
        // flip the single bit at position distance, counted from the most significant bit of the address
        byteIdx := distance / 8
        pssMsg.To[byteIdx] ^= 1 << uint(7-(distance%8))
        log.Trace(fmt.Sprintf("addrmatch %v", bytes.Equal(pssMsg.To, localAddr)))
        if ps.isSelfPossibleRecipient(pssMsg, true) != expects[i] {
            t.Fatalf("expected distance %d to be %v", distance, expects[i])
        }
    }

    // we move up to higher level and test the actual message handler
    // for each distance check if we are possible recipient when prox variant is used is set

    // this handler will increment a counter for every message that gets passed to the handler
    var receives int
    rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
        log.Trace("in allowraw handler")
        receives++
        return nil
    }

    // register it marking prox capability
    topic := BytesToTopic([]byte{0x2a})
    hndlrProxDereg := ps.Register(&topic, &handler{
        f: rawHandlerFunc,
        caps: &handlerCaps{
            raw:  true,
            prox: true,
        },
    })

    // newRawMsg builds an unexpired raw message on the test topic for the given address and payload
    newRawMsg := func(to []byte, data []byte) *PssMsg {
        pssMsg := newPssMsg(&msgParams{raw: true})
        pssMsg.To = to
        pssMsg.Expire = uint32(time.Now().Unix() + 4200)
        pssMsg.Payload = &whisper.Envelope{
            Topic: whisper.TopicType(topic),
            Data:  data,
        }
        return pssMsg
    }

    // checkProxDelivery sends one message with random payload per distance through the message handler
    // and verifies that the receive counter moved exactly for the distances expected to be handled
    checkProxDelivery := func() {
        var prevReceive int
        for i, distance := range remoteDistances {
            remoteAddr := pot.RandomAddressAt(localPotAddr, distance).Bytes()

            var data [32]byte
            rand.Read(data[:])
            pssMsg := newRawMsg(remoteAddr, data[:])

            log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr)
            ps.handlePssMsg(context.TODO(), pssMsg)
            if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) {
                t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i])
            }
            prevReceive = receives
        }
    }

    // test the distances
    checkProxDelivery()

    // now add a non prox-capable handler and test
    ps.Register(&topic, &handler{
        f: rawHandlerFunc,
        caps: &handlerCaps{
            raw: true,
        },
    })
    receives = 0
    checkProxDelivery()

    // now deregister the prox capable handler, now none of the messages will be handled
    hndlrProxDereg()
    receives = 0

    for _, distance := range remoteDistances {
        remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
        remoteAddr := remotePotAddr.Bytes()

        pssMsg := newRawMsg(remoteAddr, []byte(remotePotAddr.String()))

        log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr)
        ps.handlePssMsg(context.TODO(), pssMsg)
        if receives != 0 {
            t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance)
        }

    }
}

// TestMessageProcessing verifies that message queueing happens when it should,
// and that expired and corrupt messages are dropped.
//
// The test is skipped unconditionally by the t.Skip call at the top of the
// body, so none of the checks below are currently executed.
func TestMessageProcessing(t *testing.T) {

    t.Skip("Disabled due to probable faulty logic for outbox expectations")
    // setup
    privkey, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err.Error())
    }

    // local address used both for the kademlia and as message recipient
    addr := make([]byte, 32)
    addr[0] = 0x01
    ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams())
    defer ps.Stop()

    // message should pass
    msg := newPssMsg(&msgParams{})
    msg.To = addr
    msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
    msg.Payload = &whisper.Envelope{
        Topic: [4]byte{},
        Data:  []byte{0x66, 0x6f, 0x6f},
    }
    if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
        t.Fatal(err.Error())
    }
    tmr := time.NewTimer(time.Millisecond * 100)
    var outmsg *PssMsg
    // NOTE(review): the default case makes this select non-blocking, so the
    // timer case is only taken if the timer has already fired.
    select {
    case outmsg = <-ps.outbox:
    case <-tmr.C:
    default:
    }
    if outmsg != nil {
        // NOTE(review): prints msg (the message sent), not outmsg (the message found)
        t.Fatalf("expected outbox empty after full address on msg, but had message %s", msg)
    }

    // message should pass and queue due to partial length
    // NOTE: msg.To now aliases the first byte of addr's backing array
    msg.To = addr[0:1]
    msg.Payload.Data = []byte{0x78, 0x79, 0x80, 0x80, 0x79}
    if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
        t.Fatal(err.Error())
    }
    tmr.Reset(time.Millisecond * 100)
    outmsg = nil
    // wait for either a queued message or the timer
    select {
    case outmsg = <-ps.outbox:
    case <-tmr.C:
    }
    if outmsg == nil {
        t.Fatal("expected message in outbox on encrypt fail, but empty")
    }
    outmsg = nil
    // non-blocking check that no second message was queued
    select {
    case outmsg = <-ps.outbox:
    default:
    }
    if outmsg != nil {
        t.Fatalf("expected only one queued message but also had message %v", msg)
    }

    // full address mismatch should put message in queue
    // NOTE(review): msg.To is still the one-byte slice addr[0:1] set above, so
    // this write also changes addr[0]; confirm this is the intended "full
    // address mismatch" setup.
    msg.To[0] = 0xff
    if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
        t.Fatal(err.Error())
    }
    tmr.Reset(time.Millisecond * 10)
    outmsg = nil
    select {
    case outmsg = <-ps.outbox:
    case <-tmr.C:
    }
    if outmsg == nil {
        t.Fatal("expected message in outbox on address mismatch, but empty")
    }
    outmsg = nil
    select {
    case outmsg = <-ps.outbox:
    default:
    }
    if outmsg != nil {
        t.Fatalf("expected only one queued message but also had message %v", msg)
    }

    // expired message should be dropped
    msg.Expire = uint32(time.Now().Add(-time.Second).Unix())
    if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
        t.Fatal(err.Error())
    }
    tmr.Reset(time.Millisecond * 10)
    outmsg = nil
    // NOTE(review): non-blocking because of the default case; the timer is
    // effectively unused here.
    select {
    case outmsg = <-ps.outbox:
    case <-tmr.C:
    default:
    }
    if outmsg != nil {
        t.Fatalf("expected empty queue but have message %v", msg)
    }

    // invalid message should return error
    // (a value that merely wraps a PssMsg instead of being one)
    fckedupmsg := &struct {
        pssMsg *PssMsg
    }{
        pssMsg: &PssMsg{},
    }
    if err := ps.handlePssMsg(context.TODO(), fckedupmsg); err == nil {
        t.Fatalf("expected error from processMsg but error nil")
    }

    // outbox full should return error
    msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
    // fill the outbox with defaultOutboxCapacity messages
    // NOTE(review): assumes the outbox channel has exactly that capacity,
    // otherwise this loop blocks; verify against NewPssParams.
    for i := 0; i < defaultOutboxCapacity; i++ {
        ps.outbox <- msg
    }
    msg.Payload.Data = []byte{0x62, 0x61, 0x72}
    err = ps.handlePssMsg(context.TODO(), msg)
    if err == nil {
        t.Fatal("expected error when mailbox full, but was nil")
    }
}

// TestKeys sets and generates public keys and symmetric keys on a pss
// instance, then checks that the outgoing symmetric key read back from whisper
// equals the one that was set, and that the generated incoming key is recorded
// in the symmetric key pool under the peer address it was created for.
func TestKeys(t *testing.T) {
    // make our key and init pss with it
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    ourkeys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        t.Fatalf("create 'our' key fail: %v", err)
    }
    ctx, cancel2 := context.WithTimeout(context.Background(), time.Second)
    defer cancel2()
    theirkeys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        t.Fatalf("create 'their' key fail: %v", err)
    }
    ourprivkey, err := w.GetPrivateKey(ourkeys)
    if err != nil {
        t.Fatalf("failed to retrieve 'our' private key: %v", err)
    }
    theirprivkey, err := w.GetPrivateKey(theirkeys)
    if err != nil {
        t.Fatalf("failed to retrieve 'their' private key: %v", err)
    }
    ps := newTestPss(ourprivkey, nil, nil)
    defer ps.Stop()

    // set up peer with mock address, mapped to mocked publicaddress and with mocked symkey
    addr := make(PssAddress, 32)
    copy(addr, network.RandomAddr().Over())
    outkey := network.RandomAddr().Over()
    topicobj := BytesToTopic([]byte("foo:42"))
    ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, addr)
    outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, addr, false)
    if err != nil {
        t.Fatalf("failed to set 'our' outgoing symmetric key: %v", err)
    }

    // make a symmetric key that we will send to peer for encrypting messages to us
    inkeyid, err := ps.GenerateSymmetricKey(topicobj, addr, true)
    if err != nil {
        t.Fatalf("failed to set 'our' incoming symmetric key: %v", err)
    }

    // get the key back from whisper, check that it's still the same
    outkeyback, err := ps.w.GetSymKey(outkeyid)
    if err != nil {
        // t.Fatal rather than t.Fatalf: the error text must not be used as a format string
        t.Fatal(err)
    }
    inkey, err := ps.w.GetSymKey(inkeyid)
    if err != nil {
        t.Fatal(err)
    }
    if !bytes.Equal(outkeyback, outkey) {
        t.Fatalf("passed outgoing symkey doesnt equal stored: %x / %x", outkey, outkeyback)
    }

    t.Logf("symout: %v", outkeyback)
    t.Logf("symin: %v", inkey)

    // check that the key is stored in the peerpool
    psp := ps.symKeyPool[inkeyid][topicobj]
    if !bytes.Equal(psp.address, addr) {
        // print the address bytes (%x); %p would only show slice pointers
        t.Fatalf("inkey address does not match; %x != %x", psp.address, addr)
    }
}

// TestGetPublickeyEntries checks that public key entries previously added
// through the API can be retrieved again per topic and peer.
//
// Three topics are registered for the same remote public key, each with a
// different address (a full random address, its first 16 bytes, and an empty
// address). Every topic reported back must match exactly one registered topic
// and carry the address that was stored for it.
func TestGetPublickeyEntries(t *testing.T) {

    localKey, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err)
    }
    ps := newTestPss(localKey, nil, nil)
    defer ps.Stop()

    // expected topic -> address entries; matched entries are removed below
    fullAddr := network.RandomAddr().Over()
    pending := make(map[Topic]PssAddress)
    pending[Topic{0x13}] = fullAddr
    pending[Topic{0x2a}] = fullAddr[:16]
    pending[Topic{0x02, 0x9a}] = []byte{}

    remoteKey, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err)
    }
    remotePubBytes := crypto.FromECDSAPub(&remoteKey.PublicKey)
    remotePubHex := common.ToHex(remotePubBytes)

    api := NewAPI(ps)

    // register the remote key under every topic
    for topic, addr := range pending {
        err = api.SetPeerPublicKey(remotePubBytes, topic, addr)
        if err != nil {
            t.Fatal(err)
        }
    }

    reported, err := api.GetPeerTopics(remotePubHex)
    if err != nil {
        t.Fatal(err)
    }

    // every reported topic must correspond to a still-pending expected topic
    for _, got := range reported {
        matched := false
        for want, wantAddr := range pending {
            if !bytes.Equal(want[:], got[:]) {
                continue
            }
            gotAddr, err := api.GetPeerAddress(remotePubHex, want)
            if err != nil {
                t.Fatal(err)
            }
            if !bytes.Equal(wantAddr, gotAddr) {
                t.Fatalf("Address mismatch for topic %x; got %x, expected %x", want, gotAddr, wantAddr)
            }
            delete(pending, want)
            matched = true
            break
        }
        if !matched {
            t.Fatalf("received topic %x did not match any existing topics", got)
        }
    }

    // anything left over was registered but never reported back
    if left := len(pending); left != 0 {
        t.Fatalf("%d topics were not matched", left)
    }
}

// TestPeerCapabilityMismatch checks that forwarding copes with peers that do
// not have matching pss capabilities: one peer advertises pss with version 0,
// the other advertises no pss capability at all. The test passes if forward
// returns without crashing.
func TestPeerCapabilityMismatch(t *testing.T) {

    // key of the forwarding node
    forwarderKey, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err)
    }

    // kademlia with a random base address, and a message pipe shared by both peers
    localAddr := network.RandomAddr()
    kad := network.NewKademlia(localAddr.Over(), network.NewKadParams())
    pipe := &p2p.MsgPipeRW{}

    // peer 1: pss capability present, but with a mismatching version
    versionAddr := network.RandomAddr()
    versionCaps := []p2p.Cap{{Name: pssProtocolName, Version: 0}}
    versionPeer := network.NewPeer(&network.BzzPeer{
        Peer:    protocols.NewPeer(p2p.NewPeer(enode.ID{0x01}, common.ToHex(versionAddr.Over()), versionCaps), pipe, nil),
        BzzAddr: &network.BzzAddr{OAddr: versionAddr.Over(), UAddr: nil},
    }, kad)

    // peer 2: no pss capability at all
    plainAddr := network.RandomAddr()
    plainCaps := []p2p.Cap{{Name: "nopss", Version: 1}}
    plainPeer := network.NewPeer(&network.BzzPeer{
        Peer:    protocols.NewPeer(p2p.NewPeer(enode.ID{0x02}, common.ToHex(plainAddr.Over()), plainCaps), pipe, nil),
        BzzAddr: &network.BzzAddr{OAddr: plainAddr.Over(), UAddr: nil},
    }, kad)

    // register both peers with the kademlia and switch them on;
    // return values are deliberately not checked here
    kad.Register(versionPeer.BzzAddr)
    kad.On(versionPeer)
    kad.Register(plainPeer.BzzAddr)
    kad.On(plainPeer)

    // message with an empty recipient address and an empty envelope
    outgoing := &PssMsg{
        To:      []byte{},
        Expire:  uint32(time.Now().Add(time.Second).Unix()),
        Payload: &whisper.Envelope{},
    }
    ps := newTestPss(forwarderKey, kad, nil)
    defer ps.Stop()

    // completing the call is the whole assertion: attempting to send to an
    // incapable peer would crash
    ps.forward(outgoing)

}

// TestRawAllow verifies that handlers are only invoked for raw messages while
// at least one handler registered for the topic explicitly allows raw
// messages.
//
// The same counting function is registered twice: first without the raw
// capability (a raw message must not reach it), then with it (the message must
// be handled), and finally the raw-capable registration is removed again (the
// message must once more be ignored).
func TestRawAllow(t *testing.T) {

    // standard pss setup
    key, err := crypto.GenerateKey()
    if err != nil {
        t.Fatal(err)
    }
    localAddr := network.RandomAddr()
    ps := newTestPss(key, network.NewKademlia(localAddr.Over(), network.NewKadParams()), nil)
    defer ps.Stop()
    topic := BytesToTopic([]byte{0x2a})

    // handler body shared by both registrations; counts its invocations
    hits := 0
    countHit := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
        log.Trace("in allowraw handler")
        hits++
        return nil
    }

    // first registration: no raw capability
    ps.Register(&topic, &handler{f: countHit})

    // a raw message addressed to ourselves must not be handled yet
    rawMsg := newPssMsg(&msgParams{raw: true})
    rawMsg.To = localAddr.OAddr
    rawMsg.Expire = uint32(time.Now().Unix() + 4200)
    rawMsg.Payload = &whisper.Envelope{Topic: whisper.TopicType(topic)}
    ps.handlePssMsg(context.TODO(), rawMsg)
    if hits > 0 {
        t.Fatalf("Expected handler not to be executed with raw cap off")
    }

    // second registration: same function, raw capability switched on
    unregisterRaw := ps.Register(&topic, &handler{
        f:    countHit,
        caps: &handlerCaps{raw: true},
    })

    // now the raw message must get through
    rawMsg.Payload.Data = []byte("Raw Deal")
    ps.handlePssMsg(context.TODO(), rawMsg)
    if hits == 0 {
        t.Fatalf("Expected handler to be executed with raw cap on")
    }

    // remove the raw-capable registration again
    hitsBefore := hits
    unregisterRaw()

    // raw messages must be rejected once more
    rawMsg.Payload.Data = []byte("Raw Trump")
    ps.handlePssMsg(context.TODO(), rawMsg)
    if hits != hitsBefore {
        t.Fatalf("Expected handler not to be executed when raw handler is retracted")
    }
}

// BELOW HERE ARE TESTS USING THE SIMULATION FRAMEWORK

// TestApi checks how the pss_sendRaw RPC call handles edge case values: an
// empty address is accepted, while an empty message and an address of 33 bytes
// must both be rejected.
func TestApi(t *testing.T) {
    clients, err := setupNetwork(2, true)
    if err != nil {
        t.Fatal(err)
    }
    sender := clients[0]

    const topic = "0xdeadbeef"

    // empty address with a non-empty message is valid
    if err := sender.Call(nil, "pss_sendRaw", "0x", topic, "0x666f6f"); err != nil {
        t.Fatal(err)
    }

    // empty message must fail
    if err := sender.Call(nil, "pss_sendRaw", "0xabcdef", topic, "0x"); err == nil {
        t.Fatal("expected error on empty msg")
    }

    // a 33 byte address must fail
    var tooLong [33]byte
    if err := sender.Call(nil, "pss_sendRaw", hexutil.Encode(tooLong[:]), topic, "0x666f6f"); err == nil {
        t.Fatal("expected error on send too big address")
    }
}

// TestSendRaw verifies that nodes can send and receive raw (verbatim)
// messages. Each subtest name is the address hint size that testSendRaw reads
// back from t.Name().
func TestSendRaw(t *testing.T) {
    for _, addrsize := range []string{"32", "8", "0"} {
        t.Run(addrsize, testSendRaw)
    }
}

// testSendRaw sets up a two node network and sends one raw message in each
// direction, verifying that the payload arrives unchanged.
//
// The address hint size (in bytes) is parsed from the subtest name, which must
// have the form "<parent>/<addrsize>".
func testSendRaw(t *testing.T) {

    paramstring := strings.Split(t.Name(), "/")
    if len(paramstring) < 2 {
        t.Fatalf("test name %q carries no address size parameter", t.Name())
    }
    addrsize, err := strconv.ParseInt(paramstring[1], 10, 0)
    if err != nil {
        t.Fatalf("invalid address size %q: %v", paramstring[1], err)
    }
    log.Info("raw send test", "addrsize", addrsize)

    clients, err := setupNetwork(2, true)
    if err != nil {
        t.Fatal(err)
    }

    topic := "0xdeadbeef"

    // fetch both base addresses and cut them down to the address hint size
    // ("0x" prefix plus two hex digits per byte)
    var loaddrhex string
    err = clients[0].Call(&loaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
    }
    loaddrhex = loaddrhex[:2+(addrsize*2)]
    var roaddrhex string
    err = clients[1].Call(&roaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
    }
    roaddrhex = roaddrhex[:2+(addrsize*2)]

    time.Sleep(time.Millisecond * 500)

    // subscribe both nodes to messages on the topic; the subscription error
    // must be checked before the subscription is used in the deferred call
    lmsgC := make(chan APIMsg)
    lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
    defer lcancel()
    lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true, false)
    if err != nil {
        t.Fatalf("node 1 subscribe fail: %v", err)
    }
    log.Trace("lsub", "id", lsub)
    defer lsub.Unsubscribe()
    rmsgC := make(chan APIMsg)
    rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
    defer rcancel()
    rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true, false)
    if err != nil {
        t.Fatalf("node 2 subscribe fail: %v", err)
    }
    log.Trace("rsub", "id", rsub)
    defer rsub.Unsubscribe()

    // send and verify delivery
    lmsg := []byte("plugh")
    err = clients[1].Call(nil, "pss_sendRaw", loaddrhex, topic, hexutil.Encode(lmsg))
    if err != nil {
        t.Fatal(err)
    }
    select {
    case recvmsg := <-lmsgC:
        if !bytes.Equal(recvmsg.Msg, lmsg) {
            t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg.Msg)
        }
    case <-lctx.Done():
        // Done only signals; the reason is available from Err
        t.Fatalf("test message (left) timed out: %v", lctx.Err())
    }
    rmsg := []byte("xyzzy")
    err = clients[0].Call(nil, "pss_sendRaw", roaddrhex, topic, hexutil.Encode(rmsg))
    if err != nil {
        t.Fatal(err)
    }
    select {
    case recvmsg := <-rmsgC:
        if !bytes.Equal(recvmsg.Msg, rmsg) {
            t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
        }
    case <-rctx.Done():
        t.Fatalf("test message (right) timed out: %v", rctx.Err())
    }
}

// TestSendSym sends symmetrically encrypted messages between two directly
// connected peers. Each subtest name is the address hint size that testSendSym
// reads back from t.Name().
func TestSendSym(t *testing.T) {
    for _, addrsize := range []string{"32", "8", "0"} {
        t.Run(addrsize, testSendSym)
    }
}

// testSendSym sets up a two node network, installs reciprocal symmetric keys
// on both nodes and sends one symmetrically encrypted message in each
// direction, verifying that the payload arrives unchanged.
//
// The address hint size (in bytes) is parsed from the subtest name, which must
// have the form "<parent>/<addrsize>".
func testSendSym(t *testing.T) {

    // address hint size
    paramstring := strings.Split(t.Name(), "/")
    if len(paramstring) < 2 {
        t.Fatalf("test name %q carries no address size parameter", t.Name())
    }
    addrsize, err := strconv.ParseInt(paramstring[1], 10, 0)
    if err != nil {
        t.Fatalf("invalid address size %q: %v", paramstring[1], err)
    }
    log.Info("sym send test", "addrsize", addrsize)

    clients, err := setupNetwork(2, false)
    if err != nil {
        t.Fatal(err)
    }

    var topic string
    err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
    if err != nil {
        t.Fatal(err)
    }

    // fetch both base addresses and cut them down to the address hint size
    // ("0x" prefix plus two hex digits per byte)
    var loaddrhex string
    err = clients[0].Call(&loaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
    }
    loaddrhex = loaddrhex[:2+(addrsize*2)]
    var roaddrhex string
    err = clients[1].Call(&roaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
    }
    roaddrhex = roaddrhex[:2+(addrsize*2)]

    // retrieve public key from pss instance
    // set this public key reciprocally
    var lpubkeyhex string
    err = clients[0].Call(&lpubkeyhex, "pss_getPublicKey")
    if err != nil {
        t.Fatalf("rpc get node 1 pubkey fail: %v", err)
    }
    var rpubkeyhex string
    err = clients[1].Call(&rpubkeyhex, "pss_getPublicKey")
    if err != nil {
        t.Fatalf("rpc get node 2 pubkey fail: %v", err)
    }

    time.Sleep(time.Millisecond * 500)

    // subscribe both nodes to messages on the topic; the subscription error
    // must be checked before the subscription is used in the deferred call
    lmsgC := make(chan APIMsg)
    lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
    defer lcancel()
    lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
    if err != nil {
        t.Fatalf("node 1 subscribe fail: %v", err)
    }
    log.Trace("lsub", "id", lsub)
    defer lsub.Unsubscribe()
    rmsgC := make(chan APIMsg)
    rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
    defer rcancel()
    rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
    if err != nil {
        t.Fatalf("node 2 subscribe fail: %v", err)
    }
    log.Trace("rsub", "id", rsub)
    defer rsub.Unsubscribe()

    lrecvkey := network.RandomAddr().Over()
    rrecvkey := network.RandomAddr().Over()

    var lkeyids [2]string
    var rkeyids [2]string

    // manually set reciprocal symkeys
    err = clients[0].Call(&lkeyids, "psstest_setSymKeys", rpubkeyhex, lrecvkey, rrecvkey, defaultSymKeySendLimit, topic, roaddrhex)
    if err != nil {
        t.Fatal(err)
    }
    err = clients[1].Call(&rkeyids, "psstest_setSymKeys", lpubkeyhex, rrecvkey, lrecvkey, defaultSymKeySendLimit, topic, loaddrhex)
    if err != nil {
        t.Fatal(err)
    }

    // send and verify delivery
    lmsg := []byte("plugh")
    err = clients[1].Call(nil, "pss_sendSym", rkeyids[1], topic, hexutil.Encode(lmsg))
    if err != nil {
        t.Fatal(err)
    }
    select {
    case recvmsg := <-lmsgC:
        if !bytes.Equal(recvmsg.Msg, lmsg) {
            t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg.Msg)
        }
    case <-lctx.Done():
        // Done only signals; the reason is available from Err
        t.Fatalf("test message timed out: %v", lctx.Err())
    }
    rmsg := []byte("xyzzy")
    err = clients[0].Call(nil, "pss_sendSym", lkeyids[1], topic, hexutil.Encode(rmsg))
    if err != nil {
        t.Fatal(err)
    }
    select {
    case recvmsg := <-rmsgC:
        if !bytes.Equal(recvmsg.Msg, rmsg) {
            t.Fatalf("node 2 received payload mismatch: expected %x, got %v", rmsg, recvmsg.Msg)
        }
    case <-rctx.Done():
        t.Fatalf("test message timed out: %v", rctx.Err())
    }
}

// TestSendAsym sends asymmetrically encrypted messages between two directly
// connected peers. Each subtest name is the address hint size that
// testSendAsym reads back from t.Name().
func TestSendAsym(t *testing.T) {
    for _, addrsize := range []string{"32", "8", "0"} {
        t.Run(addrsize, testSendAsym)
    }
}

// testSendAsym sets up a two node network, stores each node's public key on
// the other node and sends one asymmetrically encrypted message in each
// direction, verifying that the payload arrives unchanged.
//
// The address hint size (in bytes) is parsed from the subtest name, which must
// have the form "<parent>/<addrsize>".
func testSendAsym(t *testing.T) {

    // address hint size
    paramstring := strings.Split(t.Name(), "/")
    if len(paramstring) < 2 {
        t.Fatalf("test name %q carries no address size parameter", t.Name())
    }
    addrsize, err := strconv.ParseInt(paramstring[1], 10, 0)
    if err != nil {
        t.Fatalf("invalid address size %q: %v", paramstring[1], err)
    }
    log.Info("asym send test", "addrsize", addrsize)

    clients, err := setupNetwork(2, false)
    if err != nil {
        t.Fatal(err)
    }

    var topic string
    err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
    if err != nil {
        t.Fatal(err)
    }

    time.Sleep(time.Millisecond * 250)

    // fetch both base addresses and cut them down to the address hint size
    // ("0x" prefix plus two hex digits per byte)
    var loaddrhex string
    err = clients[0].Call(&loaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
    }
    loaddrhex = loaddrhex[:2+(addrsize*2)]
    var roaddrhex string
    err = clients[1].Call(&roaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
    }
    roaddrhex = roaddrhex[:2+(addrsize*2)]

    // retrieve public key from pss instance
    // set this public key reciprocally
    var lpubkey string
    err = clients[0].Call(&lpubkey, "pss_getPublicKey")
    if err != nil {
        t.Fatalf("rpc get node 1 pubkey fail: %v", err)
    }
    var rpubkey string
    err = clients[1].Call(&rpubkey, "pss_getPublicKey")
    if err != nil {
        t.Fatalf("rpc get node 2 pubkey fail: %v", err)
    }

    time.Sleep(time.Millisecond * 500) // replace with hive healthy code

    // subscribe both nodes to messages on the topic; the subscription error
    // must be checked before the subscription is used in the deferred call
    lmsgC := make(chan APIMsg)
    lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
    defer lcancel()
    lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
    if err != nil {
        t.Fatalf("node 1 subscribe fail: %v", err)
    }
    log.Trace("lsub", "id", lsub)
    defer lsub.Unsubscribe()
    rmsgC := make(chan APIMsg)
    rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
    defer rcancel()
    rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
    if err != nil {
        t.Fatalf("node 2 subscribe fail: %v", err)
    }
    log.Trace("rsub", "id", rsub)
    defer rsub.Unsubscribe()

    // store reciprocal public keys
    err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, roaddrhex)
    if err != nil {
        t.Fatal(err)
    }
    err = clients[1].Call(nil, "pss_setPeerPublicKey", lpubkey, topic, loaddrhex)
    if err != nil {
        t.Fatal(err)
    }

    // send and verify delivery
    rmsg := []byte("xyzzy")
    err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
    if err != nil {
        t.Fatal(err)
    }
    select {
    case recvmsg := <-rmsgC:
        if !bytes.Equal(recvmsg.Msg, rmsg) {
            t.Fatalf("node 2 received payload mismatch: expected %v, got %v", rmsg, recvmsg.Msg)
        }
    case <-rctx.Done():
        // Done only signals; the reason is available from Err
        t.Fatalf("test message timed out: %v", rctx.Err())
    }
    lmsg := []byte("plugh")
    err = clients[1].Call(nil, "pss_sendAsym", lpubkey, topic, hexutil.Encode(lmsg))
    if err != nil {
        t.Fatal(err)
    }
    select {
    case recvmsg := <-lmsgC:
        if !bytes.Equal(recvmsg.Msg, lmsg) {
            t.Fatalf("node 1 received payload mismatch: expected %v, got %v", lmsg, recvmsg.Msg)
        }
    case <-lctx.Done():
        t.Fatalf("test message timed out: %v", lctx.Err())
    }
}

// Job describes a single message to be sent by a worker in the network test.
type Job struct {
    // Msg is the payload to send
    Msg      []byte
    // SendNode is the ID of the node whose rpc client performs the send
    SendNode enode.ID
    // RecvNode is the ID of the node whose public key the message is sent to
    RecvNode enode.ID
}

// worker consumes jobs until the jobs channel is closed. For every job it asks
// the sending node's rpc client to send the job's message asymmetrically
// encrypted to the receiving node's public key on the given topic.
//
// id identifies the worker in log output. rpcs and pubkeys are keyed by node
// ID and are only read. A failed send is logged and the worker carries on with
// the next job; the caller detects lost messages by counting deliveries.
func worker(id int, jobs <-chan Job, rpcs map[enode.ID]*rpc.Client, pubkeys map[enode.ID]string, topic string) {
    for j := range jobs {
        err := rpcs[j.SendNode].Call(nil, "pss_sendAsym", pubkeys[j.RecvNode], topic, hexutil.Encode(j.Msg))
        if err != nil {
            log.Warn("pss_sendAsym failed", "worker", id, "err", err)
        }
    }
}

// TestNetwork runs the network test with 16 nodes, 1000 messages and a 4 byte
// address hint on the sim adapter. The parameters are encoded in the subtest
// name as nodes/msgs/addrbytes/adaptertype.
func TestNetwork(t *testing.T) {
    const params = "16/1000/4/sim"
    t.Run(params, testNetwork)
}

// TestNetwork2000 runs the network test with 2000 messages for several network
// sizes. It only runs when the longrunning flag is set.
//
// params in run name:
// nodes/msgs/addrbytes/adaptertype
// adaptertype selects the simulation adapter (exec, tcp or sim)
func TestNetwork2000(t *testing.T) {
    //enableMetrics()

    if !*longrunning {
        t.Skip("run with --longrunning flag to run extensive network tests")
    }
    for _, nodecount := range []string{"3", "4", "8", "16"} {
        t.Run(nodecount+"/2000/4/sim", testNetwork)
    }
}

// TestNetwork5000 runs the network test with 5000 messages for several network
// sizes. It only runs when the longrunning flag is set.
func TestNetwork5000(t *testing.T) {
    //enableMetrics()

    if !*longrunning {
        t.Skip("run with --longrunning flag to run extensive network tests")
    }
    for _, nodecount := range []string{"3", "4", "8", "16"} {
        t.Run(nodecount+"/5000/4/sim", testNetwork)
    }
}

// TestNetwork10000 runs the network test with 10000 messages for several
// network sizes. It only runs when the longrunning flag is set.
func TestNetwork10000(t *testing.T) {
    //enableMetrics()

    if !*longrunning {
        t.Skip("run with --longrunning flag to run extensive network tests")
    }
    for _, nodecount := range []string{"3", "4", "8"} {
        t.Run(nodecount+"/10000/4/sim", testNetwork)
    }
}

// testNetwork loads a network snapshot for the requested number of nodes,
// sends the requested number of asymmetrically encrypted messages between
// randomly chosen distinct node pairs, and fails unless every message is
// received within the timeout.
//
// Parameters are parsed from the subtest name, which must have the form
// "<parent>/nodes/msgs/addrbytes/adaptertype", where adaptertype is one of
// exec, tcp or sim. The snapshot is read from testdata/snapshot_<nodes>.json.
func testNetwork(t *testing.T) {
    paramstring := strings.Split(t.Name(), "/")
    if len(paramstring) < 5 {
        t.Fatalf("test name %q must have the form <parent>/nodes/msgs/addrbytes/adaptertype", t.Name())
    }
    nodecount, err := strconv.ParseInt(paramstring[1], 10, 0)
    if err != nil {
        t.Fatalf("invalid node count %q: %v", paramstring[1], err)
    }
    msgcount, err := strconv.ParseInt(paramstring[2], 10, 0)
    if err != nil {
        t.Fatalf("invalid message count %q: %v", paramstring[2], err)
    }
    addrsize, err := strconv.ParseInt(paramstring[3], 10, 0)
    if err != nil {
        t.Fatalf("invalid address size %q: %v", paramstring[3], err)
    }
    adapter := paramstring[4]
    // sender and receiver must differ, so at least two nodes are needed
    if nodecount < 2 {
        t.Fatalf("need at least 2 nodes, got %d", nodecount)
    }

    log.Info("network test", "nodecount", nodecount, "msgcount", msgcount, "addrhintsize", addrsize)

    nodes := make([]enode.ID, nodecount)
    bzzaddrs := make(map[enode.ID]string, nodecount)
    rpcs := make(map[enode.ID]*rpc.Client, nodecount)
    pubkeys := make(map[enode.ID]string, nodecount)

    sentmsgs := make([][]byte, msgcount)
    recvmsgs := make([]bool, msgcount)
    nodemsgcount := make(map[enode.ID]int, nodecount)

    trigger := make(chan enode.ID)

    var a adapters.NodeAdapter
    switch adapter {
    case "exec":
        dirname, err := ioutil.TempDir(".", "")
        if err != nil {
            t.Fatal(err)
        }
        // remove the temporary directory when the test is done
        defer os.RemoveAll(dirname)
        a = adapters.NewExecAdapter(dirname)
    case "tcp":
        a = adapters.NewTCPAdapter(newServices(false))
    case "sim":
        a = adapters.NewSimAdapter(newServices(false))
    default:
        t.Fatalf("unknown adapter type %q", adapter)
    }
    net := simulations.NewNetwork(a, &simulations.NetworkConfig{
        ID: "0",
    })
    defer net.Shutdown()

    f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodecount))
    if err != nil {
        t.Fatal(err)
    }
    defer f.Close()
    jsonbyte, err := ioutil.ReadAll(f)
    if err != nil {
        t.Fatal(err)
    }
    var snap simulations.Snapshot
    err = json.Unmarshal(jsonbyte, &snap)
    if err != nil {
        t.Fatal(err)
    }
    err = net.Load(&snap)
    if err != nil {
        //TODO: Fix p2p simulation framework to not crash when loading 32-nodes
        //t.Fatal(err)
        log.Warn("snapshot load returned error, continuing", "err", err)
    }

    time.Sleep(1 * time.Second)

    // triggerChecks subscribes to incoming messages on the given node and
    // starts a goroutine that reports the node id on the trigger channel the
    // first time each message index is seen.
    // NOTE(review): recvmsgs is accessed from every subscriber goroutine and
    // from the test goroutine without synchronization.
    triggerChecks := func(trigger chan enode.ID, id enode.ID, rpcclient *rpc.Client, topic string) error {
        msgC := make(chan APIMsg)
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false, false)
        if err != nil {
            t.Fatal(err)
        }
        go func() {
            defer sub.Unsubscribe()
            for {
                select {
                case recvmsg := <-msgC:
                    idx, _ := binary.Uvarint(recvmsg.Msg)
                    // ignore payloads that do not decode to a known message index
                    if idx >= uint64(len(recvmsgs)) {
                        log.Warn("msg recv with unknown index", "idx", idx, "id", id)
                        continue
                    }
                    if !recvmsgs[idx] {
                        log.Debug("msg recv", "idx", idx, "id", id)
                        recvmsgs[idx] = true
                        trigger <- id
                    }
                case <-sub.Err():
                    return
                }
            }
        }()
        return nil
    }

    netnodes := net.GetNodes()
    if len(netnodes) != int(nodecount) {
        t.Fatalf("snapshot provided %d nodes, expected %d", len(netnodes), nodecount)
    }

    var topic string
    for i, nod := range netnodes {
        nodes[i] = nod.ID()
        rpcs[nodes[i]], err = nod.Client()
        if err != nil {
            t.Fatal(err)
        }
        if topic == "" {
            err = rpcs[nodes[i]].Call(&topic, "pss_stringToTopic", "foo:42")
            if err != nil {
                t.Fatal(err)
            }
        }
        var pubkey string
        err = rpcs[nodes[i]].Call(&pubkey, "pss_getPublicKey")
        if err != nil {
            t.Fatal(err)
        }
        pubkeys[nod.ID()] = pubkey
        var addrhex string
        err = rpcs[nodes[i]].Call(&addrhex, "pss_baseAddr")
        if err != nil {
            t.Fatal(err)
        }
        bzzaddrs[nodes[i]] = addrhex
        err = triggerChecks(trigger, nodes[i], rpcs[nodes[i]], topic)
        if err != nil {
            t.Fatal(err)
        }
    }

    time.Sleep(1 * time.Second)

    // setup workers
    jobs := make(chan Job, 10)
    // closing the channel lets the workers terminate once the test is over
    defer close(jobs)
    for w := 1; w <= 10; w++ {
        go worker(w, jobs, rpcs, pubkeys, topic)
    }

    time.Sleep(1 * time.Second)

    for i := 0; i < int(msgcount); i++ {
        // pick a random sender and a random receiver different from it
        sendnodeidx := rand.Intn(int(nodecount))
        recvnodeidx := rand.Intn(int(nodecount - 1))
        if recvnodeidx >= sendnodeidx {
            recvnodeidx++
        }
        nodemsgcount[nodes[recvnodeidx]]++
        // the payload is the message index, so receivers can tell messages apart
        sentmsgs[i] = make([]byte, 8)
        c := binary.PutUvarint(sentmsgs[i], uint64(i))
        if c == 0 {
            t.Fatal("0 byte message")
        }
        err = rpcs[nodes[sendnodeidx]].Call(nil, "pss_setPeerPublicKey", pubkeys[nodes[recvnodeidx]], topic, bzzaddrs[nodes[recvnodeidx]])
        if err != nil {
            t.Fatal(err)
        }

        jobs <- Job{
            Msg:      sentmsgs[i],
            SendNode: nodes[sendnodeidx],
            RecvNode: nodes[recvnodeidx],
        }
    }

    // wait for one trigger per message, or give up on timeout
    finalmsgcount := 0
    ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
    defer cancel()
outer:
    for i := 0; i < int(msgcount); i++ {
        select {
        case id := <-trigger:
            nodemsgcount[id]--
            finalmsgcount++
        case <-ctx.Done():
            log.Warn("timeout")
            break outer
        }
    }

    for i, msg := range recvmsgs {
        if !msg {
            log.Debug("missing message", "idx", i)
        }
    }
    t.Logf("%d of %d messages received", finalmsgcount, msgcount)

    if finalmsgcount != int(msgcount) {
        t.Fatalf("%d messages were not received", int(msgcount)-finalmsgcount)
    }

}

// TestDeduplication checks that in a network of a -> b -> c -> a
// the recipient doesn't receive a sent message twice.
//
// Node 1 sends a single asymmetrically encrypted message to node 2 with a
// zero-length address hint. Node 2 must receive the message exactly once
// before the subscription context times out.
func TestDeduplication(t *testing.T) {
    var err error

    clients, err := setupNetwork(3, false)
    if err != nil {
        t.Fatal(err)
    }

    // the base addresses are only used for logging
    var addrsize = 32
    var loaddrhex string
    err = clients[0].Call(&loaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
    }
    loaddrhex = loaddrhex[:2+(addrsize*2)]
    var roaddrhex string
    err = clients[1].Call(&roaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
    }
    roaddrhex = roaddrhex[:2+(addrsize*2)]
    var xoaddrhex string
    err = clients[2].Call(&xoaddrhex, "pss_baseAddr")
    if err != nil {
        t.Fatalf("rpc get node 3 baseaddr fail: %v", err)
    }
    xoaddrhex = xoaddrhex[:2+(addrsize*2)]

    log.Info("peer", "l", loaddrhex, "r", roaddrhex, "x", xoaddrhex)

    var topic string
    err = clients[0].Call(&topic, "pss_stringToTopic", "foo:42")
    if err != nil {
        t.Fatal(err)
    }

    time.Sleep(time.Millisecond * 250)

    // retrieve public key from pss instance
    // set this public key reciprocally
    var rpubkey string
    err = clients[1].Call(&rpubkey, "pss_getPublicKey")
    if err != nil {
        t.Fatalf("rpc get receivenode pubkey fail: %v", err)
    }

    time.Sleep(time.Millisecond * 500) // replace with hive healthy code

    // subscribe the receiving node; the error must be checked before the
    // subscription is used in the deferred call
    rmsgC := make(chan APIMsg)
    rctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
    defer cancel()
    rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
    if err != nil {
        t.Fatalf("receivenode subscribe fail: %v", err)
    }
    log.Trace("rsub", "id", rsub)
    defer rsub.Unsubscribe()

    // store public key for recipient
    // zero-length address means forward to all
    // we have just two peers, they will be in proxbin, and will both receive
    err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, "0x")
    if err != nil {
        t.Fatal(err)
    }

    // send and verify delivery
    rmsg := []byte("xyzzy")
    err = clients[0].Call(nil, "pss_sendAsym", rpubkey, topic, hexutil.Encode(rmsg))
    if err != nil {
        t.Fatal(err)
    }

    // keep listening until the context expires, so a late duplicate is caught
    var receivedok bool
OUTER:
    for {
        select {
        case <-rmsgC:
            if receivedok {
                t.Fatal("duplicate message received")
            }
            receivedok = true
        case <-rctx.Done():
            break OUTER
        }
    }
    if !receivedok {
        t.Fatal("message did not arrive")
    }
}

// BenchmarkSymkeySend measures symmetric send performance with varying
// message sizes. Each sub-benchmark name is the message size in bytes, which
// benchmarkSymKeySend reads back from b.Name().
func BenchmarkSymkeySend(b *testing.B) {
    msgsizes := []int{
        256,
        1024,
        1024 * 1024,
        1024 * 1024 * 10,
        1024 * 1024 * 100,
    }
    for _, msgsize := range msgsizes {
        b.Run(fmt.Sprintf("%d", msgsize), benchmarkSymKeySend)
    }
}

// benchmarkSymKeySend times Pss.SendSym for one message size.
//
// The payload size in bytes is parsed from the second "/"-separated component
// of the benchmark name, so the function has to be started through b.Run with
// the size as sub-benchmark name (see BenchmarkSymkeySend). All setup (key
// generation, pss instance, random payload, symmetric key registration) happens
// before the timer is reset; any setup failure aborts the benchmark.
func benchmarkSymKeySend(b *testing.B) {
    msgsizestring := strings.Split(b.Name(), "/")
    if len(msgsizestring) != 2 {
        b.Fatalf("benchmark called without msgsize param")
    }
    msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
    if err != nil {
        b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    keys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        b.Fatalf("could not generate key pair: %v", err)
    }
    privkey, err := w.GetPrivateKey(keys)
    if err != nil {
        b.Fatalf("could not retrieve private key: %v", err)
    }
    // newTestPss reports failure by returning nil; bail out before the
    // deferred Stop would dereference it.
    ps := newTestPss(privkey, nil, nil)
    if ps == nil {
        b.Fatal("could not create pss instance")
    }
    defer ps.Stop()
    msg := make([]byte, msgsize)
    rand.Read(msg)
    topic := BytesToTopic([]byte("foo"))
    to := make(PssAddress, 32)
    copy(to[:], network.RandomAddr().Over())
    symkeyid, err := ps.GenerateSymmetricKey(topic, to, true)
    if err != nil {
        b.Fatalf("could not generate symkey: %v", err)
    }
    symkey, err := ps.w.GetSymKey(symkeyid)
    if err != nil {
        b.Fatalf("could not retrieve symkey: %v", err)
    }
    if _, err := ps.SetSymmetricKey(symkey, topic, to, false); err != nil {
        b.Fatalf("could not set symkey: %v", err)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // the send result is deliberately not checked inside the timed loop,
        // as in the original benchmark
        ps.SendSym(symkeyid, topic, msg)
    }
}

// BenchmarkAsymkeySend measures asymmetric send performance with varying
// message sizes. Every size is run as a sub-benchmark named after the payload
// size in bytes; benchmarkAsymKeySend reads the size back from that name.
func BenchmarkAsymkeySend(b *testing.B) {
    msgsizes := []int{
        256,
        1024,
        1024 * 1024,
        1024 * 1024 * 10,
        1024 * 1024 * 100,
    }
    for _, msgsize := range msgsizes {
        b.Run(fmt.Sprintf("%d", msgsize), benchmarkAsymKeySend)
    }
}

// benchmarkAsymKeySend times Pss.SendAsym for one message size.
//
// The payload size in bytes is parsed from the second "/"-separated component
// of the benchmark name, so the function has to be started through b.Run with
// the size as sub-benchmark name (see BenchmarkAsymkeySend). The node sends to
// its own public key. Setup failures abort the benchmark.
func benchmarkAsymKeySend(b *testing.B) {
    msgsizestring := strings.Split(b.Name(), "/")
    if len(msgsizestring) != 2 {
        b.Fatalf("benchmark called without msgsize param")
    }
    msgsize, err := strconv.ParseInt(msgsizestring[1], 10, 0)
    if err != nil {
        b.Fatalf("benchmark called with invalid msgsize param '%s': %v", msgsizestring[1], err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    keys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        b.Fatalf("could not generate key pair: %v", err)
    }
    privkey, err := w.GetPrivateKey(keys)
    if err != nil {
        b.Fatalf("could not retrieve private key: %v", err)
    }
    // newTestPss reports failure by returning nil; bail out before the
    // deferred Stop would dereference it.
    ps := newTestPss(privkey, nil, nil)
    if ps == nil {
        b.Fatal("could not create pss instance")
    }
    defer ps.Stop()
    msg := make([]byte, msgsize)
    rand.Read(msg)
    topic := BytesToTopic([]byte("foo"))
    to := make(PssAddress, 32)
    copy(to[:], network.RandomAddr().Over())
    if err := ps.SetPeerPublicKey(&privkey.PublicKey, topic, to); err != nil {
        b.Fatalf("could not set peer public key: %v", err)
    }
    // the hex form of the public key never changes, so encode it once
    // instead of on every timed iteration
    pubkeyid := common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey))
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // the send result is deliberately not checked inside the timed loop,
        // as in the original benchmark
        ps.SendAsym(pubkeyid, topic, msg)
    }
}
// BenchmarkSymkeyBruteforceChangeaddr runs benchmarkSymkeyBruteforceChangeaddr
// for every combination of key count (100, 1000, 10000) and symkey cache size
// (32, 256, 2048). Both values are passed through the sub-benchmark name as
// "<keycount>/<cachesize>".
func BenchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
    for keycount := 100; keycount < 100000; keycount *= 10 {
        for cachesize := 32; cachesize < 10000; cachesize *= 8 {
            name := fmt.Sprintf("%d/%d", keycount, cachesize)
            b.Run(name, benchmarkSymkeyBruteforceChangeaddr)
        }
        // disabled variant that runs with the default cache size:
        //b.Run(fmt.Sprintf("%d", keycount), benchmarkSymkeyBruteforceChangeaddr)
    }
}

// benchmarkSymkeyBruteforceChangeaddr measures decrypt performance using the
// symkey cache, worst case (decrypt key always last in cache).
//
// Parameters are taken from the "/"-separated benchmark name: component 1 is
// the number of symmetric keys to generate, the optional component 2 is the
// symkey cache capacity (the pss default is used when it is absent). One
// message is built per generated key, each with its own random address, and
// the timed loop processes them from the newest key backwards.
func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
    keycountstring := strings.Split(b.Name(), "/")
    cachesize := int64(0)
    var ps *Pss
    if len(keycountstring) < 2 {
        b.Fatalf("benchmark called without count param")
    }
    keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
    if err != nil {
        b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
    }
    // with no messages the modulo in the timed loop would divide by zero
    if keycount < 1 {
        b.Fatalf("benchmark called with non-positive count param '%s'", keycountstring[1])
    }
    if len(keycountstring) == 3 {
        cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
        if err != nil {
            b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
        }
    }
    pssmsgs := make([]*PssMsg, 0, keycount)
    var keyid string
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    keys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        b.Fatalf("could not generate key pair: %v", err)
    }
    privkey, err := w.GetPrivateKey(keys)
    if err != nil {
        b.Fatalf("could not retrieve private key: %v", err)
    }
    if cachesize > 0 {
        ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
    } else {
        ps = newTestPss(privkey, nil, nil)
    }
    // newTestPss reports failure by returning nil; bail out before the
    // deferred Stop would dereference it.
    if ps == nil {
        b.Fatal("could not create pss instance")
    }
    defer ps.Stop()
    topic := BytesToTopic([]byte("foo"))
    // a single handler is enough; registering one per key (as was done
    // inside the loop below) multiplies the handler work of every processed
    // message and differs from benchmarkSymkeyBruteforceSameaddr
    ps.Register(&topic, &handler{
        f: noopHandlerFunc,
    })
    for i := 0; i < int(keycount); i++ {
        to := make(PssAddress, 32)
        copy(to[:], network.RandomAddr().Over())
        keyid, err = ps.GenerateSymmetricKey(topic, to, true)
        if err != nil {
            b.Fatalf("cant generate symkey #%d: %v", i, err)
        }
        symkey, err := ps.w.GetSymKey(keyid)
        if err != nil {
            b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
        }
        wparams := &whisper.MessageParams{
            TTL:      defaultWhisperTTL,
            KeySym:   symkey,
            Topic:    whisper.TopicType(topic),
            WorkTime: defaultWhisperWorkTime,
            PoW:      defaultWhisperPoW,
            Payload:  []byte("xyzzy"),
            Padding:  []byte("1234567890abcdef"),
        }
        woutmsg, err := whisper.NewSentMessage(wparams)
        if err != nil {
            b.Fatalf("could not create whisper message: %v", err)
        }
        env, err := woutmsg.Wrap(wparams)
        if err != nil {
            b.Fatalf("could not generate whisper envelope: %v", err)
        }
        pssmsgs = append(pssmsgs, &PssMsg{
            To:      to,
            Payload: env,
        })
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // walk the messages from the last generated key towards the first
        if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], false, false); err != nil {
            b.Fatalf("pss processing failed: %v", err)
        }
    }
}

// BenchmarkSymkeyBruteforceSameaddr runs benchmarkSymkeyBruteforceSameaddr for
// every combination of key count (100, 1000, 10000) and symkey cache size
// (32, 256, 2048). Both values are passed through the sub-benchmark name as
// "<keycount>/<cachesize>".
func BenchmarkSymkeyBruteforceSameaddr(b *testing.B) {
    for keycount := 100; keycount < 100000; keycount *= 10 {
        for cachesize := 32; cachesize < 10000; cachesize *= 8 {
            name := fmt.Sprintf("%d/%d", keycount, cachesize)
            b.Run(name, benchmarkSymkeyBruteforceSameaddr)
        }
    }
}

// benchmarkSymkeyBruteforceSameaddr measures decrypt performance using the
// symkey cache, best case (decrypt key always first in cache).
//
// Parameters are taken from the "/"-separated benchmark name: component 1 is
// the number of symmetric keys to generate, the optional component 2 is the
// symkey cache capacity (the pss default is used when it is absent). A single
// message, encrypted with the most recently generated key and addressed to
// that key's address, is processed over and over in the timed loop.
func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
    var keyid string
    var ps *Pss
    cachesize := int64(0)
    keycountstring := strings.Split(b.Name(), "/")
    if len(keycountstring) < 2 {
        b.Fatalf("benchmark called without count param")
    }
    keycount, err := strconv.ParseInt(keycountstring[1], 10, 0)
    if err != nil {
        b.Fatalf("benchmark called with invalid count param '%s': %v", keycountstring[1], err)
    }
    // with no keys there is no last address to send to (index -1 below)
    if keycount < 1 {
        b.Fatalf("benchmark called with non-positive count param '%s'", keycountstring[1])
    }
    if len(keycountstring) == 3 {
        cachesize, err = strconv.ParseInt(keycountstring[2], 10, 0)
        if err != nil {
            b.Fatalf("benchmark called with invalid cachesize '%s': %v", keycountstring[2], err)
        }
    }
    addr := make([]PssAddress, keycount)
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    keys, err := wapi.NewKeyPair(ctx)
    if err != nil {
        b.Fatalf("could not generate key pair: %v", err)
    }
    privkey, err := w.GetPrivateKey(keys)
    if err != nil {
        b.Fatalf("could not retrieve private key: %v", err)
    }
    if cachesize > 0 {
        ps = newTestPss(privkey, nil, &PssParams{SymKeyCacheCapacity: int(cachesize)})
    } else {
        ps = newTestPss(privkey, nil, nil)
    }
    // newTestPss reports failure by returning nil; bail out before the
    // deferred Stop would dereference it.
    if ps == nil {
        b.Fatal("could not create pss instance")
    }
    defer ps.Stop()
    topic := BytesToTopic([]byte("foo"))
    for i := 0; i < int(keycount); i++ {
        // the elements of addr start out nil; copy only fills existing
        // length, so each address must be allocated before copying into it
        addr[i] = make(PssAddress, 32)
        copy(addr[i], network.RandomAddr().Over())
        keyid, err = ps.GenerateSymmetricKey(topic, addr[i], true)
        if err != nil {
            b.Fatalf("cant generate symkey #%d: %v", i, err)
        }

    }
    // keyid now refers to the last key generated above
    symkey, err := ps.w.GetSymKey(keyid)
    if err != nil {
        b.Fatalf("could not retrieve symkey %s: %v", keyid, err)
    }
    wparams := &whisper.MessageParams{
        TTL:      defaultWhisperTTL,
        KeySym:   symkey,
        Topic:    whisper.TopicType(topic),
        WorkTime: defaultWhisperWorkTime,
        PoW:      defaultWhisperPoW,
        Payload:  []byte("xyzzy"),
        Padding:  []byte("1234567890abcdef"),
    }
    woutmsg, err := whisper.NewSentMessage(wparams)
    if err != nil {
        b.Fatalf("could not create whisper message: %v", err)
    }
    env, err := woutmsg.Wrap(wparams)
    if err != nil {
        b.Fatalf("could not generate whisper envelope: %v", err)
    }
    ps.Register(&topic, &handler{
        f: noopHandlerFunc,
    })
    pssmsg := &PssMsg{
        To:      addr[len(addr)-1][:],
        Payload: env,
    }
    // keep key generation and message construction out of the measurement,
    // as benchmarkSymkeyBruteforceChangeaddr does
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        if err := ps.process(pssmsg, false, false); err != nil {
            b.Fatalf("pss processing failed: %v", err)
        }
    }
}

// setupNetwork sets up a simulated network with bzz/discovery and pss services.
//
// numnodes nodes are created and started one by one, and each node is
// connected to the one created before it. With more than two nodes the first
// and the last node are connected as well, so the nodes form a circle.
// if allowRaw is set, omission of builtin pss encryption is enabled (see PssParams)
//
// It returns one rpc client per node, in creation order. An error is returned
// when numnodes is below two or when any create/start/connect/client step
// fails; the error text names the index of the node that failed.
func setupNetwork(numnodes int, allowRaw bool) (clients []*rpc.Client, err error) {
    // validate before allocating: make panics on a negative length
    if numnodes < 2 {
        return nil, fmt.Errorf("Minimum two nodes in network")
    }
    nodes := make([]*simulations.Node, numnodes)
    clients = make([]*rpc.Client, numnodes)
    adapter := adapters.NewSimAdapter(newServices(allowRaw))
    net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
        ID:             "0",
        DefaultService: "bzz",
    })
    for i := 0; i < numnodes; i++ {
        nodeconf := adapters.RandomNodeConfig()
        nodeconf.Services = []string{"bzz", pssProtocolName}
        nodes[i], err = net.NewNodeWithConfig(nodeconf)
        if err != nil {
            return nil, fmt.Errorf("error creating node %d: %v", i, err)
        }
        err = net.Start(nodes[i].ID())
        if err != nil {
            return nil, fmt.Errorf("error starting node %d: %v", i, err)
        }
        // chain every node to its predecessor
        if i > 0 {
            err = net.Connect(nodes[i].ID(), nodes[i-1].ID())
            if err != nil {
                return nil, fmt.Errorf("error connecting nodes %d and %d: %v", i, i-1, err)
            }
        }
        clients[i], err = nodes[i].Client()
        if err != nil {
            return nil, fmt.Errorf("create node %d rpc client fail: %v", i, err)
        }
    }
    // close the circle; with exactly two nodes they are already connected
    if numnodes > 2 {
        err = net.Connect(nodes[0].ID(), nodes[len(nodes)-1].ID())
        if err != nil {
            return nil, fmt.Errorf("error connecting first and last nodes: %v", err)
        }
    }
    return clients, nil
}

// newServices returns the service constructors used by the simulated network:
// one for pss (keyed by pssProtocolName) and one for "bzz".
//
// Both constructors obtain their Kademlia through a shared lookup that creates
// one instance per node id on first use, so the pss and bzz services of the
// same node operate on the same Kademlia. All bzz services share one in-memory
// state store.
//
// The pss constructor generates a fresh key pair, creates the Pss instance
// with AllowRaw set to allowRaw, registers the ping protocol on PingTopic,
// optionally installs the handshake controller (useHandshake), adds the
// "psstest" rpc API and records the protocol in pssprotocols under the node id.
func newServices(allowRaw bool) adapters.Services {
    stateStore := state.NewInmemoryStore()
    kademlias := make(map[enode.ID]*network.Kademlia)
    // kademlia returns the Kademlia for id, creating it on first request
    kademlia := func(id enode.ID) *network.Kademlia {
        if k, ok := kademlias[id]; ok {
            return k
        }
        params := network.NewKadParams()
        params.NeighbourhoodSize = 2
        params.MaxBinSize = 3
        params.MinBinSize = 1
        params.MaxRetries = 1000
        params.RetryExponent = 2
        params.RetryInterval = 1000000
        kademlias[id] = network.NewKademlia(id[:], params)
        return kademlias[id]
    }
    return adapters.Services{
        pssProtocolName: func(ctx *adapters.ServiceContext) (node.Service, error) {
            // execadapter does not exec init()
            initTest()

            ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
            defer cancel()
            keys, err := wapi.NewKeyPair(ctxlocal)
            if err != nil {
                return nil, err
            }
            privkey, err := w.GetPrivateKey(keys)
            if err != nil {
                return nil, err
            }
            pssp := NewPssParams().WithPrivateKey(privkey)
            pssp.AllowRaw = allowRaw
            pskad := kademlia(ctx.Config.ID)
            ps, err := NewPss(pskad, pssp)
            if err != nil {
                return nil, err
            }

            ping := &Ping{
                OutC: make(chan bool),
                Pong: true,
            }
            p2pp := NewPingProtocol(ping)
            pp, err := RegisterProtocol(ps, &PingTopic, PingProtocol, p2pp, &ProtocolParams{Asymmetric: true})
            if err != nil {
                return nil, err
            }
            if useHandshake {
                SetHandshakeController(ps, NewHandshakeParams())
            }
            ps.Register(&PingTopic, &handler{
                f: pp.Handle,
                caps: &handlerCaps{
                    raw: true,
                },
            })
            ps.addAPI(rpc.API{
                Namespace: "psstest",
                Version:   "0.3",
                Service:   NewAPITest(ps),
                Public:    false,
            })
            // NOTE(review): err cannot be non-nil here, every assignment
            // above already returns on failure. The check is kept as it was
            // because the rest of the file is not visible from this block.
            if err != nil {
                log.Error("Couldnt register pss protocol", "err", err)
                os.Exit(1)
            }
            pssprotocols[ctx.Config.ID.String()] = &protoCtrl{
                C:        ping.OutC,
                protocol: pp,
                run:      p2pp.Run,
            }
            return ps, nil
        },
        "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
            addr := network.NewAddr(ctx.Config.Node())
            hp := network.NewHiveParams()
            hp.Discovery = false
            config := &network.BzzConfig{
                OverlayAddr:  addr.Over(),
                UnderlayAddr: addr.Under(),
                HiveParams:   hp,
            }
            return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
        },
    }
}

// newTestPss creates and starts a Pss instance for tests.
//
// privkey is the node's private key. kad may be nil, in which case a Kademlia
// with NeighbourhoodSize 3 is created for the node id derived from the public
// key. From ppextra (may be nil) only SymKeyCacheCapacity is taken over; all
// other parameters keep the NewPssParams defaults.
//
// nil is returned when NewPss fails.
func newTestPss(privkey *ecdsa.PrivateKey, kad *network.Kademlia, ppextra *PssParams) *Pss {
    nodeID := enode.PubkeyToIDV4(&privkey.PublicKey)

    // fall back to our own routing table when the caller did not supply one
    if kad == nil {
        kadParams := network.NewKadParams()
        kadParams.NeighbourhoodSize = 3
        kad = network.NewKademlia(nodeID[:], kadParams)
    }

    // assemble the pss parameters
    params := NewPssParams().WithPrivateKey(privkey)
    if ppextra != nil {
        params.SymKeyCacheCapacity = ppextra.SymKeyCacheCapacity
    }

    instance, err := NewPss(kad, params)
    if err != nil {
        return nil
    }
    instance.Start(nil)
    return instance
}

// APITest provides API calls for test/development use.
// It embeds *Pss, so the wrapped instance's methods and fields are reachable
// through it; newServices registers it as the rpc service of the "psstest"
// namespace.
type APITest struct {
    *Pss
}

// NewAPITest returns an APITest that wraps the given Pss instance.
func NewAPITest(ps *Pss) *APITest {
    api := new(APITest)
    api.Pss = ps
    return api
}

// SetSymKeys registers a receive and a send symmetric key for the given topic
// and address on the wrapped Pss instance.
//
// recvsymkey is stored with the add-to-cache flag set, sendsymkey without it.
// The returned array holds the receive key id at index 0 and the send key id
// at index 1; on failure it is empty and the error of the failing
// SetSymmetricKey call is returned. pubkeyid and limit are not used.
func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to hexutil.Bytes) ([2]string, error) {
    var keyids [2]string
    addr := PssAddress(to)

    recvid, err := apitest.SetSymmetricKey(recvsymkey, topic, addr, true)
    if err != nil {
        return keyids, err
    }

    sendid, err := apitest.SetSymmetricKey(sendsymkey, topic, addr, false)
    if err != nil {
        return keyids, err
    }

    keyids[0], keyids[1] = recvid, sendid
    return keyids, nil
}

// Clean runs cleanKeys on the wrapped Pss instance and returns its result
// (presumably the number of keys removed - confirm against cleanKeys).
// The error is always nil.
func (apitest *APITest) Clean() (int, error) {
    cleaned := apitest.Pss.cleanKeys()
    return cleaned, nil
}

// enableMetrics switches metrics collection on and starts the InfluxDB
// reporter in a background goroutine, so that stats are collected when
// running tests locally.
func enableMetrics() {
    tags := map[string]string{
        "host": "test",
    }
    interval := 1 * time.Second

    metrics.Enabled = true
    go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, interval, "http://localhost:8086", "metrics", "admin", "admin", "swarm.", tags)
}