aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/protocol.go')
-rw-r--r--p2p/protocol.go294
1 files changed, 294 insertions, 0 deletions
diff --git a/p2p/protocol.go b/p2p/protocol.go
new file mode 100644
index 000000000..3f52205f5
--- /dev/null
+++ b/p2p/protocol.go
@@ -0,0 +1,294 @@
+package p2p
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/ethereum/go-ethereum/ethutil"
+)
+
+// Protocol represents a P2P subprotocol implementation.
+type Protocol struct {
+ // Name should contain the official protocol name,
+ // often a three-letter word.
+ Name string
+
+ // Version should contain the version number of the protocol.
+ Version uint
+
+ // Length should contain the number of message codes used
+ // by the protocol.
+ Length uint64
+
+ // Run is called in a new groutine when the protocol has been
+ // negotiated with a peer. It should read and write messages from
+ // rw. The Payload for each message must be fully consumed.
+ //
+ // The peer connection is closed when Start returns. It should return
+ // any protocol-level error (such as an I/O error) that is
+ // encountered.
+ Run func(peer *Peer, rw MsgReadWriter) error
+}
+
+func (p Protocol) cap() Cap {
+ return Cap{p.Name, p.Version}
+}
+
+const (
+ baseProtocolVersion = 2
+ baseProtocolLength = uint64(16)
+ baseProtocolMaxMsgSize = 10 * 1024 * 1024
+)
+
+const (
+ // devp2p message codes
+ handshakeMsg = 0x00
+ discMsg = 0x01
+ pingMsg = 0x02
+ pongMsg = 0x03
+ getPeersMsg = 0x04
+ peersMsg = 0x05
+)
+
+// handshake is the structure of a handshake list.
+type handshake struct {
+ Version uint64
+ ID string
+ Caps []Cap
+ ListenPort uint64
+ NodeID []byte
+}
+
+func (h *handshake) String() string {
+ return h.ID
+}
+func (h *handshake) Pubkey() []byte {
+ return h.NodeID
+}
+
+// Cap is the structure of a peer capability.
+type Cap struct {
+ Name string
+ Version uint
+}
+
+func (cap Cap) RlpData() interface{} {
+ return []interface{}{cap.Name, cap.Version}
+}
+
+type capsByName []Cap
+
+func (cs capsByName) Len() int { return len(cs) }
+func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
+func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] }
+
+type baseProtocol struct {
+ rw MsgReadWriter
+ peer *Peer
+}
+
+func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
+ bp := &baseProtocol{rw, peer}
+ if err := bp.doHandshake(rw); err != nil {
+ return err
+ }
+ // run main loop
+ quit := make(chan error, 1)
+ go func() {
+ for {
+ if err := bp.handle(rw); err != nil {
+ quit <- err
+ break
+ }
+ }
+ }()
+ return bp.loop(quit)
+}
+
+var pingTimeout = 2 * time.Second
+
+func (bp *baseProtocol) loop(quit <-chan error) error {
+ ping := time.NewTimer(pingTimeout)
+ activity := bp.peer.activity.Subscribe(time.Time{})
+ lastActive := time.Time{}
+ defer ping.Stop()
+ defer activity.Unsubscribe()
+
+ getPeersTick := time.NewTicker(10 * time.Second)
+ defer getPeersTick.Stop()
+ err := bp.rw.EncodeMsg(getPeersMsg)
+
+ for err == nil {
+ select {
+ case err = <-quit:
+ return err
+ case <-getPeersTick.C:
+ err = bp.rw.EncodeMsg(getPeersMsg)
+ case event := <-activity.Chan():
+ ping.Reset(pingTimeout)
+ lastActive = event.(time.Time)
+ case t := <-ping.C:
+ if lastActive.Add(pingTimeout * 2).Before(t) {
+ err = newPeerError(errPingTimeout, "")
+ } else if lastActive.Add(pingTimeout).Before(t) {
+ err = bp.rw.EncodeMsg(pingMsg)
+ }
+ }
+ }
+ return err
+}
+
+func (bp *baseProtocol) handle(rw MsgReadWriter) error {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > baseProtocolMaxMsgSize {
+ return newPeerError(errMisc, "message too big")
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+ case handshakeMsg:
+ return newPeerError(errProtocolBreach, "extra handshake received")
+
+ case discMsg:
+ var reason [1]DiscReason
+ if err := msg.Decode(&reason); err != nil {
+ return err
+ }
+ return discRequestedError(reason[0])
+
+ case pingMsg:
+ return bp.rw.EncodeMsg(pongMsg)
+
+ case pongMsg:
+
+ case getPeersMsg:
+ peers := bp.peerList()
+ // this is dangerous. the spec says that we should _delay_
+ // sending the response if no new information is available.
+ // this means that would need to send a response later when
+ // new peers become available.
+ //
+ // TODO: add event mechanism to notify baseProtocol for new peers
+ if len(peers) > 0 {
+ return bp.rw.EncodeMsg(peersMsg, peers)
+ }
+
+ case peersMsg:
+ var peers []*peerAddr
+ if err := msg.Decode(&peers); err != nil {
+ return err
+ }
+ for _, addr := range peers {
+ bp.peer.Debugf("received peer suggestion: %v", addr)
+ bp.peer.newPeerAddr <- addr
+ }
+
+ default:
+ return newPeerError(errInvalidMsgCode, "unknown message code %v", msg.Code)
+ }
+ return nil
+}
+
+func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
+ // send our handshake
+ if err := rw.WriteMsg(bp.handshakeMsg()); err != nil {
+ return err
+ }
+
+ // read and handle remote handshake
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Code != handshakeMsg {
+ return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code)
+ }
+ if msg.Size > baseProtocolMaxMsgSize {
+ return newPeerError(errMisc, "message too big")
+ }
+
+ var hs handshake
+ if err := msg.Decode(&hs); err != nil {
+ return err
+ }
+
+ // validate handshake info
+ if hs.Version != baseProtocolVersion {
+ return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
+ baseProtocolVersion, hs.Version)
+ }
+ if len(hs.NodeID) == 0 {
+ return newPeerError(errPubkeyMissing, "")
+ }
+ if len(hs.NodeID) != 64 {
+ return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8)
+ }
+ if da := bp.peer.dialAddr; da != nil {
+ // verify that the peer we wanted to connect to
+ // actually holds the target public key.
+ if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) {
+ return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch")
+ }
+ }
+ pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
+ if err := bp.peer.pubkeyHook(pa); err != nil {
+ return newPeerError(errPubkeyForbidden, "%v", err)
+ }
+
+ // TODO: remove Caps with empty name
+
+ var addr *peerAddr
+ if hs.ListenPort != 0 {
+ addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
+ addr.Port = hs.ListenPort
+ }
+ bp.peer.setHandshakeInfo(&hs, addr, hs.Caps)
+ bp.peer.startSubprotocols(hs.Caps)
+ return nil
+}
+
+func (bp *baseProtocol) handshakeMsg() Msg {
+ var (
+ port uint64
+ caps []interface{}
+ )
+ if bp.peer.ourListenAddr != nil {
+ port = bp.peer.ourListenAddr.Port
+ }
+ for _, proto := range bp.peer.protocols {
+ caps = append(caps, proto.cap())
+ }
+ return NewMsg(handshakeMsg,
+ baseProtocolVersion,
+ bp.peer.ourID.String(),
+ caps,
+ port,
+ bp.peer.ourID.Pubkey()[1:],
+ )
+}
+
+func (bp *baseProtocol) peerList() []ethutil.RlpEncodable {
+ peers := bp.peer.otherPeers()
+ ds := make([]ethutil.RlpEncodable, 0, len(peers))
+ for _, p := range peers {
+ p.infolock.Lock()
+ addr := p.listenAddr
+ p.infolock.Unlock()
+ // filter out this peer and peers that are not listening or
+ // have not completed the handshake.
+ // TODO: track previously sent peers and exclude them as well.
+ if p == bp.peer || addr == nil {
+ continue
+ }
+ ds = append(ds, addr)
+ }
+ ourAddr := bp.peer.ourListenAddr
+ if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
+ ds = append(ds, ourAddr)
+ }
+ return ds
+}