From dbdc5fd4b33ad98ebde72e7d934cebab1408f057 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 14 May 2015 17:41:43 +0200 Subject: p2p: delete Server.Broadcast --- p2p/peer.go | 14 -------------- 1 file changed, 14 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index c7ec08887..562fbc1b4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -273,20 +273,6 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) { return nil, newPeerError(errInvalidMsgCode, "%d", code) } -// writeProtoMsg sends the given message on behalf of the given named protocol. -// this exists because of Server.Broadcast. -func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { - proto, ok := p.running[protoName] - if !ok { - return fmt.Errorf("protocol %s not handled by peer", protoName) - } - if msg.Code >= proto.Length { - return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) - } - msg.Code += proto.offset - return p.rw.WriteMsg(msg) -} - type protoRW struct { Protocol in chan Msg -- cgit v1.2.3 From 64564da20b24f465abfa5bd95fc9deb1c32ec640 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 18 May 2015 01:14:35 +0200 Subject: p2p: decrease maximum message size for devp2p to 1kB The previous limit was 10MB which is unacceptable for all kinds of reasons, the most important one being that we don't want to allow the remote side to make us allocate 10MB at handshake time. --- p2p/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 562fbc1b4..87a91d406 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,7 +18,7 @@ import ( const ( baseProtocolVersion = 4 baseProtocolLength = uint64(16) - baseProtocolMaxMsgSize = 10 * 1024 * 1024 + baseProtocolMaxMsgSize = 2 * 1024 pingInterval = 15 * time.Second ) -- cgit v1.2.3 From 1440f9a37a8baf67b989ddf0b8cc30c9a1970e14 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 16 May 2015 00:38:28 +0200 Subject: p2p: new dialer, peer management without locks The most visible change is event-based dialing, which should be an improvement over the timer-based system that we have at the moment. The dialer gets a chance to compute new tasks whenever peers change or dials complete. This is better than checking peers on a timer because dials happen faster. The dialer can now make more precise decisions about whom to dial based on the peer set and we can test those decisions without actually opening any sockets. Peer management is easier to test because the tests can inject connections at checkpoints (after enc handshake, after protocol handshake). Most of the handshake stuff is now part of the RLPx code. It could be exported or move to its own package because it is no longer entangled with Server logic. --- p2p/peer.go | 55 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 25 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 87a91d406..cbe5ccc84 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -33,9 +33,17 @@ const ( peersMsg = 0x05 ) +// protoHandshake is the RLP structure of the protocol handshake. +type protoHandshake struct { + Version uint64 + Name string + Caps []Cap + ListenPort uint64 + ID discover.NodeID +} + // Peer represents a connected remote node. type Peer struct { - conn net.Conn rw *conn running map[string]*protoRW @@ -48,37 +56,36 @@ type Peer struct { // NewPeer returns a peer for testing purposes. func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer { pipe, _ := net.Pipe() - msgpipe, _ := MsgPipe() - conn := &conn{msgpipe, &protoHandshake{ID: id, Name: name, Caps: caps}} - peer := newPeer(pipe, conn, nil) + conn := &conn{fd: pipe, transport: nil, id: id, caps: caps, name: name} + peer := newPeer(conn, nil) close(peer.closed) // ensures Disconnect doesn't block return peer } // ID returns the node's public key. func (p *Peer) ID() discover.NodeID { - return p.rw.ID + return p.rw.id } // Name returns the node name that the remote node advertised. func (p *Peer) Name() string { - return p.rw.Name + return p.rw.name } // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { // TODO: maybe return copy - return p.rw.Caps + return p.rw.caps } // RemoteAddr returns the remote address of the network connection. func (p *Peer) RemoteAddr() net.Addr { - return p.conn.RemoteAddr() + return p.rw.fd.RemoteAddr() } // LocalAddr returns the local address of the network connection. func (p *Peer) LocalAddr() net.Addr { - return p.conn.LocalAddr() + return p.rw.fd.LocalAddr() } // Disconnect terminates the peer connection with the given reason. @@ -92,13 +99,12 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - return fmt.Sprintf("Peer %.8x %v", p.rw.ID[:], p.RemoteAddr()) + return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr()) } -func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer { - protomap := matchProtocols(protocols, conn.Caps, conn) +func newPeer(conn *conn, protocols []Protocol) *Peer { + protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ - conn: fd, rw: conn, running: protomap, disc: make(chan DiscReason), @@ -117,7 +123,10 @@ func (p *Peer) run() DiscReason { p.startProtocols() // Wait for an error or disconnect. - var reason DiscReason + var ( + reason DiscReason + requested bool + ) select { case err := <-readErr: if r, ok := err.(DiscReason); ok { @@ -131,21 +140,17 @@ func (p *Peer) run() DiscReason { case err := <-p.protoErr: reason = discReasonForError(err) case reason = <-p.disc: - p.politeDisconnect(reason) - reason = DiscRequested + requested = true } - close(p.closed) + p.rw.close(reason) p.wg.Wait() - glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason) - return reason -} -func (p *Peer) politeDisconnect(reason DiscReason) { - if reason != DiscNetworkError { - SendItems(p.rw, discMsg, uint(reason)) + if requested { + reason = DiscRequested } - p.conn.Close() + glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason) + return reason } func (p *Peer) pingLoop() { @@ -254,7 +259,7 @@ func (p *Peer) startProtocols() { glog.V(logger.Detail).Infof("%v: Protocol %s/%d returned\n", p, proto.Name, proto.Version) err = errors.New("protocol returned") } else if err != io.EOF { - glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: \n", p, proto.Name, proto.Version, err) + glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: %v\n", p, proto.Name, proto.Version, err) } p.protoErr <- err p.wg.Done() -- cgit v1.2.3