aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocol.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-11-22 04:48:49 +0800
committerFelix Lange <fjl@twurst.com>2014-11-22 04:52:45 +0800
commit59b63caf5e4de64ceb7dcdf01551a080f53b1672 (patch)
treea4e79590284c5afe4d6927b422a5092b074e7938 /p2p/protocol.go
parente4a601c6444afdc11ce0cb80d7fd83116de2c8b9 (diff)
downloadgo-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.gz
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.bz2
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.lz
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.xz
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.zst
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.zip
p2p: API cleanup and PoC 7 compatibility
Whoa, one more big commit. I didn't manage to untangle the changes while working towards compatibility.
Diffstat (limited to 'p2p/protocol.go')
-rw-r--r--p2p/protocol.go412
1 files changed, 194 insertions, 218 deletions
diff --git a/p2p/protocol.go b/p2p/protocol.go
index d22ba70cb..169dcdb6e 100644
--- a/p2p/protocol.go
+++ b/p2p/protocol.go
@@ -3,249 +3,185 @@ package p2p
import (
"bytes"
"net"
- "sort"
"time"
"github.com/ethereum/go-ethereum/ethutil"
)
-// Protocol is implemented by P2P subprotocols.
-type Protocol interface {
- // Start is called when the protocol becomes active.
- // It should read and write messages from rw.
- // Messages must be fully consumed.
- //
- // The connection is closed when Start returns. It should return
- // any protocol-level error (such as an I/O error) that is
- // encountered.
- Start(peer *Peer, rw MsgReadWriter) error
+// Protocol represents a P2P subprotocol implementation.
+type Protocol struct {
+ // Name should contain the official protocol name,
+ // often a three-letter word.
+ Name string
- // Offset should return the number of message codes
- // used by the protocol.
- Offset() MsgCode
-}
+ // Version should contain the version number of the protocol.
+ Version uint
-type MsgReader interface {
- ReadMsg() (Msg, error)
-}
-
-type MsgWriter interface {
- WriteMsg(Msg) error
-}
-
-// MsgReadWriter is passed to protocols. Protocol implementations can
-// use it to write messages back to a connected peer.
-type MsgReadWriter interface {
- MsgReader
- MsgWriter
-}
+ // Length should contain the number of message codes used
+ // by the protocol.
+ Length uint64
-type MsgHandler func(code MsgCode, data *ethutil.Value) error
-
-// MsgLoop reads messages off the given reader and
-// calls the handler function for each decoded message until
-// it returns an error or the peer connection is closed.
-//
-// If a message is larger than the given maximum size, RunProtocol
-// returns an appropriate error.n
-func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error {
- for {
- msg, err := r.ReadMsg()
- if err != nil {
- return err
- }
- if msg.Size > maxsize {
- return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize)
- }
- value, err := msg.Data()
- if err != nil {
- return err
- }
- if err := handler(msg.Code, value); err != nil {
- return err
- }
- }
-}
-
-// the ÐΞVp2p base protocol
-type baseProtocol struct {
- rw MsgReadWriter
- peer *Peer
+ // Run is called in a new groutine when the protocol has been
+ // negotiated with a peer. It should read and write messages from
+ // rw. The Payload for each message must be fully consumed.
+ //
+ // The peer connection is closed when Start returns. It should return
+ // any protocol-level error (such as an I/O error) that is
+ // encountered.
+ Run func(peer *Peer, rw MsgReadWriter) error
}
-type bpMsg struct {
- code MsgCode
- data *ethutil.Value
+func (p Protocol) cap() Cap {
+ return Cap{p.Name, p.Version}
}
const (
- p2pVersion = 0
- pingTimeout = 2 * time.Second
- pingGracePeriod = 2 * time.Second
+ baseProtocolVersion = 2
+ baseProtocolLength = uint64(16)
+ baseProtocolMaxMsgSize = 10 * 1024 * 1024
)
const (
- // message codes
- handshakeMsg = iota
- discMsg
- pingMsg
- pongMsg
- getPeersMsg
- peersMsg
+ // devp2p message codes
+ handshakeMsg = 0x00
+ discMsg = 0x01
+ pingMsg = 0x02
+ pongMsg = 0x03
+ getPeersMsg = 0x04
+ peersMsg = 0x05
)
-const (
- baseProtocolOffset MsgCode = 16
- baseProtocolMaxMsgSize = 500 * 1024
-)
-
-type DiscReason byte
+// handshake is the structure of a handshake list.
+type handshake struct {
+ Version uint64
+ ID string
+ Caps []Cap
+ ListenPort uint64
+ NodeID []byte
+}
-const (
- // Values are given explicitly instead of by iota because these values are
- // defined by the wire protocol spec; it is easier for humans to ensure
- // correctness when values are explicit.
- DiscRequested = 0x00
- DiscNetworkError = 0x01
- DiscProtocolError = 0x02
- DiscUselessPeer = 0x03
- DiscTooManyPeers = 0x04
- DiscAlreadyConnected = 0x05
- DiscIncompatibleVersion = 0x06
- DiscInvalidIdentity = 0x07
- DiscQuitting = 0x08
- DiscUnexpectedIdentity = 0x09
- DiscSelf = 0x0a
- DiscReadTimeout = 0x0b
- DiscSubprotocolError = 0x10
-)
+func (h *handshake) String() string {
+ return h.ID
+}
+func (h *handshake) Pubkey() []byte {
+ return h.NodeID
+}
-var discReasonToString = [DiscSubprotocolError + 1]string{
- DiscRequested: "Disconnect requested",
- DiscNetworkError: "Network error",
- DiscProtocolError: "Breach of protocol",
- DiscUselessPeer: "Useless peer",
- DiscTooManyPeers: "Too many peers",
- DiscAlreadyConnected: "Already connected",
- DiscIncompatibleVersion: "Incompatible P2P protocol version",
- DiscInvalidIdentity: "Invalid node identity",
- DiscQuitting: "Client quitting",
- DiscUnexpectedIdentity: "Unexpected identity",
- DiscSelf: "Connected to self",
- DiscReadTimeout: "Read timeout",
- DiscSubprotocolError: "Subprotocol error",
+// Cap is the structure of a peer capability.
+type Cap struct {
+ Name string
+ Version uint
}
-func (d DiscReason) String() string {
- if len(discReasonToString) < int(d) {
- return "Unknown"
- }
- return discReasonToString[d]
+func (cap Cap) RlpData() interface{} {
+ return []interface{}{cap.Name, cap.Version}
}
-func (bp *baseProtocol) Offset() MsgCode {
- return baseProtocolOffset
+type capsByName []Cap
+
+func (cs capsByName) Len() int { return len(cs) }
+func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name }
+func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] }
+
+type baseProtocol struct {
+ rw MsgReadWriter
+ peer *Peer
}
-func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error {
- bp.peer, bp.rw = peer, rw
+func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
+ bp := &baseProtocol{rw, peer}
- // Do the handshake.
- // TODO: disconnect is valid before handshake, too.
- rw.WriteMsg(bp.peer.server.handshakeMsg())
+ // do handshake
+ if err := rw.WriteMsg(bp.handshakeMsg()); err != nil {
+ return err
+ }
msg, err := rw.ReadMsg()
if err != nil {
return err
}
if msg.Code != handshakeMsg {
- return NewPeerError(ProtocolBreach, " first message must be handshake")
+ return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code)
}
data, err := msg.Data()
if err != nil {
- return NewPeerError(InvalidMsg, "%v", err)
+ return newPeerError(errInvalidMsg, "%v", err)
}
if err := bp.handleHandshake(data); err != nil {
return err
}
- msgin := make(chan bpMsg)
- done := make(chan error, 1)
+ // run main loop
+ quit := make(chan error, 1)
go func() {
- done <- MsgLoop(rw, baseProtocolMaxMsgSize,
- func(code MsgCode, data *ethutil.Value) error {
- msgin <- bpMsg{code, data}
- return nil
- })
+ quit <- MsgLoop(rw, baseProtocolMaxMsgSize, bp.handle)
}()
- return bp.loop(msgin, done)
+ return bp.loop(quit)
}
-func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error {
- logger.Debugf("pingpong keepalive started at %v\n", time.Now())
- messenger := bp.rw.(*proto).messenger
- pingTimer := time.NewTimer(pingTimeout)
- pinged := true
+var pingTimeout = 2 * time.Second
+
+func (bp *baseProtocol) loop(quit <-chan error) error {
+ ping := time.NewTimer(pingTimeout)
+ activity := bp.peer.activity.Subscribe(time.Time{})
+ lastActive := time.Time{}
+ defer ping.Stop()
+ defer activity.Unsubscribe()
- for {
+ getPeersTick := time.NewTicker(10 * time.Second)
+ defer getPeersTick.Stop()
+ err := bp.rw.EncodeMsg(getPeersMsg)
+
+ for err == nil {
select {
- case msg := <-msgin:
- if err := bp.handle(msg.code, msg.data); err != nil {
- return err
- }
- case err := <-quit:
+ case err = <-quit:
return err
- case <-messenger.pulse:
- pingTimer.Reset(pingTimeout)
- pinged = false
- case <-pingTimer.C:
- if pinged {
- return NewPeerError(PingTimeout, "")
+ case <-getPeersTick.C:
+ err = bp.rw.EncodeMsg(getPeersMsg)
+ case event := <-activity.Chan():
+ ping.Reset(pingTimeout)
+ lastActive = event.(time.Time)
+ case t := <-ping.C:
+ if lastActive.Add(pingTimeout * 2).Before(t) {
+ err = newPeerError(errPingTimeout, "")
+ } else if lastActive.Add(pingTimeout).Before(t) {
+ err = bp.rw.EncodeMsg(pingMsg)
}
- logger.Debugf("pinging at %v\n", time.Now())
- if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil {
- return NewPeerError(WriteError, "%v", err)
- }
- pinged = true
- pingTimer.Reset(pingTimeout)
}
}
+ return err
}
-func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
+func (bp *baseProtocol) handle(code uint64, data *ethutil.Value) error {
switch code {
case handshakeMsg:
- return NewPeerError(ProtocolBreach, " extra handshake received")
+ return newPeerError(errProtocolBreach, "extra handshake received")
case discMsg:
- logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint()))
- bp.peer.server.PeerDisconnect() <- DisconnectRequest{
- addr: bp.peer.Address,
- reason: DiscRequested,
- }
+ bp.peer.Disconnect(DiscReason(data.Get(0).Uint()))
+ return nil
case pingMsg:
- return bp.rw.WriteMsg(NewMsg(pongMsg))
+ return bp.rw.EncodeMsg(pongMsg)
case pongMsg:
- // reply for ping
case getPeersMsg:
- // Peer asked for list of connected peers.
- peersRLP := bp.peer.server.encodedPeerList()
- if peersRLP != nil {
- msg := Msg{
- Code: peersMsg,
- Size: uint32(len(peersRLP)),
- Payload: bytes.NewReader(peersRLP),
- }
- return bp.rw.WriteMsg(msg)
+ peers := bp.peerList()
+ // this is dangerous. the spec says that we should _delay_
+ // sending the response if no new information is available.
+ // this means that would need to send a response later when
+ // new peers become available.
+ //
+ // TODO: add event mechanism to notify baseProtocol for new peers
+ if len(peers) > 0 {
+ return bp.rw.EncodeMsg(peersMsg, peers)
}
case peersMsg:
bp.handlePeers(data)
default:
- return NewPeerError(InvalidMsgCode, "unknown message code %v", code)
+ return newPeerError(errInvalidMsgCode, "unknown message code %v", code)
}
return nil
}
@@ -253,62 +189,102 @@ func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
it := data.NewIterator()
for it.Next() {
- ip := net.IP(it.Value().Get(0).Bytes())
- port := it.Value().Get(1).Uint()
- address := &net.TCPAddr{IP: ip, Port: int(port)}
- go bp.peer.server.PeerConnect(address)
+ addr := &peerAddr{
+ IP: net.IP(it.Value().Get(0).Bytes()),
+ Port: it.Value().Get(1).Uint(),
+ Pubkey: it.Value().Get(2).Bytes(),
+ }
+ bp.peer.Debugf("received peer suggestion: %v", addr)
+ bp.peer.newPeerAddr <- addr
}
}
func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
- var (
- remoteVersion = c.Get(0).Uint()
- id = c.Get(1).Str()
- caps = c.Get(2)
- port = c.Get(3).Uint()
- pubkey = c.Get(4).Bytes()
- )
- // Check correctness of p2p protocol version
- if remoteVersion != p2pVersion {
- return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion)
+ hs := handshake{
+ Version: c.Get(0).Uint(),
+ ID: c.Get(1).Str(),
+ Caps: nil, // decoded below
+ ListenPort: c.Get(3).Uint(),
+ NodeID: c.Get(4).Bytes(),
}
-
- // Handle the pub key (validation, uniqueness)
- if len(pubkey) == 0 {
- return NewPeerError(PubkeyMissing, "not supplied in handshake.")
+ if hs.Version != baseProtocolVersion {
+ return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
+ baseProtocolVersion, hs.Version)
}
-
- if len(pubkey) != 64 {
- return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
+ if len(hs.NodeID) == 0 {
+ return newPeerError(errPubkeyMissing, "")
+ }
+ if len(hs.NodeID) != 64 {
+ return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8)
+ }
+ if da := bp.peer.dialAddr; da != nil {
+ // verify that the peer we wanted to connect to
+ // actually holds the target public key.
+ if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) {
+ return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch")
+ }
+ }
+ pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
+ if err := bp.peer.pubkeyHook(pa); err != nil {
+ return newPeerError(errPubkeyForbidden, "%v", err)
+ }
+ capsIt := c.Get(2).NewIterator()
+ for capsIt.Next() {
+ cap := capsIt.Value()
+ name := cap.Get(0).Str()
+ if name != "" {
+ hs.Caps = append(hs.Caps, Cap{Name: name, Version: uint(cap.Get(1).Uint())})
+ }
}
- // self connect detection
- if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 {
- return NewPeerError(PubkeyForbidden, "not allowed to connect to self")
+ var addr *peerAddr
+ if hs.ListenPort != 0 {
+ addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
+ addr.Port = hs.ListenPort
}
+ bp.peer.setHandshakeInfo(&hs, addr, hs.Caps)
+ bp.peer.startSubprotocols(hs.Caps)
+ return nil
+}
- // register pubkey on server. this also sets the pubkey on the peer (need lock)
- if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil {
- return NewPeerError(PubkeyForbidden, err.Error())
+func (bp *baseProtocol) handshakeMsg() Msg {
+ var (
+ port uint64
+ caps []interface{}
+ )
+ if bp.peer.ourListenAddr != nil {
+ port = bp.peer.ourListenAddr.Port
}
+ for _, proto := range bp.peer.protocols {
+ caps = append(caps, proto.cap())
+ }
+ return NewMsg(handshakeMsg,
+ baseProtocolVersion,
+ bp.peer.ourID.String(),
+ caps,
+ port,
+ bp.peer.ourID.Pubkey()[1:],
+ )
+}
- // check port
- if bp.peer.Inbound {
- uint16port := uint16(port)
- if bp.peer.Port > 0 && bp.peer.Port != uint16port {
- return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port)
- } else {
- bp.peer.Port = uint16port
+func (bp *baseProtocol) peerList() []ethutil.RlpEncodable {
+ peers := bp.peer.otherPeers()
+ ds := make([]ethutil.RlpEncodable, 0, len(peers))
+ for _, p := range peers {
+ p.infolock.Lock()
+ addr := p.listenAddr
+ p.infolock.Unlock()
+ // filter out this peer and peers that are not listening or
+ // have not completed the handshake.
+ // TODO: track previously sent peers and exclude them as well.
+ if p == bp.peer || addr == nil {
+ continue
}
+ ds = append(ds, addr)
}
-
- capsIt := caps.NewIterator()
- for capsIt.Next() {
- cap := capsIt.Value().Str()
- bp.peer.Caps = append(bp.peer.Caps, cap)
+ ourAddr := bp.peer.ourListenAddr
+ if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
+ ds = append(ds, ourAddr)
}
- sort.Strings(bp.peer.Caps)
- bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps)
- bp.peer.Id = id
- return nil
+ return ds
}