aboutsummaryrefslogblamecommitdiffstats
path: root/p2p/protocol.go
blob: d22ba70cb291a6f0aae79193a0fae35a27378066 (plain) (tree)
1
2
3
4
5
6
7
8
9



               

              
              

                                                 

 
                                               
                         










                                                                        
                        




















































                                                                                                                


       


                                         


       






                           

 
       

                                                   






















                                                                                  
                                                          


















                                                                     


                                    

                                          

 


















                                                                                       
         
 









                                                                       

 




























                                                                               

 



                                                                                
 





                                                                                                         
 

                                                      
 

                                 
 







                                                                   
                         
                                                  
                 
 

                                    
 

                                                                                    
         
                  

 

                                                          



                                                                
                                                      


         
                                                                 
             




                                                
         
                                                    

                                                                                                                        



                                                      
                                                                                


                              
                                                                                            

         

                                                                                     
                                                                                      


                                                                                       

                                                                              


                     
                            
                                          

                                                                                                        
                        
                                                 





                                           
                                                        
         



                                                                 
 
package p2p

import (
    "bytes"
    "net"
    "sort"
    "time"

    "github.com/ethereum/go-ethereum/ethutil"
)

// Protocol is implemented by P2P subprotocols.
type Protocol interface {
    // Start is called when the protocol becomes active. peer is the
    // remote peer the protocol runs against; rw is the message stream
    // the protocol should read from (ReadMsg) and write to (WriteMsg).
    // Messages must be fully consumed.
    //
    // The connection is closed when Start returns. It should return
    // any protocol-level error (such as an I/O error) that is
    // encountered.
    Start(peer *Peer, rw MsgReadWriter) error

    // Offset should return the number of message codes
    // used by the protocol.
    Offset() MsgCode
}

// MsgReader is the reading half of a message stream. ReadMsg returns a
// message, or an error if none could be read. MsgLoop calls it
// repeatedly to obtain successive messages.
type MsgReader interface {
    ReadMsg() (Msg, error)
}

// MsgWriter is the writing half of a message stream. WriteMsg sends the
// given message and reports any error encountered while doing so.
type MsgWriter interface {
    WriteMsg(Msg) error
}

// MsgReadWriter combines MsgReader and MsgWriter. It is passed to
// protocols (see Protocol.Start). Protocol implementations use it to
// read incoming messages and to write messages back to a connected peer.
type MsgReadWriter interface {
    MsgReader
    MsgWriter
}

// MsgHandler is the callback MsgLoop invokes for every decoded message.
// code is the message code and data is the decoded message payload.
// Returning a non-nil error makes MsgLoop stop and return that error.
type MsgHandler func(code MsgCode, data *ethutil.Value) error

// MsgLoop repeatedly reads a message from r, decodes its payload and
// hands the message code together with the decoded value to handler.
// It never returns nil: the loop ends with the first error produced by
// r.ReadMsg, by decoding the payload, or by handler.
//
// A message whose Size is larger than maxsize is not decoded; MsgLoop
// returns an InvalidMsg peer error instead.
func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error {
    for {
        m, err := r.ReadMsg()
        switch {
        case err != nil:
            // Reading failed (for example because the connection closed).
            return err
        case m.Size > maxsize:
            return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", m.Size, maxsize)
        }

        // Decode the payload before dispatching it.
        payload, err := m.Data()
        if err != nil {
            return err
        }
        if err = handler(m.Code, payload); err != nil {
            return err
        }
    }
}

// baseProtocol implements the ÐΞVp2p base protocol (handshake,
// disconnect, ping/pong keepalive and peer exchange).
//
// Both fields are assigned at the beginning of Start and are used by the
// other methods afterwards.
type baseProtocol struct {
    rw   MsgReadWriter // message stream handed to Start
    peer *Peer         // remote peer handed to Start
}

// bpMsg is a decoded base protocol message. Start uses it to pass
// messages from the reading goroutine to the main protocol loop.
type bpMsg struct {
    code MsgCode        // message code
    data *ethutil.Value // decoded message payload
}

const (
    // p2pVersion is the base protocol version. A handshake announcing
    // any other version is rejected (see handleHandshake).
    p2pVersion = 0
    // pingTimeout is the interval of the keepalive timer in
    // baseProtocol.loop.
    pingTimeout = 2 * time.Second
    // NOTE(review): pingGracePeriod is not referenced anywhere in the
    // visible part of this file — confirm whether it is still needed.
    pingGracePeriod = 2 * time.Second
)

const (
    // Message codes of the base protocol, numbered consecutively
    // starting at 0. See baseProtocol.handle for how each is processed.
    handshakeMsg = iota // must be the first message on a connection
    discMsg             // disconnect notification carrying a DiscReason
    pingMsg             // keepalive request, answered with pongMsg
    pongMsg             // reply to pingMsg
    getPeersMsg         // request for the list of connected peers
    peersMsg            // list of peers
)

const (
    // baseProtocolOffset is the value returned by baseProtocol.Offset,
    // i.e. the number of message codes claimed by the base protocol.
    baseProtocolOffset MsgCode = 16
    // baseProtocolMaxMsgSize is the largest message size, in the units
    // of Msg.Size, that the base protocol accepts (passed to MsgLoop).
    baseProtocolMaxMsgSize = 500 * 1024
)

// DiscReason is the reason code carried in a disconnect message.
type DiscReason byte

const (
    // Values are given explicitly instead of by iota because these values are
    // defined by the wire protocol spec; it is easier for humans to ensure
    // correctness when values are explicit.
    //
    // The constants are intentionally left untyped so that they can be
    // used wherever an integer constant is accepted.
    DiscRequested           = 0x00
    DiscNetworkError        = 0x01
    DiscProtocolError       = 0x02
    DiscUselessPeer         = 0x03
    DiscTooManyPeers        = 0x04
    DiscAlreadyConnected    = 0x05
    DiscIncompatibleVersion = 0x06
    DiscInvalidIdentity     = 0x07
    DiscQuitting            = 0x08
    DiscUnexpectedIdentity  = 0x09
    DiscSelf                = 0x0a
    DiscReadTimeout         = 0x0b
    DiscSubprotocolError    = 0x10
)

// discReasonToString maps a reason code to its human readable
// description. It is indexed by the reason code; codes that are not
// assigned (0x0c-0x0f) hold the empty string.
var discReasonToString = [DiscSubprotocolError + 1]string{
    DiscRequested:           "Disconnect requested",
    DiscNetworkError:        "Network error",
    DiscProtocolError:       "Breach of protocol",
    DiscUselessPeer:         "Useless peer",
    DiscTooManyPeers:        "Too many peers",
    DiscAlreadyConnected:    "Already connected",
    DiscIncompatibleVersion: "Incompatible P2P protocol version",
    DiscInvalidIdentity:     "Invalid node identity",
    DiscQuitting:            "Client quitting",
    DiscUnexpectedIdentity:  "Unexpected identity",
    DiscSelf:                "Connected to self",
    DiscReadTimeout:         "Read timeout",
    DiscSubprotocolError:    "Subprotocol error",
}

// String returns the human readable description of d. Codes that are
// outside the table or have no description assigned yield "Unknown".
func (d DiscReason) String() string {
    // The index must be strictly smaller than the table length; the
    // reason code comes from the remote peer and may be any byte value.
    if int(d) >= len(discReasonToString) {
        return "Unknown"
    }
    if s := discReasonToString[d]; s != "" {
        return s
    }
    return "Unknown"
}

// Offset implements Protocol. It returns baseProtocolOffset, the number
// of message codes claimed by the base protocol.
func (bp *baseProtocol) Offset() MsgCode {
    return baseProtocolOffset
}

// Start implements Protocol. It performs the handshake with the remote
// peer and then runs the base protocol until an error occurs.
//
// The handshake consists of writing the local handshake message and
// reading one message from rw, which must be a handshake message and
// must pass handleHandshake. Afterwards a goroutine reads messages with
// MsgLoop and forwards them to loop, which handles them and drives the
// ping/pong keepalive.
//
// Start returns the error that ended the protocol: a failure to send or
// receive the handshake, a peer error for an invalid handshake, or
// whatever loop returns.
func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error {
    bp.peer, bp.rw = peer, rw

    // Do the handshake.
    // TODO: disconnect is valid before handshake, too.
    if err := rw.WriteMsg(bp.peer.server.handshakeMsg()); err != nil {
        // Without our handshake on the wire there is no point in
        // waiting for the remote one.
        return err
    }
    msg, err := rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Code != handshakeMsg {
        return NewPeerError(ProtocolBreach, " first message must be handshake")
    }
    data, err := msg.Data()
    if err != nil {
        return NewPeerError(InvalidMsg, "%v", err)
    }
    if err := bp.handleHandshake(data); err != nil {
        return err
    }

    msgin := make(chan bpMsg)
    // done is buffered so the reader goroutine can always deliver its
    // final error, even if loop has already returned.
    done := make(chan error, 1)
    // stop is closed when Start returns. It releases the reader
    // goroutine if it is blocked handing a message to a loop that is no
    // longer receiving from msgin.
    stop := make(chan struct{})
    defer close(stop)
    go func() {
        done <- MsgLoop(rw, baseProtocolMaxMsgSize,
            func(code MsgCode, data *ethutil.Value) error {
                select {
                case msgin <- bpMsg{code, data}:
                    return nil
                case <-stop:
                    return baseProtocolStoppedError{}
                }
            })
    }()
    return bp.loop(msgin, done)
}

// baseProtocolStoppedError is returned by the read handler installed in
// Start to terminate MsgLoop once Start has returned.
type baseProtocolStoppedError struct{}

// Error implements the error interface.
func (baseProtocolStoppedError) Error() string { return "base protocol stopped" }

// loop is the main loop of the base protocol. It runs until one of the
// following happens and returns the corresponding error:
//
//   - handling a message received on msgin fails,
//   - a value arrives on quit (it is returned as is, even if nil),
//   - the ping timer expires while a ping is outstanding (PingTimeout),
//   - sending a ping fails (WriteError).
//
// Every receive from messenger.pulse restarts the ping timer and clears
// the outstanding-ping flag. When the timer expires and no ping is
// outstanding, a ping is sent and the timer is restarted.
func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error {
    logger.Debugf("pingpong keepalive started at %v\n", time.Now())
    // NOTE(review): this assertion panics if bp.rw is not a *proto —
    // confirm that Start is only ever called with one.
    messenger := bp.rw.(*proto).messenger
    pingTimer := time.NewTimer(pingTimeout)
    // Release the timer as soon as the loop exits instead of leaving it
    // pending until it fires.
    defer pingTimer.Stop()
    // NOTE(review): pinged starts out true, so the first timer expiry
    // without a preceding pulse ends the loop with PingTimeout although
    // no ping has been sent yet — confirm this is intended.
    pinged := true

    for {
        select {
        case msg := <-msgin:
            if err := bp.handle(msg.code, msg.data); err != nil {
                return err
            }
        case err := <-quit:
            return err
        case <-messenger.pulse:
            pingTimer.Reset(pingTimeout)
            pinged = false
        case <-pingTimer.C:
            if pinged {
                return NewPeerError(PingTimeout, "")
            }
            logger.Debugf("pinging at %v\n", time.Now())
            if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil {
                return NewPeerError(WriteError, "%v", err)
            }
            pinged = true
            pingTimer.Reset(pingTimeout)
        }
    }
}

// handle processes a single base protocol message. code is the message
// code and data its decoded payload.
//
// It returns a peer error for a repeated handshake (ProtocolBreach) and
// for unknown message codes (InvalidMsgCode), and the write error if a
// reply could not be sent. All other cases return nil.
func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
    switch code {
    case handshakeMsg:
        // The handshake was already consumed in Start.
        return NewPeerError(ProtocolBreach, " extra handshake received")

    case discMsg:
        // The first payload element is the reason code sent by the peer.
        logger.Infof("Disconnect requested from peer %v, reason: %v", bp.peer.Address, DiscReason(data.Get(0).Uint()))
        bp.peer.server.PeerDisconnect() <- DisconnectRequest{
            addr:   bp.peer.Address,
            reason: DiscRequested,
        }

    case pingMsg:
        return bp.rw.WriteMsg(NewMsg(pongMsg))

    case pongMsg:
        // reply for ping

    case getPeersMsg:
        // Peer asked for list of connected peers.
        peersRLP := bp.peer.server.encodedPeerList()
        if peersRLP != nil {
            msg := Msg{
                Code:    peersMsg,
                Size:    uint32(len(peersRLP)),
                Payload: bytes.NewReader(peersRLP),
            }
            return bp.rw.WriteMsg(msg)
        }

    case peersMsg:
        bp.handlePeers(data)

    default:
        return NewPeerError(InvalidMsgCode, "unknown message code %v", code)
    }
    return nil
}

// handlePeers processes the payload of a peersMsg. data is iterated as a
// list of entries whose first element is an IP address (raw bytes) and
// whose second element is a TCP port. For every well-formed entry a
// connection attempt is started in its own goroutine via
// server.PeerConnect.
//
// The list is supplied by the remote peer, so entries are validated
// first: the IP must be 4 or 16 bytes long and the port must be in the
// range 1-65535. Malformed entries are skipped. Without this check an
// empty IP would produce an address with a nil IP, and a large port
// value would overflow the int conversion.
func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
    it := data.NewIterator()
    for it.Next() {
        ip := net.IP(it.Value().Get(0).Bytes())
        port := it.Value().Get(1).Uint()
        if len(ip) != net.IPv4len && len(ip) != net.IPv6len {
            continue
        }
        if port == 0 || port > 65535 {
            continue
        }
        address := &net.TCPAddr{IP: ip, Port: int(port)}
        go bp.peer.server.PeerConnect(address)
    }
}

// handleHandshake validates the remote handshake and records the
// announced peer properties. c is the decoded handshake payload; its
// elements are read as: 0 protocol version, 1 client id, 2 list of
// capabilities, 3 listening port, 4 public key.
//
// It returns a peer error if
//   - the protocol version differs from p2pVersion (P2PVersionMismatch),
//   - the public key is missing (PubkeyMissing) or not 64 bytes long
//     (PubkeyInvalid),
//   - the public key equals the local one or the server refuses to
//     register it (PubkeyForbidden),
//   - on inbound connections, the announced port does not fit into 16
//     bits or differs from an already known peer port (PortMismatch).
//
// On success the peer's Port (inbound only), Caps and Id are updated and
// the capabilities are passed on to the messenger.
func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
    var (
        remoteVersion = c.Get(0).Uint()
        id            = c.Get(1).Str()
        caps          = c.Get(2)
        port          = c.Get(3).Uint()
        pubkey        = c.Get(4).Bytes()
    )
    // Check correctness of p2p protocol version
    if remoteVersion != p2pVersion {
        return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion)
    }

    // Handle the pub key (validation, uniqueness)
    if len(pubkey) == 0 {
        return NewPeerError(PubkeyMissing, "not supplied in handshake.")
    }

    if len(pubkey) != 64 {
        return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
    }

    // self connect detection; the first byte of the local key is
    // skipped before comparing it with the 64 byte remote key.
    if bytes.Equal(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) {
        return NewPeerError(PubkeyForbidden, "not allowed to connect to self")
    }

    // register pubkey on server. this also sets the pubkey on the peer (need lock)
    if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil {
        // Pass the error as an argument: its text must not be
        // interpreted as a format string.
        return NewPeerError(PubkeyForbidden, "%v", err)
    }

    // check port
    if bp.peer.Inbound {
        // Reject values that do not fit into a port number instead of
        // silently truncating them in the conversion below.
        if port > 0xffff {
            return NewPeerError(PortMismatch, "invalid port %v", port)
        }
        uint16port := uint16(port)
        if bp.peer.Port > 0 && bp.peer.Port != uint16port {
            return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port)
        }
        bp.peer.Port = uint16port
    }

    capsIt := caps.NewIterator()
    for capsIt.Next() {
        cap := capsIt.Value().Str()
        bp.peer.Caps = append(bp.peer.Caps, cap)
    }
    sort.Strings(bp.peer.Caps)
    bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps)
    bp.peer.Id = id
    return nil
}