package p2p

import (
	"bytes"
	"errors"
	"net"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/ethutil"
)

// Protocol is implemented by P2P subprotocols.
type Protocol interface {
	// Start is called when the protocol becomes active.
	// It should read and write messages from rw.
	// Messages must be fully consumed.
	//
	// The connection is closed when Start returns. It should return
	// any protocol-level error (such as an I/O error) that is
	// encountered.
	Start(peer *Peer, rw MsgReadWriter) error
	// Offset should return the number of message codes
	// used by the protocol.
	Offset() MsgCode
}

// MsgReader is the read half of a message connection.
type MsgReader interface {
	ReadMsg() (Msg, error)
}

// MsgWriter is the write half of a message connection.
type MsgWriter interface {
	WriteMsg(Msg) error
}

// MsgReadWriter is passed to protocols. Protocol implementations can
// use it to write messages back to a connected peer.
type MsgReadWriter interface {
	MsgReader
	MsgWriter
}

// MsgHandler is the callback invoked by MsgLoop for every decoded message.
// Returning a non-nil error terminates the loop.
type MsgHandler func(code MsgCode, data *ethutil.Value) error

// MsgLoop reads messages off the given reader and
// calls the handler function for each decoded message until
// it returns an error or the peer connection is closed.
//
// If a message is larger than the given maximum size, MsgLoop
// returns an appropriate error.
func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error {
	for {
		msg, err := r.ReadMsg()
		if err != nil {
			return err
		}
		// Reject oversized messages before decoding their payload.
		if msg.Size > maxsize {
			return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize)
		}
		value, err := msg.Data()
		if err != nil {
			return err
		}
		if err := handler(msg.Code, value); err != nil {
			return err
		}
	}
}

// the ÐΞVp2p base protocol
type baseProtocol struct {
	rw   MsgReadWriter
	peer *Peer
}

// bpMsg carries one decoded message from the reader goroutine
// started in Start to the main loop.
type bpMsg struct {
	code MsgCode
	data *ethutil.Value
}

const (
	p2pVersion = 0
	// pingTimeout is the interval of the keepalive timer in loop.
	pingTimeout = 2 * time.Second
	// NOTE(review): pingGracePeriod is not referenced anywhere in this file.
	pingGracePeriod = 2 * time.Second
)

const (
	// message codes
	handshakeMsg = iota
	discMsg
	pingMsg
	pongMsg
	getPeersMsg
	peersMsg
)

const (
	// baseProtocolOffset is the value reported by baseProtocol.Offset.
	baseProtocolOffset MsgCode = 16
	// baseProtocolMaxMsgSize is the size limit passed to MsgLoop by Start.
	baseProtocolMaxMsgSize = 500 * 1024
)

// DiscReason is the reason code carried by a disconnect message.
type DiscReason byte

const (
	// Values are given explicitly instead of by iota because these values are
	// defined by the wire protocol spec; it is easier for humans to ensure
	// correctness when values are explicit.
	DiscRequested           = 0x00
	DiscNetworkError        = 0x01
	DiscProtocolError       = 0x02
	DiscUselessPeer         = 0x03
	DiscTooManyPeers        = 0x04
	DiscAlreadyConnected    = 0x05
	DiscIncompatibleVersion = 0x06
	DiscInvalidIdentity     = 0x07
	DiscQuitting            = 0x08
	DiscUnexpectedIdentity  = 0x09
	DiscSelf                = 0x0a
	DiscReadTimeout         = 0x0b
	DiscSubprotocolError    = 0x10
)

// discReasonToString maps reason codes to human-readable text. Codes
// without an entry (0x0c through 0x0f) hold the empty string.
var discReasonToString = [DiscSubprotocolError + 1]string{
	DiscRequested:           "Disconnect requested",
	DiscNetworkError:        "Network error",
	DiscProtocolError:       "Breach of protocol",
	DiscUselessPeer:         "Useless peer",
	DiscTooManyPeers:        "Too many peers",
	DiscAlreadyConnected:    "Already connected",
	DiscIncompatibleVersion: "Incompatible P2P protocol version",
	DiscInvalidIdentity:     "Invalid node identity",
	DiscQuitting:            "Client quitting",
	DiscUnexpectedIdentity:  "Unexpected identity",
	DiscSelf:                "Connected to self",
	DiscReadTimeout:         "Read timeout",
	DiscSubprotocolError:    "Subprotocol error",
}

// String returns the description of the disconnect reason, or "Unknown"
// for codes that are out of range or have no description.
func (d DiscReason) String() string {
	// The index must be strictly below the table length; the value comes
	// from the remote peer and can be any byte.
	if int(d) >= len(discReasonToString) {
		return "Unknown"
	}
	if s := discReasonToString[d]; s != "" {
		return s
	}
	return "Unknown"
}

// Ping is currently a no-op.
func (bp *baseProtocol) Ping() {
}

// Offset returns the number of message codes reserved by the base protocol.
func (bp *baseProtocol) Offset() MsgCode {
	return baseProtocolOffset
}

// Start runs the base protocol on rw. It sends the local handshake, expects
// the remote handshake as the first incoming message, and then dispatches
// messages and keepalive pings until an error occurs.
func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error {
	bp.peer, bp.rw = peer, rw

	// Do the handshake.
	// TODO: disconnect is valid before handshake, too.
	if err := rw.WriteMsg(bp.peer.server.handshakeMsg()); err != nil {
		return NewPeerError(WriteError, "%v", err)
	}
	msg, err := rw.ReadMsg()
	if err != nil {
		return err
	}
	if msg.Code != handshakeMsg {
		return NewPeerError(ProtocolBreach, " first message must be handshake")
	}
	data, err := msg.Data()
	if err != nil {
		return NewPeerError(InvalidMsg, "%v", err)
	}
	if err := bp.handleHandshake(data); err != nil {
		return err
	}

	msgin := make(chan bpMsg)
	done := make(chan error, 1) // buffered so the reader can always report and exit
	// stop is closed when Start returns so that the reader goroutine is
	// never left blocked on the unbuffered msgin send.
	stop := make(chan struct{})
	defer close(stop)
	go func() {
		done <- MsgLoop(rw, baseProtocolMaxMsgSize,
			func(code MsgCode, data *ethutil.Value) error {
				select {
				case msgin <- bpMsg{code, data}:
					return nil
				case <-stop:
					return errors.New("base protocol stopped")
				}
			})
	}()
	return bp.loop(msgin, done)
}

// loop dispatches incoming messages to handle and drives the ping keepalive.
// It returns the first handler error, the error delivered on quit, a
// PingTimeout error when the timer fires while a ping is outstanding, or a
// WriteError when sending a ping fails.
func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error {
	logger.Debugf("pingpong keepalive started at %v\n", time.Now())
	// NOTE(review): panics if rw is not a *proto — confirm all callers pass one.
	messenger := bp.rw.(*proto).messenger
	pingTimer := time.NewTimer(pingTimeout)
	defer pingTimer.Stop()
	// pinged records that the timer period started without a pulse; a second
	// expiry in that state is treated as a timeout.
	pinged := true

	for {
		select {
		case msg := <-msgin:
			if err := bp.handle(msg.code, msg.data); err != nil {
				return err
			}
		case err := <-quit:
			return err
		case <-messenger.pulse:
			// presumably signals activity on the connection — verify in messenger
			pingTimer.Reset(pingTimeout)
			pinged = false
		case <-pingTimer.C:
			if pinged {
				return NewPeerError(PingTimeout, "")
			}
			logger.Debugf("pinging at %v\n", time.Now())
			if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil {
				return NewPeerError(WriteError, "%v", err)
			}
			pinged = true
			pingTimer.Reset(pingTimeout)
		}
	}
}

// handle processes a single base protocol message received after the
// handshake. Unknown codes and repeated handshakes are reported as errors.
func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
	switch code {
	case handshakeMsg:
		return NewPeerError(ProtocolBreach, " extra handshake received")

	case discMsg:
		logger.Infof("Disconnect requested from peer %v, reason: %v\n",
			bp.peer.Address, DiscReason(data.Get(0).Uint()))
		bp.peer.server.PeerDisconnect() <- DisconnectRequest{
			addr:   bp.peer.Address,
			reason: DiscRequested,
		}

	case pingMsg:
		return bp.rw.WriteMsg(NewMsg(pongMsg))

	case pongMsg:
		// reply for ping

	case getPeersMsg:
		// Peer asked for list of connected peers.
		peersRLP := bp.peer.server.encodedPeerList()
		if peersRLP != nil {
			msg := Msg{
				Code:    peersMsg,
				Size:    uint32(len(peersRLP)),
				Payload: bytes.NewReader(peersRLP),
			}
			return bp.rw.WriteMsg(msg)
		}

	case peersMsg:
		bp.handlePeers(data)

	default:
		return NewPeerError(InvalidMsgCode, "unknown message code %v", code)
	}
	return nil
}

// handlePeers reads (ip, port) entries from a peers message and asks the
// server to connect to each one in its own goroutine.
func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
	it := data.NewIterator()
	for it.Next() {
		ip := net.IP(it.Value().Get(0).Bytes())
		port := it.Value().Get(1).Uint()
		address := &net.TCPAddr{IP: ip, Port: int(port)}
		go bp.peer.server.PeerConnect(address)
	}
}

// handleHandshake validates the remote handshake and records the remote
// port (inbound peers only), capabilities and id on the peer. The fields
// read are, in order: version, id, capabilities, port, public key.
func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
	var (
		remoteVersion = c.Get(0).Uint()
		id            = c.Get(1).Str()
		caps          = c.Get(2)
		port          = c.Get(3).Uint()
		pubkey        = c.Get(4).Bytes()
	)
	// Check correctness of p2p protocol version
	if remoteVersion != p2pVersion {
		return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion)
	}

	// Handle the pub key (validation, uniqueness)
	if len(pubkey) == 0 {
		return NewPeerError(PubkeyMissing, "not supplied in handshake.")
	}

	if len(pubkey) != 64 {
		return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
	}

	// self connect detection
	if bytes.Equal(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) {
		return NewPeerError(PubkeyForbidden, "not allowed to connect to self")
	}

	// register pubkey on server. this also sets the pubkey on the peer (need lock)
	if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil {
		// Pass the message as an argument so '%' in it is not parsed as a verb.
		return NewPeerError(PubkeyForbidden, "%v", err)
	}

	// check port
	if bp.peer.Inbound {
		// NOTE(review): values above 65535 are silently truncated here.
		uint16port := uint16(port)
		if bp.peer.Port > 0 && bp.peer.Port != uint16port {
			return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port)
		}
		bp.peer.Port = uint16port
	}

	capsIt := caps.NewIterator()
	for capsIt.Next() {
		cap := capsIt.Value().Str()
		bp.peer.Caps = append(bp.peer.Caps, cap)
	}
	sort.Strings(bp.peer.Caps)
	bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps)
	bp.peer.Id = id
	return nil
}