diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/pss/client/client.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/pss/client/client.go')
-rw-r--r-- | swarm/pss/client/client.go | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go new file mode 100644 index 000000000..532a22384 --- /dev/null +++ b/swarm/pss/client/client.go @@ -0,0 +1,354 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build !noclient,!noprotocol + +package client + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/pss" +) + +const ( + handshakeRetryTimeout = 1000 + handshakeRetryCount = 3 +) + +// The pss client provides devp2p emulation over pss RPC API, +// giving access to pss methods from a different process +type Client struct { + BaseAddrHex string + + // peers + peerPool map[pss.Topic]map[string]*pssRPCRW + protos map[pss.Topic]*p2p.Protocol + + // rpc connections + rpc *rpc.Client + subs []*rpc.ClientSubscription + + // channels + topicsC chan []byte + quitC chan struct{} + + poolMu sync.Mutex +} + +// implements p2p.MsgReadWriter +type pssRPCRW struct { + *Client + topic string + msgC chan []byte + addr pss.PssAddress + pubKeyId string + lastSeen time.Time + closed bool +} + +func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) { + topic := topicobj.String() + err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:])) + if err != nil { + return nil, fmt.Errorf("setpeer %s %s: %v", topic, pubkeyid, err) + } + return &pssRPCRW{ + Client: c, + topic: topic, + msgC: make(chan []byte), + addr: addr, + pubKeyId: pubkeyid, + }, nil +} + +func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) { + msg := <-rw.msgC + log.Trace("pssrpcrw read", "msg", msg) + pmsg, err := pss.ToP2pMsg(msg) + if err != nil { + return p2p.Msg{}, err + } + + return pmsg, nil +} + +// If only one message slot left +// then new is requested through handshake +// if buffer is empty, handshake request blocks until return +// after which pointer is changed to first new key in buffer +// will fail if: +// - any api calls fail +// - handshake retries are exhausted without reply, +// - send fails +func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error { + log.Trace("got writemsg pssclient", "msg", msg) + if rw.closed { + return fmt.Errorf("connection closed") + } + rlpdata := make([]byte, msg.Size) + msg.Payload.Read(rlpdata) + pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{ + Code: msg.Code, + Size: msg.Size, + Payload: rlpdata, + }) + if err != nil { + return err + } + + // Get the keys we have + var symkeyids []string + err = rw.Client.rpc.Call(&symkeyids, "pss_getHandshakeKeys", rw.pubKeyId, rw.topic, false, true) + if err != nil { + return err + } + + // Check the capacity of the first key + var symkeycap uint16 + if len(symkeyids) > 0 { + err = rw.Client.rpc.Call(&symkeycap, "pss_getHandshakeKeyCapacity", symkeyids[0]) + if err != nil { + return err + } + } + + err = rw.Client.rpc.Call(nil, "pss_sendSym", symkeyids[0], rw.topic, hexutil.Encode(pmsg)) + if err != nil { + return err + } + + // If this is the last message it is valid for, initiate new handshake + if symkeycap == 1 { + var retries int + var sync bool + // if it's the only remaining key, make sure we don't continue until we have new ones for further writes + if len(symkeyids) == 1 { + sync = true + } + // initiate handshake + _, err := rw.handshake(retries, sync, false) + if err != nil { + log.Warn("failing", "err", err) + return err + } + } + return nil +} + +// retry and synchronicity wrapper for handshake api call +// returns first new symkeyid upon successful execution +func (rw *pssRPCRW) handshake(retries int, sync bool, flush bool) (string, error) { + + var symkeyids []string + var i int + // request new keys + // if the key buffer was depleted, make this as a blocking call and try several times before giving up + for i = 0; i < 1+retries; i++ { + log.Debug("handshake attempt pssrpcrw", "pubkeyid", rw.pubKeyId, "topic", rw.topic, "sync", sync) + err := rw.Client.rpc.Call(&symkeyids, "pss_handshake", rw.pubKeyId, rw.topic, sync, flush) + if err == nil { + var keyid string + if sync { + keyid = symkeyids[0] + } + return keyid, nil + } + if i-1+retries > 1 { + time.Sleep(time.Millisecond * handshakeRetryTimeout) + } + } + + return "", fmt.Errorf("handshake failed after %d attempts", i) +} + +// Custom constructor +// +// Provides direct access to the rpc object +func NewClient(rpcurl string) (*Client, error) { + rpcclient, err := rpc.Dial(rpcurl) + if err != nil { + return nil, err + } + + client, err := NewClientWithRPC(rpcclient) + if err != nil { + return nil, err + } + return client, nil +} + +// Main constructor +// +// The 'rpcclient' parameter allows passing a in-memory rpc client to act as the remote websocket RPC. +func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) { + client := newClient() + client.rpc = rpcclient + err := client.rpc.Call(&client.BaseAddrHex, "pss_baseAddr") + if err != nil { + return nil, fmt.Errorf("cannot get pss node baseaddress: %v", err) + } + return client, nil +} + +func newClient() (client *Client) { + client = &Client{ + quitC: make(chan struct{}), + peerPool: make(map[pss.Topic]map[string]*pssRPCRW), + protos: make(map[pss.Topic]*p2p.Protocol), + } + return +} + +// Mounts a new devp2p protcool on the pss connection +// +// the protocol is aliased as a "pss topic" +// uses normal devp2p send and incoming message handler routines from the p2p/protocols package +// +// when an incoming message is received from a peer that is not yet known to the client, +// this peer object is instantiated, and the protocol is run on it. +func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { + topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version))) + topichex := topicobj.String() + msgC := make(chan pss.APIMsg) + c.peerPool[topicobj] = make(map[string]*pssRPCRW) + sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex) + if err != nil { + return fmt.Errorf("pss event subscription failed: %v", err) + } + c.subs = append(c.subs, sub) + err = c.rpc.Call(nil, "pss_addHandshake", topichex) + if err != nil { + return fmt.Errorf("pss handshake activation failed: %v", err) + } + + // dispatch incoming messages + go func() { + for { + select { + case msg := <-msgC: + // we only allow sym msgs here + if msg.Asymmetric { + continue + } + // we get passed the symkeyid + // need the symkey itself to resolve to peer's pubkey + var pubkeyid string + err = c.rpc.Call(&pubkeyid, "pss_getHandshakePublicKey", msg.Key) + if err != nil || pubkeyid == "" { + log.Trace("proto err or no pubkey", "err", err, "symkeyid", msg.Key) + continue + } + // if we don't have the peer on this protocol already, create it + // this is more or less the same as AddPssPeer, less the handshake initiation + if c.peerPool[topicobj][pubkeyid] == nil { + var addrhex string + err := c.rpc.Call(&addrhex, "pss_getAddress", topichex, false, msg.Key) + if err != nil { + log.Trace(err.Error()) + continue + } + addrbytes, err := hexutil.Decode(addrhex) + if err != nil { + log.Trace(err.Error()) + break + } + addr := pss.PssAddress(addrbytes) + rw, err := c.newpssRPCRW(pubkeyid, addr, topicobj) + if err != nil { + break + } + c.peerPool[topicobj][pubkeyid] = rw + nid, _ := discover.HexID("0x00") + p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{}) + go proto.Run(p, c.peerPool[topicobj][pubkeyid]) + } + go func() { + c.peerPool[topicobj][pubkeyid].msgC <- msg.Msg + }() + case <-c.quitC: + return + } + } + }() + + c.protos[topicobj] = proto + return nil +} + +// Always call this to ensure that we exit cleanly +func (c *Client) Close() error { + for _, s := range c.subs { + s.Unsubscribe() + } + return nil +} + +// Add a pss peer (public key) and run the protocol on it +// +// client.RunProtocol with matching topic must have been +// run prior to adding the peer, or this method will +// return an error. +// +// The key must exist in the key store of the pss node +// before the peer is added. The method will return an error +// if it is not. +func (c *Client) AddPssPeer(pubkeyid string, addr []byte, spec *protocols.Spec) error { + topic := pss.ProtocolTopic(spec) + if c.peerPool[topic] == nil { + return errors.New("addpeer on unset topic") + } + if c.peerPool[topic][pubkeyid] == nil { + rw, err := c.newpssRPCRW(pubkeyid, addr, topic) + if err != nil { + return err + } + _, err = rw.handshake(handshakeRetryCount, true, true) + if err != nil { + return err + } + c.poolMu.Lock() + c.peerPool[topic][pubkeyid] = rw + c.poolMu.Unlock() + nid, _ := discover.HexID("0x00") + p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{}) + go c.protos[topic].Run(p, c.peerPool[topic][pubkeyid]) + } + return nil +} + +// Remove a pss peer +// +// TODO: underlying cleanup +func (c *Client) RemovePssPeer(pubkeyid string, spec *protocols.Spec) { + log.Debug("closing pss client peer", "pubkey", pubkeyid, "protoname", spec.Name, "protoversion", spec.Version) + c.poolMu.Lock() + defer c.poolMu.Unlock() + topic := pss.ProtocolTopic(spec) + c.peerPool[topic][pubkeyid].closed = true + delete(c.peerPool[topic], pubkeyid) +} |