From 771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 23 Oct 2014 16:57:54 +0100 Subject: initial commit of p2p package --- p2p/protocol.go | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 p2p/protocol.go (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go new file mode 100644 index 000000000..5d05ced7d --- /dev/null +++ b/p2p/protocol.go @@ -0,0 +1,278 @@ +package p2p + +import ( + "bytes" + "fmt" + "net" + "sort" + "sync" + "time" +) + +type Protocol interface { + Start() + Stop() + HandleIn(*Msg, chan *Msg) + HandleOut(*Msg) bool + Offset() MsgCode + Name() string +} + +const ( + P2PVersion = 0 + pingTimeout = 2 + pingGracePeriod = 2 +) + +const ( + HandshakeMsg = iota + DiscMsg + PingMsg + PongMsg + GetPeersMsg + PeersMsg + offset = 16 +) + +type ProtocolState uint8 + +const ( + nullState = iota + handshakeReceived +) + +type DiscReason byte + +const ( + // Values are given explicitly instead of by iota because these values are + // defined by the wire protocol spec; it is easier for humans to ensure + // correctness when values are explicit. + DiscRequested = 0x00 + DiscNetworkError = 0x01 + DiscProtocolError = 0x02 + DiscUselessPeer = 0x03 + DiscTooManyPeers = 0x04 + DiscAlreadyConnected = 0x05 + DiscIncompatibleVersion = 0x06 + DiscInvalidIdentity = 0x07 + DiscQuitting = 0x08 + DiscUnexpectedIdentity = 0x09 + DiscSelf = 0x0a + DiscReadTimeout = 0x0b + DiscSubprotocolError = 0x10 +) + +var discReasonToString = map[DiscReason]string{ + DiscRequested: "Disconnect requested", + DiscNetworkError: "Network error", + DiscProtocolError: "Breach of protocol", + DiscUselessPeer: "Useless peer", + DiscTooManyPeers: "Too many peers", + DiscAlreadyConnected: "Already connected", + DiscIncompatibleVersion: "Incompatible P2P protocol version", + DiscInvalidIdentity: "Invalid node identity", + DiscQuitting: "Client quitting", + DiscUnexpectedIdentity: "Unexpected identity", + DiscSelf: "Connected to self", + DiscReadTimeout: "Read timeout", + DiscSubprotocolError: "Subprotocol error", +} + +func (d DiscReason) String() string { + if len(discReasonToString) < int(d) { + return "Unknown" + } + + return discReasonToString[d] +} + +type BaseProtocol struct { + peer *Peer + state ProtocolState + stateLock sync.RWMutex +} + +func NewBaseProtocol(peer *Peer) *BaseProtocol { + self := &BaseProtocol{ + peer: peer, + } + + return self +} + +func (self *BaseProtocol) Start() { + if self.peer != nil { + self.peer.Write("", self.peer.Server().Handshake()) + go self.peer.Messenger().PingPong( + pingTimeout*time.Second, + pingGracePeriod*time.Second, + self.Ping, + self.Timeout, + ) + } +} + +func (self *BaseProtocol) Stop() { +} + +func (self *BaseProtocol) Ping() { + msg, _ := NewMsg(PingMsg) + self.peer.Write("", msg) +} + +func (self *BaseProtocol) Timeout() { + self.peerError(PingTimeout, "") +} + +func (self *BaseProtocol) Name() string { + return "" +} + +func (self *BaseProtocol) Offset() MsgCode { + return offset +} + +func (self *BaseProtocol) CheckState(state ProtocolState) bool { + self.stateLock.RLock() + self.stateLock.RUnlock() + if self.state != state { + return false + } else { + return true + } +} + +func (self *BaseProtocol) HandleIn(msg *Msg, response chan *Msg) { + if msg.Code() == HandshakeMsg { + self.handleHandshake(msg) + } else { + if !self.CheckState(handshakeReceived) { + self.peerError(ProtocolBreach, "message code %v not allowed", msg.Code()) + close(response) + return + } + switch msg.Code() { + case DiscMsg: + logger.Infof("Disconnect requested from peer %v, reason", DiscReason(msg.Data().Get(0).Uint())) + self.peer.Server().PeerDisconnect() <- DisconnectRequest{ + addr: self.peer.Address, + reason: DiscRequested, + } + case PingMsg: + out, _ := NewMsg(PongMsg) + response <- out + case PongMsg: + case GetPeersMsg: + // Peer asked for list of connected peers + if out, err := self.peer.Server().PeersMessage(); err != nil { + response <- out + } + case PeersMsg: + self.handlePeers(msg) + default: + self.peerError(InvalidMsgCode, "unknown message code %v", msg.Code()) + } + } + close(response) +} + +func (self *BaseProtocol) HandleOut(msg *Msg) (allowed bool) { + // somewhat overly paranoid + allowed = msg.Code() == HandshakeMsg || msg.Code() == DiscMsg || msg.Code() < self.Offset() && self.CheckState(handshakeReceived) + return +} + +func (self *BaseProtocol) peerError(errorCode ErrorCode, format string, v ...interface{}) { + err := NewPeerError(errorCode, format, v...) + logger.Warnln(err) + fmt.Println(self.peer, err) + if self.peer != nil { + self.peer.PeerErrorChan() <- err + } +} + +func (self *BaseProtocol) handlePeers(msg *Msg) { + it := msg.Data().NewIterator() + for it.Next() { + ip := net.IP(it.Value().Get(0).Bytes()) + port := it.Value().Get(1).Uint() + address := &net.TCPAddr{IP: ip, Port: int(port)} + go self.peer.Server().PeerConnect(address) + } +} + +func (self *BaseProtocol) handleHandshake(msg *Msg) { + self.stateLock.Lock() + defer self.stateLock.Unlock() + if self.state != nullState { + self.peerError(ProtocolBreach, "extra handshake") + return + } + + c := msg.Data() + + var ( + p2pVersion = c.Get(0).Uint() + id = c.Get(1).Str() + caps = c.Get(2) + port = c.Get(3).Uint() + pubkey = c.Get(4).Bytes() + ) + fmt.Printf("handshake received %v, %v, %v, %v, %v ", p2pVersion, id, caps, port, pubkey) + + // Check correctness of p2p protocol version + if p2pVersion != P2PVersion { + self.peerError(P2PVersionMismatch, "Require protocol %d, received %d\n", P2PVersion, p2pVersion) + return + } + + // Handle the pub key (validation, uniqueness) + if len(pubkey) == 0 { + self.peerError(PubkeyMissing, "not supplied in handshake.") + return + } + + if len(pubkey) != 64 { + self.peerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8) + return + } + + // Self connect detection + if bytes.Compare(self.peer.Server().ClientIdentity().Pubkey()[1:], pubkey) == 0 { + self.peerError(PubkeyForbidden, "not allowed to connect to self") + return + } + + // register pubkey on server. this also sets the pubkey on the peer (need lock) + if err := self.peer.Server().RegisterPubkey(self.peer, pubkey); err != nil { + self.peerError(PubkeyForbidden, err.Error()) + return + } + + // check port + if self.peer.Inbound { + uint16port := uint16(port) + if self.peer.Port > 0 && self.peer.Port != uint16port { + self.peerError(PortMismatch, "port mismatch: %v != %v", self.peer.Port, port) + return + } else { + self.peer.Port = uint16port + } + } + + capsIt := caps.NewIterator() + for capsIt.Next() { + cap := capsIt.Value().Str() + self.peer.Caps = append(self.peer.Caps, cap) + } + sort.Strings(self.peer.Caps) + self.peer.Messenger().AddProtocols(self.peer.Caps) + + self.peer.Id = id + + self.state = handshakeReceived + + //p.ethereum.PushPeer(p) + // p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) + return +} -- cgit v1.2.3 From f38052c499c1fee61423efeddb1f52677f1442e9 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 4 Nov 2014 13:21:44 +0100 Subject: p2p: rework protocol API --- p2p/protocol.go | 353 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 196 insertions(+), 157 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 5d05ced7d..ccc275287 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -2,43 +2,101 @@ package p2p import ( "bytes" - "fmt" "net" "sort" - "sync" "time" + + "github.com/ethereum/go-ethereum/ethutil" ) +// Protocol is implemented by P2P subprotocols. type Protocol interface { - Start() - Stop() - HandleIn(*Msg, chan *Msg) - HandleOut(*Msg) bool + // Start is called when the protocol becomes active. + // It should read and write messages from rw. + // Messages must be fully consumed. + // + // The connection is closed when Start returns. It should return + // any protocol-level error (such as an I/O error) that is + // encountered. + Start(peer *Peer, rw MsgReadWriter) error + + // Offset should return the number of message codes + // used by the protocol. Offset() MsgCode - Name() string +} + +type MsgReader interface { + ReadMsg() (Msg, error) +} + +type MsgWriter interface { + WriteMsg(Msg) error +} + +// MsgReadWriter is passed to protocols. Protocol implementations can +// use it to write messages back to a connected peer. +type MsgReadWriter interface { + MsgReader + MsgWriter +} + +type MsgHandler func(code MsgCode, data *ethutil.Value) error + +// MsgLoop reads messages off the given reader and +// calls the handler function for each decoded message until +// it returns an error or the peer connection is closed. +// +// If a message is larger than the given maximum size, RunProtocol +// returns an appropriate error.n +func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error { + for { + msg, err := r.ReadMsg() + if err != nil { + return err + } + if msg.Size > maxsize { + return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize) + } + value, err := msg.Data() + if err != nil { + return err + } + if err := handler(msg.Code, value); err != nil { + return err + } + } +} + +// the ÐΞVp2p base protocol +type baseProtocol struct { + rw MsgReadWriter + peer *Peer +} + +type bpMsg struct { + code MsgCode + data *ethutil.Value } const ( - P2PVersion = 0 - pingTimeout = 2 - pingGracePeriod = 2 + p2pVersion = 0 + pingTimeout = 2 * time.Second + pingGracePeriod = 2 * time.Second ) const ( - HandshakeMsg = iota - DiscMsg - PingMsg - PongMsg - GetPeersMsg - PeersMsg - offset = 16 + // message codes + handshakeMsg = iota + discMsg + pingMsg + pongMsg + getPeersMsg + peersMsg ) -type ProtocolState uint8 - const ( - nullState = iota - handshakeReceived + baseProtocolOffset MsgCode = 16 + baseProtocolMaxMsgSize = 500 * 1024 ) type DiscReason byte @@ -62,7 +120,7 @@ const ( DiscSubprotocolError = 0x10 ) -var discReasonToString = map[DiscReason]string{ +var discReasonToString = [DiscSubprotocolError + 1]string{ DiscRequested: "Disconnect requested", DiscNetworkError: "Network error", DiscProtocolError: "Breach of protocol", @@ -82,197 +140,178 @@ func (d DiscReason) String() string { if len(discReasonToString) < int(d) { return "Unknown" } - return discReasonToString[d] } -type BaseProtocol struct { - peer *Peer - state ProtocolState - stateLock sync.RWMutex +func (bp *baseProtocol) Ping() { } -func NewBaseProtocol(peer *Peer) *BaseProtocol { - self := &BaseProtocol{ - peer: peer, - } - - return self +func (bp *baseProtocol) Offset() MsgCode { + return baseProtocolOffset } -func (self *BaseProtocol) Start() { - if self.peer != nil { - self.peer.Write("", self.peer.Server().Handshake()) - go self.peer.Messenger().PingPong( - pingTimeout*time.Second, - pingGracePeriod*time.Second, - self.Ping, - self.Timeout, - ) +func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error { + bp.peer, bp.rw = peer, rw + + // Do the handshake. + // TODO: disconnect is valid before handshake, too. + rw.WriteMsg(bp.peer.server.handshakeMsg()) + msg, err := rw.ReadMsg() + if err != nil { + return err + } + if msg.Code != handshakeMsg { + return NewPeerError(ProtocolBreach, " first message must be handshake") + } + data, err := msg.Data() + if err != nil { + return NewPeerError(InvalidMsg, "%v", err) + } + if err := bp.handleHandshake(data); err != nil { + return err } -} -func (self *BaseProtocol) Stop() { + msgin := make(chan bpMsg) + done := make(chan error, 1) + go func() { + done <- MsgLoop(rw, baseProtocolMaxMsgSize, + func(code MsgCode, data *ethutil.Value) error { + msgin <- bpMsg{code, data} + return nil + }) + }() + return bp.loop(msgin, done) } -func (self *BaseProtocol) Ping() { - msg, _ := NewMsg(PingMsg) - self.peer.Write("", msg) +func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error { + logger.Debugf("pingpong keepalive started at %v\n", time.Now()) + messenger := bp.rw.(*proto).messenger + pingTimer := time.NewTimer(pingTimeout) + pinged := true + + for { + select { + case msg := <-msgin: + if err := bp.handle(msg.code, msg.data); err != nil { + return err + } + case err := <-quit: + return err + case <-messenger.pulse: + pingTimer.Reset(pingTimeout) + pinged = false + case <-pingTimer.C: + if pinged { + return NewPeerError(PingTimeout, "") + } + logger.Debugf("pinging at %v\n", time.Now()) + if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil { + return NewPeerError(WriteError, "%v", err) + } + pinged = true + pingTimer.Reset(pingTimeout) + } + } } -func (self *BaseProtocol) Timeout() { - self.peerError(PingTimeout, "") -} +func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error { + switch code { + case handshakeMsg: + return NewPeerError(ProtocolBreach, " extra handshake received") -func (self *BaseProtocol) Name() string { - return "" -} + case discMsg: + logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint())) + bp.peer.server.PeerDisconnect() <- DisconnectRequest{ + addr: bp.peer.Address, + reason: DiscRequested, + } -func (self *BaseProtocol) Offset() MsgCode { - return offset -} + case pingMsg: + return bp.rw.WriteMsg(NewMsg(pongMsg)) -func (self *BaseProtocol) CheckState(state ProtocolState) bool { - self.stateLock.RLock() - self.stateLock.RUnlock() - if self.state != state { - return false - } else { - return true - } -} + case pongMsg: + // reply for ping -func (self *BaseProtocol) HandleIn(msg *Msg, response chan *Msg) { - if msg.Code() == HandshakeMsg { - self.handleHandshake(msg) - } else { - if !self.CheckState(handshakeReceived) { - self.peerError(ProtocolBreach, "message code %v not allowed", msg.Code()) - close(response) - return - } - switch msg.Code() { - case DiscMsg: - logger.Infof("Disconnect requested from peer %v, reason", DiscReason(msg.Data().Get(0).Uint())) - self.peer.Server().PeerDisconnect() <- DisconnectRequest{ - addr: self.peer.Address, - reason: DiscRequested, - } - case PingMsg: - out, _ := NewMsg(PongMsg) - response <- out - case PongMsg: - case GetPeersMsg: - // Peer asked for list of connected peers - if out, err := self.peer.Server().PeersMessage(); err != nil { - response <- out + case getPeersMsg: + // Peer asked for list of connected peers. + peersRLP := bp.peer.server.encodedPeerList() + if peersRLP != nil { + msg := Msg{ + Code: peersMsg, + Size: uint32(len(peersRLP)), + Payload: bytes.NewReader(peersRLP), } - case PeersMsg: - self.handlePeers(msg) - default: - self.peerError(InvalidMsgCode, "unknown message code %v", msg.Code()) + return bp.rw.WriteMsg(msg) } - } - close(response) -} -func (self *BaseProtocol) HandleOut(msg *Msg) (allowed bool) { - // somewhat overly paranoid - allowed = msg.Code() == HandshakeMsg || msg.Code() == DiscMsg || msg.Code() < self.Offset() && self.CheckState(handshakeReceived) - return -} + case peersMsg: + bp.handlePeers(data) -func (self *BaseProtocol) peerError(errorCode ErrorCode, format string, v ...interface{}) { - err := NewPeerError(errorCode, format, v...) - logger.Warnln(err) - fmt.Println(self.peer, err) - if self.peer != nil { - self.peer.PeerErrorChan() <- err + default: + return NewPeerError(InvalidMsgCode, "unknown message code %v", code) } + return nil } -func (self *BaseProtocol) handlePeers(msg *Msg) { - it := msg.Data().NewIterator() +func (bp *baseProtocol) handlePeers(data *ethutil.Value) { + it := data.NewIterator() for it.Next() { ip := net.IP(it.Value().Get(0).Bytes()) port := it.Value().Get(1).Uint() address := &net.TCPAddr{IP: ip, Port: int(port)} - go self.peer.Server().PeerConnect(address) + go bp.peer.server.PeerConnect(address) } } -func (self *BaseProtocol) handleHandshake(msg *Msg) { - self.stateLock.Lock() - defer self.stateLock.Unlock() - if self.state != nullState { - self.peerError(ProtocolBreach, "extra handshake") - return - } - - c := msg.Data() - +func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error { var ( - p2pVersion = c.Get(0).Uint() - id = c.Get(1).Str() - caps = c.Get(2) - port = c.Get(3).Uint() - pubkey = c.Get(4).Bytes() + remoteVersion = c.Get(0).Uint() + id = c.Get(1).Str() + caps = c.Get(2) + port = c.Get(3).Uint() + pubkey = c.Get(4).Bytes() ) - fmt.Printf("handshake received %v, %v, %v, %v, %v ", p2pVersion, id, caps, port, pubkey) - // Check correctness of p2p protocol version - if p2pVersion != P2PVersion { - self.peerError(P2PVersionMismatch, "Require protocol %d, received %d\n", P2PVersion, p2pVersion) - return + if remoteVersion != p2pVersion { + return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion) } // Handle the pub key (validation, uniqueness) if len(pubkey) == 0 { - self.peerError(PubkeyMissing, "not supplied in handshake.") - return + return NewPeerError(PubkeyMissing, "not supplied in handshake.") } if len(pubkey) != 64 { - self.peerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8) - return + return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8) } - // Self connect detection - if bytes.Compare(self.peer.Server().ClientIdentity().Pubkey()[1:], pubkey) == 0 { - self.peerError(PubkeyForbidden, "not allowed to connect to self") - return + // self connect detection + if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 { + return NewPeerError(PubkeyForbidden, "not allowed to connect to bp") } // register pubkey on server. this also sets the pubkey on the peer (need lock) - if err := self.peer.Server().RegisterPubkey(self.peer, pubkey); err != nil { - self.peerError(PubkeyForbidden, err.Error()) - return + if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil { + return NewPeerError(PubkeyForbidden, err.Error()) } // check port - if self.peer.Inbound { + if bp.peer.Inbound { uint16port := uint16(port) - if self.peer.Port > 0 && self.peer.Port != uint16port { - self.peerError(PortMismatch, "port mismatch: %v != %v", self.peer.Port, port) - return + if bp.peer.Port > 0 && bp.peer.Port != uint16port { + return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port) } else { - self.peer.Port = uint16port + bp.peer.Port = uint16port } } capsIt := caps.NewIterator() for capsIt.Next() { cap := capsIt.Value().Str() - self.peer.Caps = append(self.peer.Caps, cap) + bp.peer.Caps = append(bp.peer.Caps, cap) } - sort.Strings(self.peer.Caps) - self.peer.Messenger().AddProtocols(self.peer.Caps) - - self.peer.Id = id - - self.state = handshakeReceived - - //p.ethereum.PushPeer(p) - // p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) - return + sort.Strings(bp.peer.Caps) + bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps) + bp.peer.Id = id + return nil } -- cgit v1.2.3 From 7149191dd999f4d192398e4b0821b656e62f3345 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 5 Nov 2014 01:28:46 +0100 Subject: p2p: fix issues found during review --- p2p/protocol.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index ccc275287..d22ba70cb 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -143,9 +143,6 @@ func (d DiscReason) String() string { return discReasonToString[d] } -func (bp *baseProtocol) Ping() { -} - func (bp *baseProtocol) Offset() MsgCode { return baseProtocolOffset } @@ -287,7 +284,7 @@ func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error { // self connect detection if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 { - return NewPeerError(PubkeyForbidden, "not allowed to connect to bp") + return NewPeerError(PubkeyForbidden, "not allowed to connect to self") } // register pubkey on server. this also sets the pubkey on the peer (need lock) -- cgit v1.2.3 From 59b63caf5e4de64ceb7dcdf01551a080f53b1672 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 21 Nov 2014 21:48:49 +0100 Subject: p2p: API cleanup and PoC 7 compatibility Whoa, one more big commit. I didn't manage to untangle the changes while working towards compatibility. --- p2p/protocol.go | 412 ++++++++++++++++++++++++++------------------------------ 1 file changed, 194 insertions(+), 218 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index d22ba70cb..169dcdb6e 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -3,249 +3,185 @@ package p2p import ( "bytes" "net" - "sort" "time" "github.com/ethereum/go-ethereum/ethutil" ) -// Protocol is implemented by P2P subprotocols. -type Protocol interface { - // Start is called when the protocol becomes active. - // It should read and write messages from rw. - // Messages must be fully consumed. - // - // The connection is closed when Start returns. It should return - // any protocol-level error (such as an I/O error) that is - // encountered. - Start(peer *Peer, rw MsgReadWriter) error +// Protocol represents a P2P subprotocol implementation. +type Protocol struct { + // Name should contain the official protocol name, + // often a three-letter word. + Name string - // Offset should return the number of message codes - // used by the protocol. - Offset() MsgCode -} + // Version should contain the version number of the protocol. + Version uint -type MsgReader interface { - ReadMsg() (Msg, error) -} - -type MsgWriter interface { - WriteMsg(Msg) error -} - -// MsgReadWriter is passed to protocols. Protocol implementations can -// use it to write messages back to a connected peer. -type MsgReadWriter interface { - MsgReader - MsgWriter -} + // Length should contain the number of message codes used + // by the protocol. + Length uint64 -type MsgHandler func(code MsgCode, data *ethutil.Value) error - -// MsgLoop reads messages off the given reader and -// calls the handler function for each decoded message until -// it returns an error or the peer connection is closed. -// -// If a message is larger than the given maximum size, RunProtocol -// returns an appropriate error.n -func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error { - for { - msg, err := r.ReadMsg() - if err != nil { - return err - } - if msg.Size > maxsize { - return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize) - } - value, err := msg.Data() - if err != nil { - return err - } - if err := handler(msg.Code, value); err != nil { - return err - } - } -} - -// the ÐΞVp2p base protocol -type baseProtocol struct { - rw MsgReadWriter - peer *Peer + // Run is called in a new groutine when the protocol has been + // negotiated with a peer. It should read and write messages from + // rw. The Payload for each message must be fully consumed. + // + // The peer connection is closed when Start returns. It should return + // any protocol-level error (such as an I/O error) that is + // encountered. + Run func(peer *Peer, rw MsgReadWriter) error } -type bpMsg struct { - code MsgCode - data *ethutil.Value +func (p Protocol) cap() Cap { + return Cap{p.Name, p.Version} } const ( - p2pVersion = 0 - pingTimeout = 2 * time.Second - pingGracePeriod = 2 * time.Second + baseProtocolVersion = 2 + baseProtocolLength = uint64(16) + baseProtocolMaxMsgSize = 10 * 1024 * 1024 ) const ( - // message codes - handshakeMsg = iota - discMsg - pingMsg - pongMsg - getPeersMsg - peersMsg + // devp2p message codes + handshakeMsg = 0x00 + discMsg = 0x01 + pingMsg = 0x02 + pongMsg = 0x03 + getPeersMsg = 0x04 + peersMsg = 0x05 ) -const ( - baseProtocolOffset MsgCode = 16 - baseProtocolMaxMsgSize = 500 * 1024 -) - -type DiscReason byte +// handshake is the structure of a handshake list. +type handshake struct { + Version uint64 + ID string + Caps []Cap + ListenPort uint64 + NodeID []byte +} -const ( - // Values are given explicitly instead of by iota because these values are - // defined by the wire protocol spec; it is easier for humans to ensure - // correctness when values are explicit. - DiscRequested = 0x00 - DiscNetworkError = 0x01 - DiscProtocolError = 0x02 - DiscUselessPeer = 0x03 - DiscTooManyPeers = 0x04 - DiscAlreadyConnected = 0x05 - DiscIncompatibleVersion = 0x06 - DiscInvalidIdentity = 0x07 - DiscQuitting = 0x08 - DiscUnexpectedIdentity = 0x09 - DiscSelf = 0x0a - DiscReadTimeout = 0x0b - DiscSubprotocolError = 0x10 -) +func (h *handshake) String() string { + return h.ID +} +func (h *handshake) Pubkey() []byte { + return h.NodeID +} -var discReasonToString = [DiscSubprotocolError + 1]string{ - DiscRequested: "Disconnect requested", - DiscNetworkError: "Network error", - DiscProtocolError: "Breach of protocol", - DiscUselessPeer: "Useless peer", - DiscTooManyPeers: "Too many peers", - DiscAlreadyConnected: "Already connected", - DiscIncompatibleVersion: "Incompatible P2P protocol version", - DiscInvalidIdentity: "Invalid node identity", - DiscQuitting: "Client quitting", - DiscUnexpectedIdentity: "Unexpected identity", - DiscSelf: "Connected to self", - DiscReadTimeout: "Read timeout", - DiscSubprotocolError: "Subprotocol error", +// Cap is the structure of a peer capability. +type Cap struct { + Name string + Version uint } -func (d DiscReason) String() string { - if len(discReasonToString) < int(d) { - return "Unknown" - } - return discReasonToString[d] +func (cap Cap) RlpData() interface{} { + return []interface{}{cap.Name, cap.Version} } -func (bp *baseProtocol) Offset() MsgCode { - return baseProtocolOffset +type capsByName []Cap + +func (cs capsByName) Len() int { return len(cs) } +func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name } +func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } + +type baseProtocol struct { + rw MsgReadWriter + peer *Peer } -func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error { - bp.peer, bp.rw = peer, rw +func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { + bp := &baseProtocol{rw, peer} - // Do the handshake. - // TODO: disconnect is valid before handshake, too. - rw.WriteMsg(bp.peer.server.handshakeMsg()) + // do handshake + if err := rw.WriteMsg(bp.handshakeMsg()); err != nil { + return err + } msg, err := rw.ReadMsg() if err != nil { return err } if msg.Code != handshakeMsg { - return NewPeerError(ProtocolBreach, " first message must be handshake") + return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code) } data, err := msg.Data() if err != nil { - return NewPeerError(InvalidMsg, "%v", err) + return newPeerError(errInvalidMsg, "%v", err) } if err := bp.handleHandshake(data); err != nil { return err } - msgin := make(chan bpMsg) - done := make(chan error, 1) + // run main loop + quit := make(chan error, 1) go func() { - done <- MsgLoop(rw, baseProtocolMaxMsgSize, - func(code MsgCode, data *ethutil.Value) error { - msgin <- bpMsg{code, data} - return nil - }) + quit <- MsgLoop(rw, baseProtocolMaxMsgSize, bp.handle) }() - return bp.loop(msgin, done) + return bp.loop(quit) } -func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error { - logger.Debugf("pingpong keepalive started at %v\n", time.Now()) - messenger := bp.rw.(*proto).messenger - pingTimer := time.NewTimer(pingTimeout) - pinged := true +var pingTimeout = 2 * time.Second + +func (bp *baseProtocol) loop(quit <-chan error) error { + ping := time.NewTimer(pingTimeout) + activity := bp.peer.activity.Subscribe(time.Time{}) + lastActive := time.Time{} + defer ping.Stop() + defer activity.Unsubscribe() - for { + getPeersTick := time.NewTicker(10 * time.Second) + defer getPeersTick.Stop() + err := bp.rw.EncodeMsg(getPeersMsg) + + for err == nil { select { - case msg := <-msgin: - if err := bp.handle(msg.code, msg.data); err != nil { - return err - } - case err := <-quit: + case err = <-quit: return err - case <-messenger.pulse: - pingTimer.Reset(pingTimeout) - pinged = false - case <-pingTimer.C: - if pinged { - return NewPeerError(PingTimeout, "") + case <-getPeersTick.C: + err = bp.rw.EncodeMsg(getPeersMsg) + case event := <-activity.Chan(): + ping.Reset(pingTimeout) + lastActive = event.(time.Time) + case t := <-ping.C: + if lastActive.Add(pingTimeout * 2).Before(t) { + err = newPeerError(errPingTimeout, "") + } else if lastActive.Add(pingTimeout).Before(t) { + err = bp.rw.EncodeMsg(pingMsg) } - logger.Debugf("pinging at %v\n", time.Now()) - if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil { - return NewPeerError(WriteError, "%v", err) - } - pinged = true - pingTimer.Reset(pingTimeout) } } + return err } -func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error { +func (bp *baseProtocol) handle(code uint64, data *ethutil.Value) error { switch code { case handshakeMsg: - return NewPeerError(ProtocolBreach, " extra handshake received") + return newPeerError(errProtocolBreach, "extra handshake received") case discMsg: - logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint())) - bp.peer.server.PeerDisconnect() <- DisconnectRequest{ - addr: bp.peer.Address, - reason: DiscRequested, - } + bp.peer.Disconnect(DiscReason(data.Get(0).Uint())) + return nil case pingMsg: - return bp.rw.WriteMsg(NewMsg(pongMsg)) + return bp.rw.EncodeMsg(pongMsg) case pongMsg: - // reply for ping case getPeersMsg: - // Peer asked for list of connected peers. - peersRLP := bp.peer.server.encodedPeerList() - if peersRLP != nil { - msg := Msg{ - Code: peersMsg, - Size: uint32(len(peersRLP)), - Payload: bytes.NewReader(peersRLP), - } - return bp.rw.WriteMsg(msg) + peers := bp.peerList() + // this is dangerous. the spec says that we should _delay_ + // sending the response if no new information is available. + // this means that would need to send a response later when + // new peers become available. + // + // TODO: add event mechanism to notify baseProtocol for new peers + if len(peers) > 0 { + return bp.rw.EncodeMsg(peersMsg, peers) } case peersMsg: bp.handlePeers(data) default: - return NewPeerError(InvalidMsgCode, "unknown message code %v", code) + return newPeerError(errInvalidMsgCode, "unknown message code %v", code) } return nil } @@ -253,62 +189,102 @@ func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error { func (bp *baseProtocol) handlePeers(data *ethutil.Value) { it := data.NewIterator() for it.Next() { - ip := net.IP(it.Value().Get(0).Bytes()) - port := it.Value().Get(1).Uint() - address := &net.TCPAddr{IP: ip, Port: int(port)} - go bp.peer.server.PeerConnect(address) + addr := &peerAddr{ + IP: net.IP(it.Value().Get(0).Bytes()), + Port: it.Value().Get(1).Uint(), + Pubkey: it.Value().Get(2).Bytes(), + } + bp.peer.Debugf("received peer suggestion: %v", addr) + bp.peer.newPeerAddr <- addr } } func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error { - var ( - remoteVersion = c.Get(0).Uint() - id = c.Get(1).Str() - caps = c.Get(2) - port = c.Get(3).Uint() - pubkey = c.Get(4).Bytes() - ) - // Check correctness of p2p protocol version - if remoteVersion != p2pVersion { - return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion) + hs := handshake{ + Version: c.Get(0).Uint(), + ID: c.Get(1).Str(), + Caps: nil, // decoded below + ListenPort: c.Get(3).Uint(), + NodeID: c.Get(4).Bytes(), } - - // Handle the pub key (validation, uniqueness) - if len(pubkey) == 0 { - return NewPeerError(PubkeyMissing, "not supplied in handshake.") + if hs.Version != baseProtocolVersion { + return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n", + baseProtocolVersion, hs.Version) } - - if len(pubkey) != 64 { - return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8) + if len(hs.NodeID) == 0 { + return newPeerError(errPubkeyMissing, "") + } + if len(hs.NodeID) != 64 { + return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8) + } + if da := bp.peer.dialAddr; da != nil { + // verify that the peer we wanted to connect to + // actually holds the target public key. + if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) { + return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch") + } + } + pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) + if err := bp.peer.pubkeyHook(pa); err != nil { + return newPeerError(errPubkeyForbidden, "%v", err) + } + capsIt := c.Get(2).NewIterator() + for capsIt.Next() { + cap := capsIt.Value() + name := cap.Get(0).Str() + if name != "" { + hs.Caps = append(hs.Caps, Cap{Name: name, Version: uint(cap.Get(1).Uint())}) + } } - // self connect detection - if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 { - return NewPeerError(PubkeyForbidden, "not allowed to connect to self") + var addr *peerAddr + if hs.ListenPort != 0 { + addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) + addr.Port = hs.ListenPort } + bp.peer.setHandshakeInfo(&hs, addr, hs.Caps) + bp.peer.startSubprotocols(hs.Caps) + return nil +} - // register pubkey on server. this also sets the pubkey on the peer (need lock) - if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil { - return NewPeerError(PubkeyForbidden, err.Error()) +func (bp *baseProtocol) handshakeMsg() Msg { + var ( + port uint64 + caps []interface{} + ) + if bp.peer.ourListenAddr != nil { + port = bp.peer.ourListenAddr.Port } + for _, proto := range bp.peer.protocols { + caps = append(caps, proto.cap()) + } + return NewMsg(handshakeMsg, + baseProtocolVersion, + bp.peer.ourID.String(), + caps, + port, + bp.peer.ourID.Pubkey()[1:], + ) +} - // check port - if bp.peer.Inbound { - uint16port := uint16(port) - if bp.peer.Port > 0 && bp.peer.Port != uint16port { - return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port) - } else { - bp.peer.Port = uint16port +func (bp *baseProtocol) peerList() []ethutil.RlpEncodable { + peers := bp.peer.otherPeers() + ds := make([]ethutil.RlpEncodable, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue } + ds = append(ds, addr) } - - capsIt := caps.NewIterator() - for capsIt.Next() { - cap := capsIt.Value().Str() - bp.peer.Caps = append(bp.peer.Caps, cap) + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) } - sort.Strings(bp.peer.Caps) - bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps) - bp.peer.Id = id - return nil + return ds } -- cgit v1.2.3 From 6049fcd52ab10362721a352cfd7a93a01c3ffa97 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 25 Nov 2014 12:25:31 +0100 Subject: p2p: use package rlp for baseProtocol --- p2p/protocol.go | 107 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 56 insertions(+), 51 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 169dcdb6e..28eab87cd 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -2,7 +2,6 @@ package p2p import ( "bytes" - "net" "time" "github.com/ethereum/go-ethereum/ethutil" @@ -90,30 +89,18 @@ type baseProtocol struct { func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { bp := &baseProtocol{rw, peer} - - // do handshake - if err := rw.WriteMsg(bp.handshakeMsg()); err != nil { - return err - } - msg, err := rw.ReadMsg() - if err != nil { + if err := bp.doHandshake(rw); err != nil { return err } - if msg.Code != handshakeMsg { - return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code) - } - data, err := msg.Data() - if err != nil { - return newPeerError(errInvalidMsg, "%v", err) - } - if err := bp.handleHandshake(data); err != nil { - return err - } - // run main loop quit := make(chan error, 1) go func() { - quit <- MsgLoop(rw, baseProtocolMaxMsgSize, bp.handle) + for { + if err := bp.handle(rw); err != nil { + quit <- err + break + } + } }() return bp.loop(quit) } @@ -151,13 +138,27 @@ func (bp *baseProtocol) loop(quit <-chan error) error { return err } -func (bp *baseProtocol) handle(code uint64, data *ethutil.Value) error { - switch code { +func (bp *baseProtocol) handle(rw MsgReadWriter) error { + msg, err := rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > baseProtocolMaxMsgSize { + return newPeerError(errMisc, "message too big") + } + // make sure that the payload has been fully consumed + defer msg.Discard() + + switch msg.Code { case handshakeMsg: return newPeerError(errProtocolBreach, "extra handshake received") case discMsg: - bp.peer.Disconnect(DiscReason(data.Get(0).Uint())) + var reason DiscReason + if err := msg.Decode(&reason); err != nil { + return err + } + bp.peer.Disconnect(reason) return nil case pingMsg: @@ -178,35 +179,45 @@ func (bp *baseProtocol) handle(code uint64, data *ethutil.Value) error { } case peersMsg: - bp.handlePeers(data) + var peers []*peerAddr + if err := msg.Decode(&peers); err != nil { + return err + } + for _, addr := range peers { + bp.peer.Debugf("received peer suggestion: %v", addr) + bp.peer.newPeerAddr <- addr + } default: - return newPeerError(errInvalidMsgCode, "unknown message code %v", code) + return newPeerError(errInvalidMsgCode, "unknown message code %v", msg.Code) } return nil } -func (bp *baseProtocol) handlePeers(data *ethutil.Value) { - it := data.NewIterator() - for it.Next() { - addr := &peerAddr{ - IP: net.IP(it.Value().Get(0).Bytes()), - Port: it.Value().Get(1).Uint(), - Pubkey: it.Value().Get(2).Bytes(), - } - bp.peer.Debugf("received peer suggestion: %v", addr) - bp.peer.newPeerAddr <- addr +func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { + // send our handshake + if err := rw.WriteMsg(bp.handshakeMsg()); err != nil { + return err + } + + // read and handle remote handshake + msg, err := rw.ReadMsg() + if err != nil { + return err + } + if msg.Code != handshakeMsg { + return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code) + } + if msg.Size > baseProtocolMaxMsgSize { + return newPeerError(errMisc, "message too big") } -} -func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error { - hs := handshake{ - Version: c.Get(0).Uint(), - ID: c.Get(1).Str(), - Caps: nil, // decoded below - ListenPort: c.Get(3).Uint(), - NodeID: c.Get(4).Bytes(), + var hs handshake + if err := msg.Decode(&hs); err != nil { + return err } + + // validate handshake info if hs.Version != baseProtocolVersion { return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n", baseProtocolVersion, hs.Version) @@ -228,14 +239,8 @@ func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error { if err := bp.peer.pubkeyHook(pa); err != nil { return newPeerError(errPubkeyForbidden, "%v", err) } - capsIt := c.Get(2).NewIterator() - for capsIt.Next() { - cap := capsIt.Value() - name := cap.Get(0).Str() - if name != "" { - hs.Caps = append(hs.Caps, Cap{Name: name, Version: uint(cap.Get(1).Uint())}) - } - } + + // TODO: remove Caps with empty name var addr *peerAddr if hs.ListenPort != 0 { -- cgit v1.2.3 From 9423401d73562b2a55398559a9d21af75210d955 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 9 Dec 2014 13:39:56 +0100 Subject: p2p: fix decoding of disconnect reason (fixes #200) --- p2p/protocol.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 28eab87cd..5af586f13 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -154,11 +154,11 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return newPeerError(errProtocolBreach, "extra handshake received") case discMsg: - var reason DiscReason + var reason [1]DiscReason if err := msg.Decode(&reason); err != nil { return err } - bp.peer.Disconnect(reason) + bp.peer.Disconnect(reason[0]) return nil case pingMsg: -- cgit v1.2.3 From f0f672777866a524c36e767a6c313f93f574dd8c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 12 Dec 2014 11:58:39 +0100 Subject: p2p: use an error type for disconnect requests Test-tastic. --- p2p/protocol.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 5af586f13..3f52205f5 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -158,8 +158,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { if err := msg.Decode(&reason); err != nil { return err } - bp.peer.Disconnect(reason[0]) - return nil + return discRequestedError(reason[0]) case pingMsg: return bp.rw.EncodeMsg(pongMsg) -- cgit v1.2.3