aboutsummaryrefslogblamecommitdiffstats
path: root/swarm/pss/handshake.go
blob: e3ead77d04925974d6534fba450435380345d88a (plain) (tree)


































































































































































































































































































































































































                                                                                                                                                                                                                                               
                                                                                   



















































































































































































                                                                                                                                                                                                                                                   
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// +build !nopsshandshake

package pss

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/hexutil"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/rlp"
    "github.com/ethereum/go-ethereum/rpc"
    "github.com/ethereum/go-ethereum/swarm/log"
)

const (
    IsActiveHandshake = true
)

var (
    ctrlSingleton *HandshakeController
)

const (
    defaultSymKeyRequestTimeout = 1000 * 8  // max wait ms to receive a response to a handshake symkey request
    defaultSymKeyExpiryTimeout  = 1000 * 10 // ms to wait before allowing garbage collection of an expired symkey
    defaultSymKeySendLimit      = 256       // amount of messages a symkey is valid for
    defaultSymKeyCapacity       = 4         // max number of symkeys to store/send simultaneously
)

// symmetric key exchange message payload
type handshakeMsg struct {
    From    []byte
    Limit   uint16
    Keys    [][]byte
    Request uint8
    Topic   Topic
}

// internal representation of an individual symmetric key
type handshakeKey struct {
    symKeyID  *string
    pubKeyID  *string
    limit     uint16
    count     uint16
    expiredAt time.Time
}

// container for all in- and outgoing keys
// for one particular peer (public key) and topic
type handshake struct {
    outKeys []handshakeKey
    inKeys  []handshakeKey
}

// Initialization parameters for the HandshakeController
//
// SymKeyRequestExpiry: Timeout for waiting for a handshake reply
// (default 8000 ms)
//
// SymKeySendLimit: Amount of messages symmetric keys issues by
// this node is valid for (default 256)
//
// SymKeyCapacity: Ideal (and maximum) amount of symmetric keys
// held per direction per peer (default 4)
type HandshakeParams struct {
    SymKeyRequestTimeout time.Duration
    SymKeyExpiryTimeout  time.Duration
    SymKeySendLimit      uint16
    SymKeyCapacity       uint8
}

// Sane defaults for HandshakeController initialization
func NewHandshakeParams() *HandshakeParams {
    return &HandshakeParams{
        SymKeyRequestTimeout: defaultSymKeyRequestTimeout * time.Millisecond,
        SymKeyExpiryTimeout:  defaultSymKeyExpiryTimeout * time.Millisecond,
        SymKeySendLimit:      defaultSymKeySendLimit,
        SymKeyCapacity:       defaultSymKeyCapacity,
    }
}

// Singleton object enabling semi-automatic Diffie-Hellman
// exchange of ephemeral symmetric keys
type HandshakeController struct {
    pss                  *Pss
    keyC                 map[string]chan []string // adds a channel to report when a handshake succeeds
    lock                 sync.Mutex
    symKeyRequestTimeout time.Duration
    symKeyExpiryTimeout  time.Duration
    symKeySendLimit      uint16
    symKeyCapacity       uint8
    symKeyIndex          map[string]*handshakeKey
    handshakes           map[string]map[Topic]*handshake
    deregisterFuncs      map[Topic]func()
}

// Attach HandshakeController to pss node
//
// Must be called before starting the pss node service
func SetHandshakeController(pss *Pss, params *HandshakeParams) error {
    ctrl := &HandshakeController{
        pss:                  pss,
        keyC:                 make(map[string]chan []string),
        symKeyRequestTimeout: params.SymKeyRequestTimeout,
        symKeyExpiryTimeout:  params.SymKeyExpiryTimeout,
        symKeySendLimit:      params.SymKeySendLimit,
        symKeyCapacity:       params.SymKeyCapacity,
        symKeyIndex:          make(map[string]*handshakeKey),
        handshakes:           make(map[string]map[Topic]*handshake),
        deregisterFuncs:      make(map[Topic]func()),
    }
    api := &HandshakeAPI{
        namespace: "pss",
        ctrl:      ctrl,
    }
    pss.addAPI(rpc.API{
        Namespace: api.namespace,
        Version:   "0.2",
        Service:   api,
        Public:    true,
    })
    ctrlSingleton = ctrl
    return nil
}

// Return all unexpired symmetric keys from store by
// peer (public key), topic and specified direction
func (ctl *HandshakeController) validKeys(pubkeyid string, topic *Topic, in bool) (validkeys []*string) {
    ctl.lock.Lock()
    defer ctl.lock.Unlock()
    now := time.Now()
    if _, ok := ctl.handshakes[pubkeyid]; !ok {
        return []*string{}
    } else if _, ok := ctl.handshakes[pubkeyid][*topic]; !ok {
        return []*string{}
    }
    var keystore *[]handshakeKey
    if in {
        keystore = &(ctl.handshakes[pubkeyid][*topic].inKeys)
    } else {
        keystore = &(ctl.handshakes[pubkeyid][*topic].outKeys)
    }

    for _, key := range *keystore {
        if key.limit <= key.count {
            ctl.releaseKey(*key.symKeyID, topic)
        } else if !key.expiredAt.IsZero() && key.expiredAt.Before(now) {
            ctl.releaseKey(*key.symKeyID, topic)
        } else {
            validkeys = append(validkeys, key.symKeyID)
        }
    }
    return
}

// Add all given symmetric keys with validity limits to store by
// peer (public key), topic and specified direction
func (ctl *HandshakeController) updateKeys(pubkeyid string, topic *Topic, in bool, symkeyids []string, limit uint16) {
    ctl.lock.Lock()
    defer ctl.lock.Unlock()
    if _, ok := ctl.handshakes[pubkeyid]; !ok {
        ctl.handshakes[pubkeyid] = make(map[Topic]*handshake)

    }
    if ctl.handshakes[pubkeyid][*topic] == nil {
        ctl.handshakes[pubkeyid][*topic] = &handshake{}
    }
    var keystore *[]handshakeKey
    expire := time.Now()
    if in {
        keystore = &(ctl.handshakes[pubkeyid][*topic].inKeys)
    } else {
        keystore = &(ctl.handshakes[pubkeyid][*topic].outKeys)
        expire = expire.Add(time.Millisecond * ctl.symKeyExpiryTimeout)
    }
    for _, storekey := range *keystore {
        storekey.expiredAt = expire
    }
    for i := 0; i < len(symkeyids); i++ {
        storekey := handshakeKey{
            symKeyID: &symkeyids[i],
            pubKeyID: &pubkeyid,
            limit:    limit,
        }
        *keystore = append(*keystore, storekey)
        ctl.pss.symKeyPool[*storekey.symKeyID][*topic].protected = true
    }
    for i := 0; i < len(*keystore); i++ {
        ctl.symKeyIndex[*(*keystore)[i].symKeyID] = &((*keystore)[i])
    }
}

// Expire a symmetric key, making it elegible for garbage collection
func (ctl *HandshakeController) releaseKey(symkeyid string, topic *Topic) bool {
    if ctl.symKeyIndex[symkeyid] == nil {
        log.Debug("no symkey", "symkeyid", symkeyid)
        return false
    }
    ctl.symKeyIndex[symkeyid].expiredAt = time.Now()
    log.Debug("handshake release", "symkeyid", symkeyid)
    return true
}

// Checks all symmetric keys in given direction(s) by
// specified peer (public key) and topic for expiry.
// Expired means:
// - expiry timestamp is set, and grace period is exceeded
// - message validity limit is reached
func (ctl *HandshakeController) cleanHandshake(pubkeyid string, topic *Topic, in bool, out bool) int {
    ctl.lock.Lock()
    defer ctl.lock.Unlock()
    var deletecount int
    var deletes []string
    now := time.Now()
    handshake := ctl.handshakes[pubkeyid][*topic]
    log.Debug("handshake clean", "pubkey", pubkeyid, "topic", topic)
    if in {
        for i, key := range handshake.inKeys {
            if key.expiredAt.Before(now) || (key.expiredAt.IsZero() && key.limit <= key.count) {
                log.Trace("handshake in clean remove", "symkeyid", *key.symKeyID)
                deletes = append(deletes, *key.symKeyID)
                handshake.inKeys[deletecount] = handshake.inKeys[i]
                deletecount++
            }
        }
        handshake.inKeys = handshake.inKeys[:len(handshake.inKeys)-deletecount]
    }
    if out {
        deletecount = 0
        for i, key := range handshake.outKeys {
            if key.expiredAt.Before(now) && (key.expiredAt.IsZero() && key.limit <= key.count) {
                log.Trace("handshake out clean remove", "symkeyid", *key.symKeyID)
                deletes = append(deletes, *key.symKeyID)
                handshake.outKeys[deletecount] = handshake.outKeys[i]
                deletecount++
            }
        }
        handshake.outKeys = handshake.outKeys[:len(handshake.outKeys)-deletecount]
    }
    for _, keyid := range deletes {
        delete(ctl.symKeyIndex, keyid)
        ctl.pss.symKeyPool[keyid][*topic].protected = false
    }
    return len(deletes)
}

// Runs cleanHandshake() on all peers and topics
func (ctl *HandshakeController) clean() {
    peerpubkeys := ctl.handshakes
    for pubkeyid, peertopics := range peerpubkeys {
        for topic := range peertopics {
            ctl.cleanHandshake(pubkeyid, &topic, true, true)
        }
    }
}

// Passed as a PssMsg handler for the topic handshake is activated on
// Handles incoming key exchange messages and
// ccunts message usage by symmetric key (expiry limit control)
// Only returns error if key handler fails
func (ctl *HandshakeController) handler(msg []byte, p *p2p.Peer, asymmetric bool, symkeyid string) error {
    if !asymmetric {
        if ctl.symKeyIndex[symkeyid] != nil {
            if ctl.symKeyIndex[symkeyid].count >= ctl.symKeyIndex[symkeyid].limit {
                return fmt.Errorf("discarding message using expired key: %s", symkeyid)
            }
            ctl.symKeyIndex[symkeyid].count++
            log.Trace("increment symkey recv use", "symsymkeyid", symkeyid, "count", ctl.symKeyIndex[symkeyid].count, "limit", ctl.symKeyIndex[symkeyid].limit, "receiver", common.ToHex(crypto.FromECDSAPub(ctl.pss.PublicKey())))
        }
        return nil
    }
    keymsg := &handshakeMsg{}
    err := rlp.DecodeBytes(msg, keymsg)
    if err == nil {
        err := ctl.handleKeys(symkeyid, keymsg)
        if err != nil {
            log.Error("handlekeys fail", "error", err)
        }
        return err
    }
    return nil
}

// Handle incoming key exchange message
// Add keys received from peer to store
// and enerate and send the amount of keys requested by peer
//
// TODO:
// - flood guard
// - keylength check
// - update address hint if:
//   1) leftmost bytes in new address do not match stored
//   2) else, if new address is longer
func (ctl *HandshakeController) handleKeys(pubkeyid string, keymsg *handshakeMsg) error {
    // new keys from peer
    if len(keymsg.Keys) > 0 {
        log.Debug("received handshake keys", "pubkeyid", pubkeyid, "from", keymsg.From, "count", len(keymsg.Keys))
        var sendsymkeyids []string
        for _, key := range keymsg.Keys {
            sendsymkey := make([]byte, len(key))
            copy(sendsymkey, key)
            var address PssAddress
            copy(address[:], keymsg.From)
            sendsymkeyid, err := ctl.pss.setSymmetricKey(sendsymkey, keymsg.Topic, &address, false, false)
            if err != nil {
                return err
            }
            sendsymkeyids = append(sendsymkeyids, sendsymkeyid)
        }
        if len(sendsymkeyids) > 0 {
            ctl.updateKeys(pubkeyid, &keymsg.Topic, false, sendsymkeyids, keymsg.Limit)

            ctl.alertHandshake(pubkeyid, sendsymkeyids)
        }
    }

    // peer request for keys
    if keymsg.Request > 0 {
        _, err := ctl.sendKey(pubkeyid, &keymsg.Topic, keymsg.Request)
        if err != nil {
            return err
        }
    }

    return nil
}

// Send key exchange to peer (public key) valid for `topic`
// Will send number of keys specified by `keycount` with
// validity limits specified in `msglimit`
// If number of valid outgoing keys is less than the ideal/max
// amount, a request is sent for the amount of keys to make up
// the difference
func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount uint8) ([]string, error) {

    var requestcount uint8
    to := &PssAddress{}
    if _, ok := ctl.pss.pubKeyPool[pubkeyid]; !ok {
        return []string{}, errors.New("Invalid public key")
    } else if psp, ok := ctl.pss.pubKeyPool[pubkeyid][*topic]; ok {
        to = psp.address
    }

    recvkeys := make([][]byte, keycount)
    recvkeyids := make([]string, keycount)
    ctl.lock.Lock()
    if _, ok := ctl.handshakes[pubkeyid]; !ok {
        ctl.handshakes[pubkeyid] = make(map[Topic]*handshake)
    }
    ctl.lock.Unlock()

    // check if buffer is not full
    outkeys := ctl.validKeys(pubkeyid, topic, false)
    if len(outkeys) < int(ctl.symKeyCapacity) {
        //requestcount = uint8(self.symKeyCapacity - uint8(len(outkeys)))
        requestcount = ctl.symKeyCapacity
    }
    // return if there's nothing to be accomplished
    if requestcount == 0 && keycount == 0 {
        return []string{}, nil
    }

    // generate new keys to send
    for i := 0; i < len(recvkeyids); i++ {
        var err error
        recvkeyids[i], err = ctl.pss.GenerateSymmetricKey(*topic, to, true)
        if err != nil {
            return []string{}, fmt.Errorf("set receive symkey fail (pubkey %x topic %x): %v", pubkeyid, topic, err)
        }
        recvkeys[i], err = ctl.pss.GetSymmetricKey(recvkeyids[i])
        if err != nil {
            return []string{}, fmt.Errorf("GET Generated outgoing symkey fail (pubkey %x topic %x): %v", pubkeyid, topic, err)
        }
    }
    ctl.updateKeys(pubkeyid, topic, true, recvkeyids, ctl.symKeySendLimit)

    // encode and send the message
    recvkeymsg := &handshakeMsg{
        From:    ctl.pss.BaseAddr(),
        Keys:    recvkeys,
        Request: requestcount,
        Limit:   ctl.symKeySendLimit,
        Topic:   *topic,
    }
    log.Debug("sending our symkeys", "pubkey", pubkeyid, "symkeys", recvkeyids, "limit", ctl.symKeySendLimit, "requestcount", requestcount, "keycount", len(recvkeys))
    recvkeybytes, err := rlp.EncodeToBytes(recvkeymsg)
    if err != nil {
        return []string{}, fmt.Errorf("rlp keymsg encode fail: %v", err)
    }
    // if the send fails it means this public key is not registered for this particular address AND topic
    err = ctl.pss.SendAsym(pubkeyid, *topic, recvkeybytes)
    if err != nil {
        return []string{}, fmt.Errorf("Send symkey failed: %v", err)
    }
    return recvkeyids, nil
}

// Enables callback for keys received from a key exchange request
func (ctl *HandshakeController) alertHandshake(pubkeyid string, symkeys []string) chan []string {
    if len(symkeys) > 0 {
        if _, ok := ctl.keyC[pubkeyid]; ok {
            ctl.keyC[pubkeyid] <- symkeys
            close(ctl.keyC[pubkeyid])
            delete(ctl.keyC, pubkeyid)
        }
        return nil
    }
    if _, ok := ctl.keyC[pubkeyid]; !ok {
        ctl.keyC[pubkeyid] = make(chan []string)
    }
    return ctl.keyC[pubkeyid]
}

type HandshakeAPI struct {
    namespace string
    ctrl      *HandshakeController
}

// Initiate a handshake session for a peer (public key) and topic
// combination.
//
// If `sync` is set, the call will block until keys are received from peer,
// or if the handshake request times out
//
// If `flush` is set, the max amount of keys will be sent to the peer
// regardless of how many valid keys that currently exist in the store.
//
// Returns list of symmetric key ids that can be passed to pss.GetSymmetricKey()
// for retrieval of the symmetric key bytes themselves.
//
// Fails if the incoming symmetric key store is already full (and `flush` is false),
// or if the underlying key dispatcher fails
func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flush bool) (keys []string, err error) {
    var hsc chan []string
    var keycount uint8
    if flush {
        keycount = api.ctrl.symKeyCapacity
    } else {
        validkeys := api.ctrl.validKeys(pubkeyid, &topic, false)
        keycount = api.ctrl.symKeyCapacity - uint8(len(validkeys))
    }
    if keycount == 0 {
        return keys, errors.New("Incoming symmetric key store is already full")
    }
    if sync {
        hsc = api.ctrl.alertHandshake(pubkeyid, []string{})
    }
    _, err = api.ctrl.sendKey(pubkeyid, &topic, keycount)
    if err != nil {
        return keys, err
    }
    if sync {
        ctx, cancel := context.WithTimeout(context.Background(), api.ctrl.symKeyRequestTimeout)
        defer cancel()
        select {
        case keys = <-hsc:
            log.Trace("sync handshake response receive", "key", keys)
        case <-ctx.Done():
            return []string{}, errors.New("timeout")
        }
    }
    return keys, nil
}

// Activate handshake functionality on a topic
func (api *HandshakeAPI) AddHandshake(topic Topic) error {
    api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, api.ctrl.handler)
    return nil
}

// Deactivate handshake functionality on a topic
func (api *HandshakeAPI) RemoveHandshake(topic *Topic) error {
    if _, ok := api.ctrl.deregisterFuncs[*topic]; ok {
        api.ctrl.deregisterFuncs[*topic]()
    }
    return nil
}

// Returns all valid symmetric keys in store per peer (public key)
// and topic.
//
// The `in` and `out` parameters indicate for which direction(s)
// symmetric keys will be returned.
// If both are false, no keys (and no error) will be returned.
func (api *HandshakeAPI) GetHandshakeKeys(pubkeyid string, topic Topic, in bool, out bool) (keys []string, err error) {
    if in {
        for _, inkey := range api.ctrl.validKeys(pubkeyid, &topic, true) {
            keys = append(keys, *inkey)
        }
    }
    if out {
        for _, outkey := range api.ctrl.validKeys(pubkeyid, &topic, false) {
            keys = append(keys, *outkey)
        }
    }
    return keys, nil
}

// Returns the amount of messages the specified symmetric key
// is still valid for under the handshake scheme
func (api *HandshakeAPI) GetHandshakeKeyCapacity(symkeyid string) (uint16, error) {
    storekey := api.ctrl.symKeyIndex[symkeyid]
    if storekey == nil {
        return 0, fmt.Errorf("invalid symkey id %s", symkeyid)
    }
    return storekey.limit - storekey.count, nil
}

// Returns the byte representation of the public key in ascii hex
// associated with the given symmetric key
func (api *HandshakeAPI) GetHandshakePublicKey(symkeyid string) (string, error) {
    storekey := api.ctrl.symKeyIndex[symkeyid]
    if storekey == nil {
        return "", fmt.Errorf("invalid symkey id %s", symkeyid)
    }
    return *storekey.pubKeyID, nil
}

// Manually expire the given symkey
//
// If `flush` is set, garbage collection will be performed before returning.
//
// Returns true on successful removal, false otherwise
func (api *HandshakeAPI) ReleaseHandshakeKey(pubkeyid string, topic Topic, symkeyid string, flush bool) (removed bool, err error) {
    removed = api.ctrl.releaseKey(symkeyid, &topic)
    if removed && flush {
        api.ctrl.cleanHandshake(pubkeyid, &topic, true, true)
    }
    return
}

// Send symmetric message under the handshake scheme
//
// Overloads the pss.SendSym() API call, adding symmetric key usage count
// for message expiry control
func (api *HandshakeAPI) SendSym(symkeyid string, topic Topic, msg hexutil.Bytes) (err error) {
    err = api.ctrl.pss.SendSym(symkeyid, topic, msg[:])
    if api.ctrl.symKeyIndex[symkeyid] != nil {
        if api.ctrl.symKeyIndex[symkeyid].count >= api.ctrl.symKeyIndex[symkeyid].limit {
            return errors.New("attempted send with expired key")
        }
        api.ctrl.symKeyIndex[symkeyid].count++
        log.Trace("increment symkey send use", "symkeyid", symkeyid, "count", api.ctrl.symKeyIndex[symkeyid].count, "limit", api.ctrl.symKeyIndex[symkeyid].limit, "receiver", common.ToHex(crypto.FromECDSAPub(api.ctrl.pss.PublicKey())))
    }
    return
}