From 6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 5 Jan 2015 17:10:42 +0100 Subject: Merge --- p2p/protocol.go | 54 +++++++++++++----------------------------------------- 1 file changed, 13 insertions(+), 41 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 3f52205f5..dd8cbc4ec 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -3,8 +3,6 @@ package p2p import ( "bytes" "time" - - "github.com/ethereum/go-ethereum/ethutil" ) // Protocol represents a P2P subprotocol implementation. @@ -89,20 +87,25 @@ type baseProtocol struct { func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { bp := &baseProtocol{rw, peer} - if err := bp.doHandshake(rw); err != nil { + errc := make(chan error, 1) + go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }() + if err := bp.readHandshake(); err != nil { + return err + } + // handle write error + if err := <-errc; err != nil { return err } // run main loop - quit := make(chan error, 1) go func() { for { if err := bp.handle(rw); err != nil { - quit <- err + errc <- err break } } }() - return bp.loop(quit) + return bp.loop(errc) } var pingTimeout = 2 * time.Second @@ -166,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { case pongMsg: case getPeersMsg: - peers := bp.peerList() + peers := bp.peer.PeerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -174,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers) + return bp.rw.EncodeMsg(peersMsg, peers...) } case peersMsg: @@ -193,14 +196,9 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return nil } -func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { - // send our handshake - if err := rw.WriteMsg(bp.handshakeMsg()); err != nil { - return err - } - +func (bp *baseProtocol) readHandshake() error { // read and handle remote handshake - msg, err := rw.ReadMsg() + msg, err := bp.rw.ReadMsg() if err != nil { return err } @@ -210,12 +208,10 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { if msg.Size > baseProtocolMaxMsgSize { return newPeerError(errMisc, "message too big") } - var hs handshake if err := msg.Decode(&hs); err != nil { return err } - // validate handshake info if hs.Version != baseProtocolVersion { return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n", @@ -238,9 +234,7 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { if err := bp.peer.pubkeyHook(pa); err != nil { return newPeerError(errPubkeyForbidden, "%v", err) } - // TODO: remove Caps with empty name - var addr *peerAddr if hs.ListenPort != 0 { addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) @@ -270,25 +264,3 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } - -func (bp *baseProtocol) peerList() []ethutil.RlpEncodable { - peers := bp.peer.otherPeers() - ds := make([]ethutil.RlpEncodable, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == bp.peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := bp.peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} -- cgit v1.2.3 From eb0e7b1b8120852a1d56aa0ebd3a98e652965635 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:35:09 +0100 Subject: eth, p2p: remove EncodeMsg from p2p.MsgWriter ...and make it a top-level function instead. The original idea behind having EncodeMsg in the interface was that implementations might be able to encode RLP data to their underlying writer directly instead of buffering the encoded data. The encoder will buffer anyway, so that doesn't matter anymore. Given the recent problems with EncodeMsg (copy-pasted implementation bug) I'd rather implement once, correctly. --- p2p/protocol.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index dd8cbc4ec..969937076 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -119,14 +119,14 @@ func (bp *baseProtocol) loop(quit <-chan error) error { getPeersTick := time.NewTicker(10 * time.Second) defer getPeersTick.Stop() - err := bp.rw.EncodeMsg(getPeersMsg) + err := EncodeMsg(bp.rw, getPeersMsg) for err == nil { select { case err = <-quit: return err case <-getPeersTick.C: - err = bp.rw.EncodeMsg(getPeersMsg) + err = EncodeMsg(bp.rw, getPeersMsg) case event := <-activity.Chan(): ping.Reset(pingTimeout) lastActive = event.(time.Time) @@ -134,7 +134,7 @@ func (bp *baseProtocol) loop(quit <-chan error) error { if lastActive.Add(pingTimeout * 2).Before(t) { err = newPeerError(errPingTimeout, "") } else if lastActive.Add(pingTimeout).Before(t) { - err = bp.rw.EncodeMsg(pingMsg) + err = EncodeMsg(bp.rw, pingMsg) } } } @@ -164,7 +164,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return discRequestedError(reason[0]) case pingMsg: - return bp.rw.EncodeMsg(pongMsg) + return EncodeMsg(bp.rw, pongMsg) case pongMsg: @@ -177,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers...) + return EncodeMsg(bp.rw, peersMsg, peers...) } case peersMsg: -- cgit v1.2.3 From b0ff946b55c23f0fffc50a700bcb255f95855afc Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 12:14:29 +0100 Subject: p2p: move peerList back into baseProtocol It had been moved to Peer, probably for debugging. --- p2p/protocol.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 969937076..1d121a885 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -169,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { case pongMsg: case getPeersMsg: - peers := bp.peer.PeerList() + peers := bp.peerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -264,3 +264,25 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } + +func (bp *baseProtocol) peerList() []interface{} { + peers := bp.peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} -- cgit v1.2.3 From d227f6184e9d8ce21d40d032dedc910b2bd25f89 Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 18 Jan 2015 09:44:49 +0000 Subject: fix protocol to accomodate privkey --- p2p/protocol.go | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 1d121a885..62ada929d 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -64,6 +64,10 @@ func (h *handshake) Pubkey() []byte { return h.NodeID } +func (h *handshake) PrivKey() []byte { + return nil +} + // Cap is the structure of a peer capability. type Cap struct { Name string -- cgit v1.2.3 From 5bdc1159433138d92ed6fefb253e3c6ed3a43995 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 5 Feb 2015 03:07:58 +0100 Subject: p2p: integrate p2p/discover Overview of changes: - ClientIdentity has been removed, use discover.NodeID - Server now requires a private key to be set (instead of public key) - Server performs the encryption handshake before launching Peer - Dial logic takes peers from discover table - Encryption handshake code has been cleaned up a bit - baseProtocol is gone because we don't exchange peers anymore - Some parts of baseProtocol have moved into Peer instead --- p2p/protocol.go | 248 -------------------------------------------------------- 1 file changed, 248 deletions(-) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index 62ada929d..fe359fc54 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -1,10 +1,5 @@ package p2p -import ( - "bytes" - "time" -) - // Protocol represents a P2P subprotocol implementation. type Protocol struct { // Name should contain the official protocol name, @@ -32,42 +27,6 @@ func (p Protocol) cap() Cap { return Cap{p.Name, p.Version} } -const ( - baseProtocolVersion = 2 - baseProtocolLength = uint64(16) - baseProtocolMaxMsgSize = 10 * 1024 * 1024 -) - -const ( - // devp2p message codes - handshakeMsg = 0x00 - discMsg = 0x01 - pingMsg = 0x02 - pongMsg = 0x03 - getPeersMsg = 0x04 - peersMsg = 0x05 -) - -// handshake is the structure of a handshake list. -type handshake struct { - Version uint64 - ID string - Caps []Cap - ListenPort uint64 - NodeID []byte -} - -func (h *handshake) String() string { - return h.ID -} -func (h *handshake) Pubkey() []byte { - return h.NodeID -} - -func (h *handshake) PrivKey() []byte { - return nil -} - // Cap is the structure of a peer capability. type Cap struct { Name string @@ -83,210 +42,3 @@ type capsByName []Cap func (cs capsByName) Len() int { return len(cs) } func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name } func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } - -type baseProtocol struct { - rw MsgReadWriter - peer *Peer -} - -func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { - bp := &baseProtocol{rw, peer} - errc := make(chan error, 1) - go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }() - if err := bp.readHandshake(); err != nil { - return err - } - // handle write error - if err := <-errc; err != nil { - return err - } - // run main loop - go func() { - for { - if err := bp.handle(rw); err != nil { - errc <- err - break - } - } - }() - return bp.loop(errc) -} - -var pingTimeout = 2 * time.Second - -func (bp *baseProtocol) loop(quit <-chan error) error { - ping := time.NewTimer(pingTimeout) - activity := bp.peer.activity.Subscribe(time.Time{}) - lastActive := time.Time{} - defer ping.Stop() - defer activity.Unsubscribe() - - getPeersTick := time.NewTicker(10 * time.Second) - defer getPeersTick.Stop() - err := EncodeMsg(bp.rw, getPeersMsg) - - for err == nil { - select { - case err = <-quit: - return err - case <-getPeersTick.C: - err = EncodeMsg(bp.rw, getPeersMsg) - case event := <-activity.Chan(): - ping.Reset(pingTimeout) - lastActive = event.(time.Time) - case t := <-ping.C: - if lastActive.Add(pingTimeout * 2).Before(t) { - err = newPeerError(errPingTimeout, "") - } else if lastActive.Add(pingTimeout).Before(t) { - err = EncodeMsg(bp.rw, pingMsg) - } - } - } - return err -} - -func (bp *baseProtocol) handle(rw MsgReadWriter) error { - msg, err := rw.ReadMsg() - if err != nil { - return err - } - if msg.Size > baseProtocolMaxMsgSize { - return newPeerError(errMisc, "message too big") - } - // make sure that the payload has been fully consumed - defer msg.Discard() - - switch msg.Code { - case handshakeMsg: - return newPeerError(errProtocolBreach, "extra handshake received") - - case discMsg: - var reason [1]DiscReason - if err := msg.Decode(&reason); err != nil { - return err - } - return discRequestedError(reason[0]) - - case pingMsg: - return EncodeMsg(bp.rw, pongMsg) - - case pongMsg: - - case getPeersMsg: - peers := bp.peerList() - // this is dangerous. the spec says that we should _delay_ - // sending the response if no new information is available. - // this means that would need to send a response later when - // new peers become available. - // - // TODO: add event mechanism to notify baseProtocol for new peers - if len(peers) > 0 { - return EncodeMsg(bp.rw, peersMsg, peers...) - } - - case peersMsg: - var peers []*peerAddr - if err := msg.Decode(&peers); err != nil { - return err - } - for _, addr := range peers { - bp.peer.Debugf("received peer suggestion: %v", addr) - bp.peer.newPeerAddr <- addr - } - - default: - return newPeerError(errInvalidMsgCode, "unknown message code %v", msg.Code) - } - return nil -} - -func (bp *baseProtocol) readHandshake() error { - // read and handle remote handshake - msg, err := bp.rw.ReadMsg() - if err != nil { - return err - } - if msg.Code != handshakeMsg { - return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code) - } - if msg.Size > baseProtocolMaxMsgSize { - return newPeerError(errMisc, "message too big") - } - var hs handshake - if err := msg.Decode(&hs); err != nil { - return err - } - // validate handshake info - if hs.Version != baseProtocolVersion { - return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n", - baseProtocolVersion, hs.Version) - } - if len(hs.NodeID) == 0 { - return newPeerError(errPubkeyMissing, "") - } - if len(hs.NodeID) != 64 { - return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8) - } - if da := bp.peer.dialAddr; da != nil { - // verify that the peer we wanted to connect to - // actually holds the target public key. - if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) { - return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch") - } - } - pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) - if err := bp.peer.pubkeyHook(pa); err != nil { - return newPeerError(errPubkeyForbidden, "%v", err) - } - // TODO: remove Caps with empty name - var addr *peerAddr - if hs.ListenPort != 0 { - addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) - addr.Port = hs.ListenPort - } - bp.peer.setHandshakeInfo(&hs, addr, hs.Caps) - bp.peer.startSubprotocols(hs.Caps) - return nil -} - -func (bp *baseProtocol) handshakeMsg() Msg { - var ( - port uint64 - caps []interface{} - ) - if bp.peer.ourListenAddr != nil { - port = bp.peer.ourListenAddr.Port - } - for _, proto := range bp.peer.protocols { - caps = append(caps, proto.cap()) - } - return NewMsg(handshakeMsg, - baseProtocolVersion, - bp.peer.ourID.String(), - caps, - port, - bp.peer.ourID.Pubkey()[1:], - ) -} - -func (bp *baseProtocol) peerList() []interface{} { - peers := bp.peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == bp.peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := bp.peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} -- cgit v1.2.3 From 4bef3ce284574c7e0e9a76004e076fc686b13bf6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 23:54:34 +0100 Subject: p2p: print Cap as name/version --- p2p/protocol.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'p2p/protocol.go') diff --git a/p2p/protocol.go b/p2p/protocol.go index fe359fc54..5fa395eda 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -1,5 +1,7 @@ package p2p +import "fmt" + // Protocol represents a P2P subprotocol implementation. type Protocol struct { // Name should contain the official protocol name, @@ -37,6 +39,10 @@ func (cap Cap) RlpData() interface{} { return []interface{}{cap.Name, cap.Version} } +func (cap Cap) String() string { + return fmt.Sprintf("%s/%d", cap.Name, cap.Version) +} + type capsByName []Cap func (cs capsByName) Len() int { return len(cs) } -- cgit v1.2.3