aboutsummaryrefslogblamecommitdiffstats
path: root/swarm/pss/pss.go
blob: 1bc28890f9e8eaae784a7a7b38f40e9dbb709cfb (plain) (tree)



















                                                                                  
                 



                      
              




                                                
                                                     

                                                 
                                                   










                                                                   
                                                      


























                                                                                                                        
                     
                        
                                                





                                                                 
                            














                                                                                                                  
                                                   













                                                                                                                                                



                                                                                        





















                                                                                                                                                     



                                                                                                                                    










                                                                                                                               
                                                           
                                                     
                                                                   







                                                                     
                              
















                                                                                        


                                                                    

                                                 
                                                          














































                                                                                                   

                                                                                                                                                             



                            
                                     


















































                                                               
                                                     
                                 
                                    



















                                                                     
                                                             



                                      
                                                  
                                             







                                                                   
         






                                                      
 
                                                        


                                      
                              
                                          










                                                                           

                      
                               


                                                    
                                                          








                                                                           
                                                                        
                                                                    
                                   


                                                                                         
                                                                                                           

                                                                      
                                                                                                                                    


                                    
                                                                                                                                                



                             




                                                                            




                                                                                          














                                                                                                                           


                                        

                                                                                                                                                                  











                                                                                                 
                                                                  




                                                               
                           

                           
                                                                                                          


                                         

                




















                                                              
                                                                                




                  
                                                                                                                                 
                                        
                                                                             









                                                              
                               
                                                                  





                                                 
                                                         

 
                                                                           
                                                                    
                                      














                                                                                    












                                                                     



                                                                                                













                                                                   
                                                                                           



                                                                   
                                                                                                      



















                                                                           



                                                                                                             


                                                                       
                                                                                                                             










                                                                             
                                                                                                                     














                                                                                                   
                                                                                                                                          



















                                                                                                
                                                       









                                                                             
                                             










                                                                                       
                                                                                                            






























                                                                                                                              
                                                                                                             










                                                                                     
                           




























































                                                                                                                                     


                                                        











                                                               






                                                 



                                                                                              

                  














                                                                                                 
         
                                                             













                                                                                                 
         
                                                                              


























































                                                                                                                                                                                 









                                                                                              













                                                                                                  
                                                                                          















                                                                                                                                
                                                                                                  




                                              
                                                   











                                                                                             
                                                                                                   


                                                                                                           
                                                                                                               






































                                                                                             



                                         






































                                                                                               




                                                 

                                    
                         




                                           






                                                     
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pss

import (
    "bytes"
    "context"
    "crypto/ecdsa"
    "crypto/rand"
    "errors"
    "fmt"
    "hash"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/crypto/sha3"
    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/ethereum/go-ethereum/p2p/protocols"
    "github.com/ethereum/go-ethereum/rpc"
    "github.com/ethereum/go-ethereum/swarm/log"
    "github.com/ethereum/go-ethereum/swarm/network"
    "github.com/ethereum/go-ethereum/swarm/pot"
    "github.com/ethereum/go-ethereum/swarm/storage"
    whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)

const (
    defaultPaddingByteSize     = 16
    DefaultMsgTTL              = time.Second * 120
    defaultDigestCacheTTL      = time.Second * 10
    defaultSymKeyCacheCapacity = 512
    digestLength               = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
    defaultWhisperWorkTime     = 3
    defaultWhisperPoW          = 0.0000000001
    defaultMaxMsgSize          = 1024 * 1024
    defaultCleanInterval       = time.Second * 60 * 10
    defaultOutboxCapacity      = 100000
    pssProtocolName            = "pss"
    pssVersion                 = 2
    hasherCount                = 8
)

var (
    addressLength = len(pot.Address{})
)

// cache is used for preventing backwards routing
// will also be instrumental in flood guard mechanism
// and mailbox implementation
type pssCacheEntry struct {
    expiresAt time.Time
}

// abstraction to enable access to p2p.protocols.Peer.Send
type senderPeer interface {
    Info() *p2p.PeerInfo
    ID() enode.ID
    Address() []byte
    Send(context.Context, interface{}) error
}

// per-key peer related information
// member `protected` prevents garbage collection of the instance
type pssPeer struct {
    lastSeen  time.Time
    address   PssAddress
    protected bool
}

// Pss configuration parameters
type PssParams struct {
    MsgTTL              time.Duration
    CacheTTL            time.Duration
    privateKey          *ecdsa.PrivateKey
    SymKeyCacheCapacity int
    AllowRaw            bool // If true, enables sending and receiving messages without builtin pss encryption
}

// Sane defaults for Pss
func NewPssParams() *PssParams {
    return &PssParams{
        MsgTTL:              DefaultMsgTTL,
        CacheTTL:            defaultDigestCacheTTL,
        SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
    }
}

func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
    params.privateKey = privatekey
    return params
}

// Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
//
// Implements node.Service
type Pss struct {
    *network.Kademlia                   // we can get the Kademlia address from this
    privateKey        *ecdsa.PrivateKey // pss can have it's own independent key
    w                 *whisper.Whisper  // key and encryption backend
    auxAPIs           []rpc.API         // builtins (handshake, test) can add APIs

    // sending and forwarding
    fwdPool         map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
    fwdPoolMu       sync.RWMutex
    fwdCache        map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
    fwdCacheMu      sync.RWMutex
    cacheTTL        time.Duration // how long to keep messages in fwdCache (not implemented)
    msgTTL          time.Duration
    paddingByteSize int
    capstring       string
    outbox          chan *PssMsg

    // keys and peers
    pubKeyPool                 map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
    pubKeyPoolMu               sync.RWMutex
    symKeyPool                 map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
    symKeyPoolMu               sync.RWMutex
    symKeyDecryptCache         []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
    symKeyDecryptCacheCursor   int       // modular cursor pointing to last used, wraps on symKeyDecryptCache array
    symKeyDecryptCacheCapacity int       // max amount of symkeys to keep.

    // message handling
    handlers         map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
    handlersMu       sync.RWMutex
    hashPool         sync.Pool
    topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go)

    // process
    quitC chan struct{}
}

func (p *Pss) String() string {
    return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
}

// Creates a new Pss instance.
//
// In addition to params, it takes a swarm network Kademlia
// and a FileStore storage for message cache storage.
func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
    if params.privateKey == nil {
        return nil, errors.New("missing private key for pss")
    }
    cap := p2p.Cap{
        Name:    pssProtocolName,
        Version: pssVersion,
    }
    ps := &Pss{
        Kademlia:   k,
        privateKey: params.privateKey,
        w:          whisper.New(&whisper.DefaultConfig),
        quitC:      make(chan struct{}),

        fwdPool:         make(map[string]*protocols.Peer),
        fwdCache:        make(map[pssDigest]pssCacheEntry),
        cacheTTL:        params.CacheTTL,
        msgTTL:          params.MsgTTL,
        paddingByteSize: defaultPaddingByteSize,
        capstring:       cap.String(),
        outbox:          make(chan *PssMsg, defaultOutboxCapacity),

        pubKeyPool:                 make(map[string]map[Topic]*pssPeer),
        symKeyPool:                 make(map[string]map[Topic]*pssPeer),
        symKeyDecryptCache:         make([]*string, params.SymKeyCacheCapacity),
        symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,

        handlers:         make(map[Topic]map[*handler]bool),
        topicHandlerCaps: make(map[Topic]*handlerCaps),

        hashPool: sync.Pool{
            New: func() interface{} {
                return sha3.NewKeccak256()
            },
        },
    }

    for i := 0; i < hasherCount; i++ {
        hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
        ps.hashPool.Put(hashfunc)
    }

    return ps, nil
}

/////////////////////////////////////////////////////////////////////
// SECTION: node.Service interface
/////////////////////////////////////////////////////////////////////

func (p *Pss) Start(srv *p2p.Server) error {
    go func() {
        ticker := time.NewTicker(defaultCleanInterval)
        cacheTicker := time.NewTicker(p.cacheTTL)
        defer ticker.Stop()
        defer cacheTicker.Stop()
        for {
            select {
            case <-cacheTicker.C:
                p.cleanFwdCache()
            case <-ticker.C:
                p.cleanKeys()
            case <-p.quitC:
                return
            }
        }
    }()
    go func() {
        for {
            select {
            case msg := <-p.outbox:
                err := p.forward(msg)
                if err != nil {
                    log.Error(err.Error())
                    metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
                }
            case <-p.quitC:
                return
            }
        }
    }()
    log.Info("Started Pss")
    log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
    return nil
}

func (p *Pss) Stop() error {
    log.Info("Pss shutting down")
    close(p.quitC)
    return nil
}

var pssSpec = &protocols.Spec{
    Name:       pssProtocolName,
    Version:    pssVersion,
    MaxMsgSize: defaultMaxMsgSize,
    Messages: []interface{}{
        PssMsg{},
    },
}

func (p *Pss) Protocols() []p2p.Protocol {
    return []p2p.Protocol{
        {
            Name:    pssSpec.Name,
            Version: pssSpec.Version,
            Length:  pssSpec.Length(),
            Run:     p.Run,
        },
    }
}

func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
    pp := protocols.NewPeer(peer, rw, pssSpec)
    p.fwdPoolMu.Lock()
    p.fwdPool[peer.Info().ID] = pp
    p.fwdPoolMu.Unlock()
    return pp.Run(p.handlePssMsg)
}

func (p *Pss) APIs() []rpc.API {
    apis := []rpc.API{
        {
            Namespace: "pss",
            Version:   "1.0",
            Service:   NewAPI(p),
            Public:    true,
        },
    }
    apis = append(apis, p.auxAPIs...)
    return apis
}

// add API methods to the pss API
// must be run before node is started
func (p *Pss) addAPI(api rpc.API) {
    p.auxAPIs = append(p.auxAPIs, api)
}

// Returns the swarm Kademlia address of the pss node
func (p *Pss) BaseAddr() []byte {
    return p.Kademlia.BaseAddr()
}

// Returns the pss node's public key
func (p *Pss) PublicKey() *ecdsa.PublicKey {
    return &p.privateKey.PublicKey
}

/////////////////////////////////////////////////////////////////////
// SECTION: Message handling
/////////////////////////////////////////////////////////////////////

// Links a handler function to a Topic
//
// All incoming messages with an envelope Topic matching the
// topic specified will be passed to the given Handler function.
//
// There may be an arbitrary number of handler functions per topic.
//
// Returns a deregister function which needs to be called to
// deregister the handler,
func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
    p.handlersMu.Lock()
    defer p.handlersMu.Unlock()
    handlers := p.handlers[*topic]
    if handlers == nil {
        handlers = make(map[*handler]bool)
        p.handlers[*topic] = handlers
        log.Debug("registered handler", "caps", hndlr.caps)
    }
    if hndlr.caps == nil {
        hndlr.caps = &handlerCaps{}
    }
    handlers[hndlr] = true
    if _, ok := p.topicHandlerCaps[*topic]; !ok {
        p.topicHandlerCaps[*topic] = &handlerCaps{}
    }
    if hndlr.caps.raw {
        p.topicHandlerCaps[*topic].raw = true
    }
    if hndlr.caps.prox {
        p.topicHandlerCaps[*topic].prox = true
    }
    return func() { p.deregister(topic, hndlr) }
}
func (p *Pss) deregister(topic *Topic, hndlr *handler) {
    p.handlersMu.Lock()
    defer p.handlersMu.Unlock()
    handlers := p.handlers[*topic]
    if len(handlers) > 1 {
        delete(p.handlers, *topic)
        // topic caps might have changed now that a handler is gone
        caps := &handlerCaps{}
        for h := range handlers {
            if h.caps.raw {
                caps.raw = true
            }
            if h.caps.prox {
                caps.prox = true
            }
        }
        p.topicHandlerCaps[*topic] = caps
        return
    }
    delete(handlers, hndlr)
}

// get all registered handlers for respective topics
func (p *Pss) getHandlers(topic Topic) map[*handler]bool {
    p.handlersMu.RLock()
    defer p.handlersMu.RUnlock()
    return p.handlers[topic]
}

// Filters incoming messages for processing or forwarding.
// Check if address partially matches
// If yes, it CAN be for us, and we process it
// Only passes error to pss protocol handler if payload is not valid pssmsg
func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
    metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
    pssmsg, ok := msg.(*PssMsg)
    if !ok {
        return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
    }
    log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
    if int64(pssmsg.Expire) < time.Now().Unix() {
        metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
        log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
        return nil
    }
    if p.checkFwdCache(pssmsg) {
        log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
        return nil
    }
    p.addFwdCache(pssmsg)

    psstopic := Topic(pssmsg.Payload.Topic)

    // raw is simplest handler contingency to check, so check that first
    var isRaw bool
    if pssmsg.isRaw() {
        if _, ok := p.topicHandlerCaps[psstopic]; ok {
            if !p.topicHandlerCaps[psstopic].raw {
                log.Debug("No handler for raw message", "topic", psstopic)
                return nil
            }
        }
        isRaw = true
    }

    // check if we can be recipient:
    // - no prox handler on message and partial address matches
    // - prox handler on message and we are in prox regardless of partial address match
    // store this result so we don't calculate again on every handler
    var isProx bool
    if _, ok := p.topicHandlerCaps[psstopic]; ok {
        isProx = p.topicHandlerCaps[psstopic].prox
    }
    isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
    if !isRecipient {
        log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
        return p.enqueue(pssmsg)
    }

    log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
    if err := p.process(pssmsg, isRaw, isProx); err != nil {
        qerr := p.enqueue(pssmsg)
        if qerr != nil {
            return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
        }
    }
    return nil

}

// Entry point to processing a message for which the current node can be the intended recipient.
// Attempts symmetric and asymmetric decryption with stored keys.
// Dispatches message to all handlers matching the message topic
func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
    metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)

    var err error
    var recvmsg *whisper.ReceivedMessage
    var payload []byte
    var from PssAddress
    var asymmetric bool
    var keyid string
    var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)

    envelope := pssmsg.Payload
    psstopic := Topic(envelope.Topic)

    if raw {
        payload = pssmsg.Payload.Data
    } else {
        if pssmsg.isSym() {
            keyFunc = p.processSym
        } else {
            asymmetric = true
            keyFunc = p.processAsym
        }

        recvmsg, keyid, from, err = keyFunc(envelope)
        if err != nil {
            return errors.New("Decryption failed")
        }
        payload = recvmsg.Payload
    }

    if len(pssmsg.To) < addressLength {
        if err := p.enqueue(pssmsg); err != nil {
            return err
        }
    }
    p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)

    return nil

}

func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
    handlers := p.getHandlers(topic)
    peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
    for h := range handlers {
        if !h.caps.raw && raw {
            log.Warn("norawhandler")
            continue
        }
        if !h.caps.prox && prox {
            log.Warn("noproxhandler")
            continue
        }
        err := (h.f)(payload, peer, asymmetric, keyid)
        if err != nil {
            log.Warn("Pss handler failed", "err", err)
        }
    }
}

// will return false if using partial address
func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
    return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
}

// test match of leftmost bytes in given message to node's Kademlia address
func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
    local := p.Kademlia.BaseAddr()

    // if a partial address matches we are possible recipient regardless of prox
    // if not and prox is not set, we are surely not
    if bytes.Equal(msg.To, local[:len(msg.To)]) {

        return true
    } else if !prox {
        return false
    }

    depth := p.Kademlia.NeighbourhoodDepth()
    po, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
    log.Trace("selfpossible", "po", po, "depth", depth)

    return depth <= po
}

/////////////////////////////////////////////////////////////////////
// SECTION: Encryption
/////////////////////////////////////////////////////////////////////

// Links a peer ECDSA public key to a topic
//
// This is required for asymmetric message exchange
// on the given topic
//
// The value in `address` will be used as a routing hint for the
// public key / topic association
func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error {
    if err := validateAddress(address); err != nil {
        return err
    }
    pubkeybytes := crypto.FromECDSAPub(pubkey)
    if len(pubkeybytes) == 0 {
        return fmt.Errorf("invalid public key: %v", pubkey)
    }
    pubkeyid := common.ToHex(pubkeybytes)
    psp := &pssPeer{
        address: address,
    }
    p.pubKeyPoolMu.Lock()
    if _, ok := p.pubKeyPool[pubkeyid]; !ok {
        p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
    }
    p.pubKeyPool[pubkeyid][topic] = psp
    p.pubKeyPoolMu.Unlock()
    log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address)
    return nil
}

// Automatically generate a new symkey for a topic and address hint
func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) {
    keyid, err := p.w.GenerateSymKey()
    if err != nil {
        return "", err
    }
    p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
    return keyid, nil
}

// Links a peer symmetric key (arbitrary byte sequence) to a topic
//
// This is required for symmetrically encrypted message exchange
// on the given topic
//
// The key is stored in the whisper backend.
//
// If addtocache is set to true, the key will be added to the cache of keys
// used to attempt symmetric decryption of incoming messages.
//
// Returns a string id that can be used to retrieve the key bytes
// from the whisper backend (see pss.GetSymmetricKey())
func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) {
    if err := validateAddress(address); err != nil {
        return "", err
    }
    return p.setSymmetricKey(key, topic, address, addtocache, true)
}

func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) {
    keyid, err := p.w.AddSymKeyDirect(key)
    if err != nil {
        return "", err
    }
    p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
    return keyid, nil
}

// adds a symmetric key to the pss key pool, and optionally adds the key
// to the collection of keys used to attempt symmetric decryption of
// incoming messages
func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) {
    psp := &pssPeer{
        address:   address,
        protected: protected,
    }
    p.symKeyPoolMu.Lock()
    if _, ok := p.symKeyPool[keyid]; !ok {
        p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
    }
    p.symKeyPool[keyid][topic] = psp
    p.symKeyPoolMu.Unlock()
    if addtocache {
        p.symKeyDecryptCacheCursor++
        p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
    }
    key, _ := p.GetSymmetricKey(keyid)
    log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache)
}

// Returns a symmetric key byte seqyence stored in the whisper backend
// by its unique id
//
// Passes on the error value from the whisper backend
func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
    symkey, err := p.w.GetSymKey(symkeyid)
    if err != nil {
        return nil, err
    }
    return symkey, nil
}

// Returns all recorded topic and address combination for a specific public key
func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
    p.pubKeyPoolMu.RLock()
    defer p.pubKeyPoolMu.RUnlock()
    for t, peer := range p.pubKeyPool[keyid] {
        topic = append(topic, t)
        address = append(address, peer.address)
    }

    return topic, address, nil
}

func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
    p.pubKeyPoolMu.RLock()
    defer p.pubKeyPoolMu.RUnlock()
    if peers, ok := p.pubKeyPool[keyid]; ok {
        if t, ok := peers[topic]; ok {
            return t.address, nil
        }
    }
    return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
}

// Attempt to decrypt, validate and unpack a
// symmetrically encrypted message
// If successful, returns the unpacked whisper ReceivedMessage struct
// encapsulating the decrypted message, and the whisper backend id
// of the symmetric key used to decrypt the message.
// It fails if decryption of the message fails or if the message is corrupted
func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
    metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)

    for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
        symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
        symkey, err := p.w.GetSymKey(*symkeyid)
        if err != nil {
            continue
        }
        recvmsg, err := envelope.OpenSymmetric(symkey)
        if err != nil {
            continue
        }
        if !recvmsg.Validate() {
            return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
        }
        p.symKeyPoolMu.Lock()
        from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
        p.symKeyPoolMu.Unlock()
        p.symKeyDecryptCacheCursor++
        p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
        return recvmsg, *symkeyid, from, nil
    }
    return nil, "", nil, fmt.Errorf("could not decrypt message")
}

// Attempt to decrypt, validate and unpack an
// asymmetrically encrypted message
// If successful, returns the unpacked whisper ReceivedMessage struct
// encapsulating the decrypted message, and the byte representation of
// the public key used to decrypt the message.
// It fails if decryption of message fails, or if the message is corrupted
func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) {
    metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)

    recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
    if err != nil {
        return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
    }
    // check signature (if signed), strip padding
    if !recvmsg.Validate() {
        return nil, "", nil, fmt.Errorf("invalid message")
    }
    pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
    var from PssAddress
    p.pubKeyPoolMu.Lock()
    if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
        from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
    }
    p.pubKeyPoolMu.Unlock()
    return recvmsg, pubkeyid, from, nil
}

// Symkey garbage collection
// a key is removed if:
// - it is not marked as protected
// - it is not in the incoming decryption cache
func (p *Pss) cleanKeys() (count int) {
    for keyid, peertopics := range p.symKeyPool {
        var expiredtopics []Topic
        for topic, psp := range peertopics {
            if psp.protected {
                continue
            }

            var match bool
            for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
                cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
                if *cacheid == keyid {
                    match = true
                }
            }
            if !match {
                expiredtopics = append(expiredtopics, topic)
            }
        }
        for _, topic := range expiredtopics {
            p.symKeyPoolMu.Lock()
            delete(p.symKeyPool[keyid], topic)
            log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
            p.symKeyPoolMu.Unlock()
            count++
        }
    }
    return
}

/////////////////////////////////////////////////////////////////////
// SECTION: Message sending
/////////////////////////////////////////////////////////////////////

func (p *Pss) enqueue(msg *PssMsg) error {
    select {
    case p.outbox <- msg:
        return nil
    default:
    }

    metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
    return errors.New("outbox full")
}

// Send a raw message (any encryption is responsibility of calling client)
//
// Will fail if raw messages are disallowed
func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
    if err := validateAddress(address); err != nil {
        return err
    }
    pssMsgParams := &msgParams{
        raw: true,
    }
    payload := &whisper.Envelope{
        Data:  msg,
        Topic: whisper.TopicType(topic),
    }
    pssMsg := newPssMsg(pssMsgParams)
    pssMsg.To = address
    pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
    pssMsg.Payload = payload
    p.addFwdCache(pssMsg)
    err := p.enqueue(pssMsg)
    if err != nil {
        return err
    }

    // if we have a proxhandler on this topic
    // also deliver message to ourselves
    if _, ok := p.topicHandlerCaps[topic]; ok {
        if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
            return p.process(pssMsg, true, true)
        }
    }
    return nil
}

// Send a message using symmetric encryption
//
// Fails if the key id does not match any of the stored symmetric keys
func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
    symkey, err := p.GetSymmetricKey(symkeyid)
    if err != nil {
        return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
    }
    p.symKeyPoolMu.Lock()
    psp, ok := p.symKeyPool[symkeyid][topic]
    p.symKeyPoolMu.Unlock()
    if !ok {
        return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
    }
    return p.send(psp.address, topic, msg, false, symkey)
}

// Send a message using asymmetric encryption
//
// Fails if the key id does not match any in of the stored public keys
func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
    if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
        return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
    }
    p.pubKeyPoolMu.Lock()
    psp, ok := p.pubKeyPool[pubkeyid][topic]
    p.pubKeyPoolMu.Unlock()
    if !ok {
        return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
    }
    return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
}

// Send is payload agnostic, and will accept any byte slice as payload
// It generates an whisper envelope for the specified recipient and topic,
// and wraps the message payload in it.
// TODO: Implement proper message padding
func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
    metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)

    if key == nil || bytes.Equal(key, []byte{}) {
        return fmt.Errorf("Zero length key passed to pss send")
    }
    padding := make([]byte, p.paddingByteSize)
    c, err := rand.Read(padding)
    if err != nil {
        return err
    } else if c < p.paddingByteSize {
        return fmt.Errorf("invalid padding length: %d", c)
    }
    wparams := &whisper.MessageParams{
        TTL:      defaultWhisperTTL,
        Src:      p.privateKey,
        Topic:    whisper.TopicType(topic),
        WorkTime: defaultWhisperWorkTime,
        PoW:      defaultWhisperPoW,
        Payload:  msg,
        Padding:  padding,
    }
    if asymmetric {
        pk, err := crypto.UnmarshalPubkey(key)
        if err != nil {
            return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
        }
        wparams.Dst = pk
    } else {
        wparams.KeySym = key
    }
    // set up outgoing message container, which does encryption and envelope wrapping
    woutmsg, err := whisper.NewSentMessage(wparams)
    if err != nil {
        return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
    }
    // performs encryption.
    // Does NOT perform / performs negligible PoW due to very low difficulty setting
    // after this the message is ready for sending
    envelope, err := woutmsg.Wrap(wparams)
    if err != nil {
        return fmt.Errorf("failed to perform whisper encryption: %v", err)
    }
    log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))

    // prepare for devp2p transport
    pssMsgParams := &msgParams{
        sym: !asymmetric,
    }
    pssMsg := newPssMsg(pssMsgParams)
    pssMsg.To = to
    pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
    pssMsg.Payload = envelope
    err = p.enqueue(pssMsg)
    if err != nil {
        return err
    }
    if _, ok := p.topicHandlerCaps[topic]; ok {
        if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox {
            return p.process(pssMsg, true, true)
        }
    }
    return nil
}

// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
func (p *Pss) forward(msg *PssMsg) error {
    metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)

    to := make([]byte, addressLength)
    copy(to[:len(msg.To)], msg.To)

    // send with kademlia
    // find the closest peer to the recipient and attempt to send
    sent := 0
    p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
        info := sp.Info()

        // check if the peer is running pss
        var ispss bool
        for _, cap := range info.Caps {
            if cap == p.capstring {
                ispss = true
                break
            }
        }
        if !ispss {
            log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
            return true
        }

        // get the protocol peer from the forwarding peer cache
        sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
        p.fwdPoolMu.RLock()
        pp := p.fwdPool[sp.Info().ID]
        p.fwdPoolMu.RUnlock()

        // attempt to send the message
        err := pp.Send(context.TODO(), msg)
        if err != nil {
            metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
            log.Error(err.Error())
            return true
        }
        sent++
        log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))

        // continue forwarding if:
        // - if the peer is end recipient but the full address has not been disclosed
        // - if the peer address matches the partial address fully
        // - if the peer is in proxbin
        if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
            log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
            return true
        } else if isproxbin {
            log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
            return true
        }
        // at this point we stop forwarding, and the state is as follows:
        // - the peer is end recipient and we have full address
        // - we are not in proxbin (directed routing)
        // - partial addresses don't fully match
        return false
    })

    if sent == 0 {
        log.Debug("unable to forward to any peers")
        if err := p.enqueue(msg); err != nil {
            metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
            log.Error(err.Error())
            return err
        }
    }

    // cache the message
    p.addFwdCache(msg)
    return nil
}

/////////////////////////////////////////////////////////////////////
// SECTION: Caching
/////////////////////////////////////////////////////////////////////

// cleanFwdCache is used to periodically remove expired entries from the forward cache
func (p *Pss) cleanFwdCache() {
    metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
    p.fwdCacheMu.Lock()
    defer p.fwdCacheMu.Unlock()
    for k, v := range p.fwdCache {
        if v.expiresAt.Before(time.Now()) {
            delete(p.fwdCache, k)
        }
    }
}

func label(b []byte) string {
    return fmt.Sprintf("%04x", b[:2])
}

// add a message to the cache
func (p *Pss) addFwdCache(msg *PssMsg) error {
    metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)

    var entry pssCacheEntry
    var ok bool

    p.fwdCacheMu.Lock()
    defer p.fwdCacheMu.Unlock()

    digest := p.digest(msg)
    if entry, ok = p.fwdCache[digest]; !ok {
        entry = pssCacheEntry{}
    }
    entry.expiresAt = time.Now().Add(p.cacheTTL)
    p.fwdCache[digest] = entry
    return nil
}

// check if message is in the cache
func (p *Pss) checkFwdCache(msg *PssMsg) bool {
    p.fwdCacheMu.Lock()
    defer p.fwdCacheMu.Unlock()

    digest := p.digest(msg)
    entry, ok := p.fwdCache[digest]
    if ok {
        if entry.expiresAt.After(time.Now()) {
            log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
            metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
            return true
        }
        metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
    }
    return false
}

// Digest of message
func (p *Pss) digest(msg *PssMsg) pssDigest {
    return p.digestBytes(msg.serialize())
}

func (p *Pss) digestBytes(msg []byte) pssDigest {
    hasher := p.hashPool.Get().(hash.Hash)
    defer p.hashPool.Put(hasher)
    hasher.Reset()
    hasher.Write(msg)
    digest := pssDigest{}
    key := hasher.Sum(nil)
    copy(digest[:], key[:digestLength])
    return digest
}

func validateAddress(addr PssAddress) error {
    if len(addr) > addressLength {
        return errors.New("address too long")
    }
    return nil
}