From 6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 5 Jan 2015 17:10:42 +0100 Subject: Merge --- p2p/peer.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 86c4d7ab5..0d7eec9f4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -45,8 +45,8 @@ func (d peerAddr) String() string { return fmt.Sprintf("%v:%d", d.IP, d.Port) } -func (d peerAddr) RlpData() interface{} { - return []interface{}{d.IP, d.Port, d.Pubkey} +func (d *peerAddr) RlpData() interface{} { + return []interface{}{string(d.IP), d.Port, d.Pubkey} } // Peer represents a remote peer. @@ -426,7 +426,7 @@ func (rw *proto) WriteMsg(msg Msg) error { } func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error { - return rw.WriteMsg(NewMsg(code, data)) + return rw.WriteMsg(NewMsg(code, data...)) } func (rw *proto) ReadMsg() (Msg, error) { @@ -460,3 +460,25 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } + +func (peer *Peer) PeerList() []interface{} { + peers := peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} -- cgit v1.2.3 From b0ff946b55c23f0fffc50a700bcb255f95855afc Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 12:14:29 +0100 Subject: p2p: move peerList back into baseProtocol It had been moved to Peer, probably for debugging. --- p2p/peer.go | 22 ---------------------- 1 file changed, 22 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 0d7eec9f4..2380a3285 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -460,25 +460,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } - -func (peer *Peer) PeerList() []interface{} { - peers := peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} -- cgit v1.2.3 From c8a8aa0d43336491f6aa264467968e06b489d34c Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 18 Jan 2015 07:59:54 +0000 Subject: initial hook for crypto handshake (void, off by default) --- p2p/peer.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 2380a3285..886b95a80 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -70,6 +70,7 @@ type Peer struct { // These fields maintain the running protocols. protocols []Protocol runBaseProtocol bool // for testing + cryptoHandshake bool // for testing runlock sync.RWMutex // protects running running map[string]*proto @@ -141,6 +142,20 @@ func (p *Peer) Identity() ClientIdentity { return p.identity } +func (self *Peer) Pubkey() (pubkey []byte) { + self.infolock.Lock() + defer self.infolock.Unlock() + switch { + case self.identity != nil: + pubkey = self.identity.Pubkey() + case self.dialAddr != nil: + pubkey = self.dialAddr.Pubkey + case self.listenAddr != nil: + pubkey = self.listenAddr.Pubkey + } + return +} + // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { p.infolock.Lock() @@ -207,6 +222,12 @@ func (p *Peer) loop() (reason DiscReason, err error) { defer close(p.closed) defer p.conn.Close() + if p.cryptoHandshake { + if err := p.handleCryptoHandshake(); err != nil { + return DiscProtocolError, err // no graceful disconnect + } + } + // read loop readMsg := make(chan Msg) readErr := make(chan error) @@ -307,6 +328,11 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) return wait, nil } +func (p *Peer) handleCryptoHandshake() (err error) { + + return nil +} + func (p *Peer) startBaseProtocol() { p.runlock.Lock() defer p.runlock.Unlock() -- cgit v1.2.3 From 1803c65e4097b9d6cb83f72a8a09aeddcc01f685 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 19 Jan 2015 11:21:13 +0000 Subject: integrate cryptoId into peer and connection lifecycle --- p2p/peer.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 886b95a80..e98c3d560 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -222,10 +222,14 @@ func (p *Peer) loop() (reason DiscReason, err error) { defer close(p.closed) defer p.conn.Close() + var readLoop func(chan Msg, chan error, chan bool) if p.cryptoHandshake { - if err := p.handleCryptoHandshake(); err != nil { + if readLoop, err := p.handleCryptoHandshake(); err != nil { + // from here on everything can be encrypted, authenticated return DiscProtocolError, err // no graceful disconnect } + } else { + readLoop = p.readLoop } // read loop @@ -233,7 +237,7 @@ func (p *Peer) loop() (reason DiscReason, err error) { readErr := make(chan error) readNext := make(chan bool, 1) protoDone := make(chan struct{}, 1) - go p.readLoop(readMsg, readErr, readNext) + go readLoop(readMsg, readErr, readNext) readNext <- true if p.runBaseProtocol { @@ -329,8 +333,19 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) } func (p *Peer) handleCryptoHandshake() (err error) { + // cryptoId is just created for the lifecycle of the handshake + // it is survived by an encrypted readwriter + if p.dialAddr != 0 { // this should have its own method Outgoing() bool + initiator = true + } + // create crypto layer + cryptoId := newCryptoId(p.identity, initiator, sessionToken) + // run on peer + if rw, err := cryptoId.Run(p.Pubkey()); err != nil { + return err + } + p.conn = rw.Run(p.conn) - return nil } func (p *Peer) startBaseProtocol() { -- cgit v1.2.3 From e252c634cb40c8ef7f9bcd542f5418a937929620 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 19 Jan 2015 23:42:13 +0000 Subject: first stab at integrating crypto in our p2p - abstract the entire handshake logic in cryptoId.Run() taking session-relevant parameters - changes in peer to accomodate how the encryption layer would be switched on - modify arguments of handshake components - fixed test getting the wrong pubkey but it till crashes on DH in newSession() --- p2p/peer.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index e98c3d560..e3e04ee65 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -222,9 +222,9 @@ func (p *Peer) loop() (reason DiscReason, err error) { defer close(p.closed) defer p.conn.Close() - var readLoop func(chan Msg, chan error, chan bool) + var readLoop func(chan<- Msg, chan<- error, <-chan bool) if p.cryptoHandshake { - if readLoop, err := p.handleCryptoHandshake(); err != nil { + if readLoop, err = p.handleCryptoHandshake(); err != nil { // from here on everything can be encrypted, authenticated return DiscProtocolError, err // no graceful disconnect } @@ -332,20 +332,33 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) return wait, nil } -func (p *Peer) handleCryptoHandshake() (err error) { +type readLoop func(chan<- Msg, chan<- error, <-chan bool) + +func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { // cryptoId is just created for the lifecycle of the handshake // it is survived by an encrypted readwriter - if p.dialAddr != 0 { // this should have its own method Outgoing() bool + var initiator bool + var sessionToken []byte + if p.dialAddr != nil { // this should have its own method Outgoing() bool initiator = true } // create crypto layer - cryptoId := newCryptoId(p.identity, initiator, sessionToken) + // this could in principle run only once but maybe we want to allow + // identity switching + var crypto *cryptoId + if crypto, err = newCryptoId(p.ourID); err != nil { + return + } // run on peer - if rw, err := cryptoId.Run(p.Pubkey()); err != nil { - return err + // this bit handles the handshake and creates a secure communications channel with + // var rw *secretRW + if sessionToken, _, err = crypto.Run(p.conn, p.Pubkey(), sessionToken, initiator); err != nil { + return } - p.conn = rw.Run(p.conn) - + loop = func(msg chan<- Msg, err chan<- error, next <-chan bool) { + // this is the readloop :) + } + return } func (p *Peer) startBaseProtocol() { -- cgit v1.2.3 From 58fc2c679b3d3b987aeefc98aa22db5f02717638 Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 20 Jan 2015 15:20:18 +0000 Subject: important fix for peer pubkey. when taken from identity, chop first format byte! --- p2p/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index e3e04ee65..e44eaab34 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -147,7 +147,7 @@ func (self *Peer) Pubkey() (pubkey []byte) { defer self.infolock.Unlock() switch { case self.identity != nil: - pubkey = self.identity.Pubkey() + pubkey = self.identity.Pubkey()[1:] case self.dialAddr != nil: pubkey = self.dialAddr.Pubkey case self.listenAddr != nil: -- cgit v1.2.3 From faa069a126da29a246193713568634e5be6edd2d Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 21 Jan 2015 16:22:49 +0000 Subject: peer-level integration test for crypto handshake - add const length params for handshake messages - add length check to fail early - add debug logs to help interop testing (!ABSOLUTELY SHOULD BE DELETED LATER) - wrap connection read/writes in error check - add cryptoReady channel in peer to signal when secure session setup is finished - wait for cryptoReady or timeout in TestPeersHandshake --- p2p/peer.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index e44eaab34..818f80580 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -71,6 +71,7 @@ type Peer struct { protocols []Protocol runBaseProtocol bool // for testing cryptoHandshake bool // for testing + cryptoReady chan struct{} runlock sync.RWMutex // protects running running map[string]*proto @@ -120,15 +121,16 @@ func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer { func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer { p := &Peer{ - Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()), - conn: conn, - dialAddr: dialAddr, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - protocols: protocols, - running: make(map[string]*proto), - disc: make(chan DiscReason), - protoErr: make(chan error), - closed: make(chan struct{}), + Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()), + conn: conn, + dialAddr: dialAddr, + bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), + protocols: protocols, + running: make(map[string]*proto), + disc: make(chan DiscReason), + protoErr: make(chan error), + closed: make(chan struct{}), + cryptoReady: make(chan struct{}), } return p } @@ -240,6 +242,7 @@ func (p *Peer) loop() (reason DiscReason, err error) { go readLoop(readMsg, readErr, readNext) readNext <- true + close(p.cryptoReady) if p.runBaseProtocol { p.startBaseProtocol() } @@ -353,6 +356,7 @@ func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { // this bit handles the handshake and creates a secure communications channel with // var rw *secretRW if sessionToken, _, err = crypto.Run(p.conn, p.Pubkey(), sessionToken, initiator); err != nil { + p.Debugf("unable to setup secure session: %v", err) return } loop = func(msg chan<- Msg, err chan<- error, next <-chan bool) { -- cgit v1.2.3 From 54252ede3177cb169fbb9e4824a31ce58cb0316c Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 21 Jan 2015 16:53:13 +0000 Subject: add temporary forced session token generation --- p2p/peer.go | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 818f80580..99f1a61d3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -3,6 +3,7 @@ package p2p import ( "bufio" "bytes" + "crypto/rand" "fmt" "io" "io/ioutil" @@ -342,6 +343,10 @@ func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { // it is survived by an encrypted readwriter var initiator bool var sessionToken []byte + sessionToken = make([]byte, keyLen) + if _, err = rand.Read(sessionToken); err != nil { + return + } if p.dialAddr != nil { // this should have its own method Outgoing() bool initiator = true } -- cgit v1.2.3 From 4499743522d32990614c7d900d746e998a1b81ed Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 26 Jan 2015 14:50:12 +0000 Subject: apply handshake related improvements from p2p.crypto branch --- p2p/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 99f1a61d3..62df58f8d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -343,7 +343,7 @@ func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { // it is survived by an encrypted readwriter var initiator bool var sessionToken []byte - sessionToken = make([]byte, keyLen) + sessionToken = make([]byte, shaLen) if _, err = rand.Read(sessionToken); err != nil { return } -- cgit v1.2.3 From 68205dec9ff8ab7d16c61f5e32b104d7aa20b352 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 26 Jan 2015 16:16:23 +0000 Subject: make crypto handshake calls package level, store privateKey on peer + tests ok --- p2p/peer.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 62df58f8d..e82bca222 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -3,6 +3,7 @@ package p2p import ( "bufio" "bytes" + "crypto/ecdsa" "crypto/rand" "fmt" "io" @@ -12,6 +13,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" ) @@ -73,6 +76,7 @@ type Peer struct { runBaseProtocol bool // for testing cryptoHandshake bool // for testing cryptoReady chan struct{} + privateKey []byte runlock sync.RWMutex // protects running running map[string]*proto @@ -338,6 +342,13 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) type readLoop func(chan<- Msg, chan<- error, <-chan bool) +func (p *Peer) PrivateKey() (prv *ecdsa.PrivateKey, err error) { + if prv = crypto.ToECDSA(p.privateKey); prv == nil { + err = fmt.Errorf("invalid private key") + } + return +} + func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { // cryptoId is just created for the lifecycle of the handshake // it is survived by an encrypted readwriter @@ -350,17 +361,17 @@ func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { if p.dialAddr != nil { // this should have its own method Outgoing() bool initiator = true } - // create crypto layer - // this could in principle run only once but maybe we want to allow - // identity switching - var crypto *cryptoId - if crypto, err = newCryptoId(p.ourID); err != nil { - return - } + // run on peer // this bit handles the handshake and creates a secure communications channel with // var rw *secretRW - if sessionToken, _, err = crypto.Run(p.conn, p.Pubkey(), sessionToken, initiator); err != nil { + var prvKey *ecdsa.PrivateKey + if prvKey, err = p.PrivateKey(); err != nil { + err = fmt.Errorf("unable to access private key for client: %v", err) + return + } + // initialise a new secure session + if sessionToken, _, err = NewSecureSession(p.conn, prvKey, p.Pubkey(), sessionToken, initiator); err != nil { p.Debugf("unable to setup secure session: %v", err) return } -- cgit v1.2.3 From 5bdc1159433138d92ed6fefb253e3c6ed3a43995 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 5 Feb 2015 03:07:58 +0100 Subject: p2p: integrate p2p/discover Overview of changes: - ClientIdentity has been removed, use discover.NodeID - Server now requires a private key to be set (instead of public key) - Server performs the encryption handshake before launching Peer - Dial logic takes peers from discover table - Encryption handshake code has been cleaned up a bit - baseProtocol is gone because we don't exchange peers anymore - Some parts of baseProtocol have moved into Peer instead --- p2p/peer.go | 518 ++++++++++++++++++++++-------------------------------------- 1 file changed, 192 insertions(+), 326 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index e82bca222..1fa8264a3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,10 +1,6 @@ package p2p import ( - "bufio" - "bytes" - "crypto/ecdsa" - "crypto/rand" "fmt" "io" "io/ioutil" @@ -13,179 +9,118 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/crypto" - - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rlp" ) -// peerAddr is the structure of a peer list element. -// It is also a valid net.Addr. -type peerAddr struct { - IP net.IP - Port uint64 - Pubkey []byte // optional -} +const ( + // maximum amount of time allowed for reading a message + msgReadTimeout = 5 * time.Second + // maximum amount of time allowed for writing a message + msgWriteTimeout = 5 * time.Second + // messages smaller than this many bytes will be read at + // once before passing them to a protocol. + wholePayloadSize = 64 * 1024 -func newPeerAddr(addr net.Addr, pubkey []byte) *peerAddr { - n := addr.Network() - if n != "tcp" && n != "tcp4" && n != "tcp6" { - // for testing with non-TCP - return &peerAddr{net.ParseIP("127.0.0.1"), 30303, pubkey} - } - ta := addr.(*net.TCPAddr) - return &peerAddr{ta.IP, uint64(ta.Port), pubkey} -} + disconnectGracePeriod = 2 * time.Second +) -func (d peerAddr) Network() string { - if d.IP.To4() != nil { - return "tcp4" - } else { - return "tcp6" - } -} +const ( + baseProtocolVersion = 2 + baseProtocolLength = uint64(16) + baseProtocolMaxMsgSize = 10 * 1024 * 1024 +) -func (d peerAddr) String() string { - return fmt.Sprintf("%v:%d", d.IP, d.Port) -} +const ( + // devp2p message codes + handshakeMsg = 0x00 + discMsg = 0x01 + pingMsg = 0x02 + pongMsg = 0x03 + getPeersMsg = 0x04 + peersMsg = 0x05 +) -func (d *peerAddr) RlpData() interface{} { - return []interface{}{string(d.IP), d.Port, d.Pubkey} +// handshake is the RLP structure of the protocol handshake. +type handshake struct { + Version uint64 + Name string + Caps []Cap + ListenPort uint64 + NodeID discover.NodeID } -// Peer represents a remote peer. +// Peer represents a connected remote node. type Peer struct { // Peers have all the log methods. // Use them to display messages related to the peer. *logger.Logger - infolock sync.Mutex - identity ClientIdentity - caps []Cap - listenAddr *peerAddr // what remote peer is listening on - dialAddr *peerAddr // non-nil if dialing + infoMu sync.Mutex + name string + caps []Cap - // The mutex protects the connection - // so only one protocol can write at a time. - writeMu sync.Mutex - conn net.Conn - bufconn *bufio.ReadWriter + ourID, remoteID *discover.NodeID + ourName string + + rw *frameRW // These fields maintain the running protocols. - protocols []Protocol - runBaseProtocol bool // for testing - cryptoHandshake bool // for testing - cryptoReady chan struct{} - privateKey []byte + protocols []Protocol + runlock sync.RWMutex // protects running + running map[string]*proto - runlock sync.RWMutex // protects running - running map[string]*proto + protocolHandshakeEnabled bool protoWG sync.WaitGroup protoErr chan error closed chan struct{} disc chan DiscReason - - activity event.TypeMux // for activity events - - slot int // index into Server peer list - - // These fields are kept so base protocol can access them. - // TODO: this should be one or more interfaces - ourID ClientIdentity // client id of the Server - ourListenAddr *peerAddr // listen addr of Server, nil if not listening - newPeerAddr chan<- *peerAddr // tell server about received peers - otherPeers func() []*Peer // should return the list of all peers - pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey } // NewPeer returns a peer for testing purposes. -func NewPeer(id ClientIdentity, caps []Cap) *Peer { +func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer { conn, _ := net.Pipe() - peer := newPeer(conn, nil, nil) - peer.setHandshakeInfo(id, nil, caps) - close(peer.closed) + peer := newPeer(conn, nil, "", nil, &id) + peer.setHandshakeInfo(name, caps) + close(peer.closed) // ensures Disconnect doesn't block return peer } -func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer { - p := newPeer(conn, server.Protocols, dialAddr) - p.ourID = server.Identity - p.newPeerAddr = server.peerConnect - p.otherPeers = server.Peers - p.pubkeyHook = server.verifyPeer - p.runBaseProtocol = true - - // laddr can be updated concurrently by NAT traversal. - // newServerPeer must be called with the server lock held. - if server.laddr != nil { - p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey()) - } - return p -} - -func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer { - p := &Peer{ - Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()), - conn: conn, - dialAddr: dialAddr, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - protocols: protocols, - running: make(map[string]*proto), - disc: make(chan DiscReason), - protoErr: make(chan error), - closed: make(chan struct{}), - cryptoReady: make(chan struct{}), - } - return p +// ID returns the node's public key. +func (p *Peer) ID() discover.NodeID { + return *p.remoteID } -// Identity returns the client identity of the remote peer. The -// identity can be nil if the peer has not yet completed the -// handshake. -func (p *Peer) Identity() ClientIdentity { - p.infolock.Lock() - defer p.infolock.Unlock() - return p.identity -} - -func (self *Peer) Pubkey() (pubkey []byte) { - self.infolock.Lock() - defer self.infolock.Unlock() - switch { - case self.identity != nil: - pubkey = self.identity.Pubkey()[1:] - case self.dialAddr != nil: - pubkey = self.dialAddr.Pubkey - case self.listenAddr != nil: - pubkey = self.listenAddr.Pubkey - } - return +// Name returns the node name that the remote node advertised. +func (p *Peer) Name() string { + // this needs a lock because the information is part of the + // protocol handshake. + p.infoMu.Lock() + name := p.name + p.infoMu.Unlock() + return name } // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { - p.infolock.Lock() - defer p.infolock.Unlock() - return p.caps -} - -func (p *Peer) setHandshakeInfo(id ClientIdentity, laddr *peerAddr, caps []Cap) { - p.infolock.Lock() - p.identity = id - p.listenAddr = laddr - p.caps = caps - p.infolock.Unlock() + // this needs a lock because the information is part of the + // protocol handshake. + p.infoMu.Lock() + caps := p.caps + p.infoMu.Unlock() + return caps } // RemoteAddr returns the remote address of the network connection. func (p *Peer) RemoteAddr() net.Addr { - return p.conn.RemoteAddr() + return p.rw.RemoteAddr() } // LocalAddr returns the local address of the network connection. func (p *Peer) LocalAddr() net.Addr { - return p.conn.LocalAddr() + return p.rw.LocalAddr() } // Disconnect terminates the peer connection with the given reason. @@ -199,201 +134,167 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - kind := "inbound" - p.infolock.Lock() - if p.dialAddr != nil { - kind = "outbound" + return fmt.Sprintf("Peer %.8x %v", p.remoteID, p.RemoteAddr()) +} + +func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer { + logtag := fmt.Sprintf("Peer %.8x %v", remoteID, conn.RemoteAddr()) + return &Peer{ + Logger: logger.NewLogger(logtag), + rw: newFrameRW(conn, msgWriteTimeout), + ourID: ourID, + ourName: ourName, + remoteID: remoteID, + protocols: protocols, + running: make(map[string]*proto), + disc: make(chan DiscReason), + protoErr: make(chan error), + closed: make(chan struct{}), } - p.infolock.Unlock() - return fmt.Sprintf("Peer(%p %v %s)", p, p.conn.RemoteAddr(), kind) } -const ( - // maximum amount of time allowed for reading a message - msgReadTimeout = 5 * time.Second - // maximum amount of time allowed for writing a message - msgWriteTimeout = 5 * time.Second - // messages smaller than this many bytes will be read at - // once before passing them to a protocol. - wholePayloadSize = 64 * 1024 -) - -var ( - inactivityTimeout = 2 * time.Second - disconnectGracePeriod = 2 * time.Second -) +func (p *Peer) setHandshakeInfo(name string, caps []Cap) { + p.infoMu.Lock() + p.name = name + p.caps = caps + p.infoMu.Unlock() +} -func (p *Peer) loop() (reason DiscReason, err error) { - defer p.activity.Stop() +func (p *Peer) run() DiscReason { + var readErr = make(chan error, 1) defer p.closeProtocols() defer close(p.closed) - defer p.conn.Close() + defer p.rw.Close() + + // start the read loop + go func() { readErr <- p.readLoop() }() - var readLoop func(chan<- Msg, chan<- error, <-chan bool) - if p.cryptoHandshake { - if readLoop, err = p.handleCryptoHandshake(); err != nil { - // from here on everything can be encrypted, authenticated - return DiscProtocolError, err // no graceful disconnect + if p.protocolHandshakeEnabled { + if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil { + p.DebugDetailf("Protocol handshake error: %v\n", err) + return DiscProtocolError } - } else { - readLoop = p.readLoop } - // read loop - readMsg := make(chan Msg) - readErr := make(chan error) - readNext := make(chan bool, 1) - protoDone := make(chan struct{}, 1) - go readLoop(readMsg, readErr, readNext) - readNext <- true - - close(p.cryptoReady) - if p.runBaseProtocol { - p.startBaseProtocol() + // wait for an error or disconnect + var reason DiscReason + select { + case err := <-readErr: + // We rely on protocols to abort if there is a write error. It + // might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + reason = DiscNetworkError + case err := <-p.protoErr: + reason = discReasonForError(err) + case reason = <-p.disc: } - -loop: - for { - select { - case msg := <-readMsg: - // a new message has arrived. - var wait bool - if wait, err = p.dispatch(msg, protoDone); err != nil { - p.Errorf("msg dispatch error: %v\n", err) - reason = discReasonForError(err) - break loop - } - if !wait { - // Msg has already been read completely, continue with next message. - readNext <- true - } - p.activity.Post(time.Now()) - case <-protoDone: - // protocol has consumed the message payload, - // we can continue reading from the socket. - readNext <- true - - case err := <-readErr: - // read failed. there is no need to run the - // polite disconnect sequence because the connection - // is probably dead anyway. - // TODO: handle write errors as well - return DiscNetworkError, err - case err = <-p.protoErr: - reason = discReasonForError(err) - break loop - case reason = <-p.disc: - break loop - } + if reason != DiscNetworkError { + p.politeDisconnect(reason) } + p.Debugf("Disconnected: %v\n", reason) + return reason +} - // wait for read loop to return. - close(readNext) - <-readErr - // tell the remote end to disconnect +func (p *Peer) politeDisconnect(reason DiscReason) { done := make(chan struct{}) go func() { - p.conn.SetDeadline(time.Now().Add(disconnectGracePeriod)) - p.writeMsg(NewMsg(discMsg, reason), disconnectGracePeriod) - io.Copy(ioutil.Discard, p.conn) + // send reason + EncodeMsg(p.rw, discMsg, uint(reason)) + // discard any data that might arrive + io.Copy(ioutil.Discard, p.rw) close(done) }() select { case <-done: case <-time.After(disconnectGracePeriod): } - return reason, err } -func (p *Peer) readLoop(msgc chan<- Msg, errc chan<- error, unblock <-chan bool) { - for _ = range unblock { - p.conn.SetReadDeadline(time.Now().Add(msgReadTimeout)) - if msg, err := readMsg(p.bufconn); err != nil { - errc <- err - } else { - msgc <- msg +func (p *Peer) readLoop() error { + if p.protocolHandshakeEnabled { + if err := readProtocolHandshake(p, p.rw); err != nil { + return err } } - close(errc) + for { + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if err = p.handle(msg); err != nil { + return err + } + } + return nil } -func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) { - proto, err := p.getProto(msg.Code) - if err != nil { - return false, err - } - if msg.Size <= wholePayloadSize { - // optimization: msg is small enough, read all - // of it and move on to the next message - buf, err := ioutil.ReadAll(msg.Payload) +func (p *Peer) handle(msg Msg) error { + switch { + case msg.Code == pingMsg: + msg.Discard() + go EncodeMsg(p.rw, pongMsg) + case msg.Code == discMsg: + var reason DiscReason + // no need to discard or for error checking, we'll close the + // connection after this. + rlp.Decode(msg.Payload, &reason) + p.Disconnect(DiscRequested) + return discRequestedError(reason) + case msg.Code < baseProtocolLength: + // ignore other base protocol messages + return msg.Discard() + default: + // it's a subprotocol message + proto, err := p.getProto(msg.Code) if err != nil { - return false, err + return fmt.Errorf("msg code out of range: %v", msg.Code) } - msg.Payload = bytes.NewReader(buf) - proto.in <- msg - } else { - wait = true - pr := &eofSignal{msg.Payload, int64(msg.Size), protoDone} - msg.Payload = pr proto.in <- msg } - return wait, nil + return nil } -type readLoop func(chan<- Msg, chan<- error, <-chan bool) - -func (p *Peer) PrivateKey() (prv *ecdsa.PrivateKey, err error) { - if prv = crypto.ToECDSA(p.privateKey); prv == nil { - err = fmt.Errorf("invalid private key") +func readProtocolHandshake(p *Peer, rw MsgReadWriter) error { + // read and handle remote handshake + msg, err := rw.ReadMsg() + if err != nil { + return err } - return -} - -func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { - // cryptoId is just created for the lifecycle of the handshake - // it is survived by an encrypted readwriter - var initiator bool - var sessionToken []byte - sessionToken = make([]byte, shaLen) - if _, err = rand.Read(sessionToken); err != nil { - return + if msg.Code != handshakeMsg { + return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code) } - if p.dialAddr != nil { // this should have its own method Outgoing() bool - initiator = true + if msg.Size > baseProtocolMaxMsgSize { + return newPeerError(errMisc, "message too big") } - - // run on peer - // this bit handles the handshake and creates a secure communications channel with - // var rw *secretRW - var prvKey *ecdsa.PrivateKey - if prvKey, err = p.PrivateKey(); err != nil { - err = fmt.Errorf("unable to access private key for client: %v", err) - return + var hs handshake + if err := msg.Decode(&hs); err != nil { + return err } - // initialise a new secure session - if sessionToken, _, err = NewSecureSession(p.conn, prvKey, p.Pubkey(), sessionToken, initiator); err != nil { - p.Debugf("unable to setup secure session: %v", err) - return + // validate handshake info + if hs.Version != baseProtocolVersion { + return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", + baseProtocolVersion, hs.Version) } - loop = func(msg chan<- Msg, err chan<- error, next <-chan bool) { - // this is the readloop :) + if hs.NodeID == *p.remoteID { + return newPeerError(errPubkeyForbidden, "node ID mismatch") } - return + // TODO: remove Caps with empty name + p.setHandshakeInfo(hs.Name, hs.Caps) + p.startSubprotocols(hs.Caps) + return nil } -func (p *Peer) startBaseProtocol() { - p.runlock.Lock() - defer p.runlock.Unlock() - p.running[""] = p.startProto(0, Protocol{ - Length: baseProtocolLength, - Run: runBaseProtocol, - }) +func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error { + var caps []interface{} + for _, proto := range ps { + caps = append(caps, proto.cap()) + } + return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id) } // startProtocols starts matching named subprotocols. func (p *Peer) startSubprotocols(caps []Cap) { sort.Sort(capsByName(caps)) - p.runlock.Lock() defer p.runlock.Unlock() offset := baseProtocolLength @@ -412,20 +313,22 @@ outer: } func (p *Peer) startProto(offset uint64, impl Protocol) *proto { + p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version) rw := &proto{ + name: impl.Name, in: make(chan Msg), offset: offset, maxcode: impl.Length, - peer: p, + w: p.rw, } p.protoWG.Add(1) go func() { err := impl.Run(p, rw) if err == nil { - p.Infof("protocol %q returned", impl.Name) + p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version) err = newPeerError(errMisc, "protocol returned") } else { - p.Errorf("protocol %q error: %v\n", impl.Name, err) + p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err) } select { case p.protoErr <- err: @@ -459,6 +362,7 @@ func (p *Peer) closeProtocols() { } // writeProtoMsg sends the given message on behalf of the given named protocol. +// this exists because of Server.Broadcast. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { p.runlock.RLock() proto, ok := p.running[protoName] @@ -470,25 +374,14 @@ func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) } msg.Code += proto.offset - return p.writeMsg(msg, msgWriteTimeout) -} - -// writeMsg writes a message to the connection. -func (p *Peer) writeMsg(msg Msg, timeout time.Duration) error { - p.writeMu.Lock() - defer p.writeMu.Unlock() - p.conn.SetWriteDeadline(time.Now().Add(timeout)) - if err := writeMsg(p.bufconn, msg); err != nil { - return newPeerError(errWrite, "%v", err) - } - return p.bufconn.Flush() + return p.rw.WriteMsg(msg) } type proto struct { name string in chan Msg maxcode, offset uint64 - peer *Peer + w MsgWriter } func (rw *proto) WriteMsg(msg Msg) error { @@ -496,11 +389,7 @@ func (rw *proto) WriteMsg(msg Msg) error { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset - return rw.peer.writeMsg(msg, msgWriteTimeout) -} - -func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error { - return rw.WriteMsg(NewMsg(code, data...)) + return rw.w.WriteMsg(msg) } func (rw *proto) ReadMsg() (Msg, error) { @@ -511,26 +400,3 @@ func (rw *proto) ReadMsg() (Msg, error) { msg.Code -= rw.offset return msg, nil } - -// eofSignal wraps a reader with eof signaling. the eof channel is -// closed when the wrapped reader returns an error or when count bytes -// have been read. -// -type eofSignal struct { - wrapped io.Reader - count int64 - eof chan<- struct{} -} - -// note: when using eofSignal to detect whether a message payload -// has been read, Read might not be called for zero sized messages. - -func (r *eofSignal) Read(buf []byte) (int, error) { - n, err := r.wrapped.Read(buf) - r.count -= int64(n) - if (err != nil || r.count <= 0) && r.eof != nil { - r.eof <- struct{}{} // tell Peer that msg has been consumed - r.eof = nil - } - return n, err -} -- cgit v1.2.3 From e34d1341022a51d8a86c4836c91e4e0ded888d27 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 7 Feb 2015 00:13:22 +0100 Subject: p2p: fixes for actual connections The unit test hooks were turned on 'in production'. --- p2p/peer.go | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 1fa8264a3..b61cf96da 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,6 +1,7 @@ package p2p import ( + "errors" "fmt" "io" "io/ioutil" @@ -71,7 +72,8 @@ type Peer struct { runlock sync.RWMutex // protects running running map[string]*proto - protocolHandshakeEnabled bool + // disables protocol handshake, for testing + noHandshake bool protoWG sync.WaitGroup protoErr chan error @@ -134,11 +136,11 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - return fmt.Sprintf("Peer %.8x %v", p.remoteID, p.RemoteAddr()) + return fmt.Sprintf("Peer %.8x %v", p.remoteID[:], p.RemoteAddr()) } func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer { - logtag := fmt.Sprintf("Peer %.8x %v", remoteID, conn.RemoteAddr()) + logtag := fmt.Sprintf("Peer %.8x %v", remoteID[:], conn.RemoteAddr()) return &Peer{ Logger: logger.NewLogger(logtag), rw: newFrameRW(conn, msgWriteTimeout), @@ -164,33 +166,35 @@ func (p *Peer) run() DiscReason { var readErr = make(chan error, 1) defer p.closeProtocols() defer close(p.closed) - defer p.rw.Close() - // start the read loop go func() { readErr <- p.readLoop() }() - if p.protocolHandshakeEnabled { + if !p.noHandshake { if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil { p.DebugDetailf("Protocol handshake error: %v\n", err) + p.rw.Close() return DiscProtocolError } } - // wait for an error or disconnect + // Wait for an error or disconnect. var reason DiscReason select { case err := <-readErr: // We rely on protocols to abort if there is a write error. It // might be more robust to handle them here as well. p.DebugDetailf("Read error: %v\n", err) - reason = DiscNetworkError + p.rw.Close() + return DiscNetworkError + case err := <-p.protoErr: reason = discReasonForError(err) case reason = <-p.disc: } - if reason != DiscNetworkError { - p.politeDisconnect(reason) - } + p.politeDisconnect(reason) + + // Wait for readLoop. It will end because conn is now closed. + <-readErr p.Debugf("Disconnected: %v\n", reason) return reason } @@ -198,9 +202,9 @@ func (p *Peer) run() DiscReason { func (p *Peer) politeDisconnect(reason DiscReason) { done := make(chan struct{}) go func() { - // send reason EncodeMsg(p.rw, discMsg, uint(reason)) - // discard any data that might arrive + // Wait for the other side to close the connection. + // Discard any data that they send until then. io.Copy(ioutil.Discard, p.rw) close(done) }() @@ -208,10 +212,11 @@ func (p *Peer) politeDisconnect(reason DiscReason) { case <-done: case <-time.After(disconnectGracePeriod): } + p.rw.Close() } func (p *Peer) readLoop() error { - if p.protocolHandshakeEnabled { + if !p.noHandshake { if err := readProtocolHandshake(p, p.rw); err != nil { return err } @@ -264,7 +269,7 @@ func readProtocolHandshake(p *Peer, rw MsgReadWriter) error { return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code) } if msg.Size > baseProtocolMaxMsgSize { - return newPeerError(errMisc, "message too big") + return newPeerError(errInvalidMsg, "message too big") } var hs handshake if err := msg.Decode(&hs); err != nil { @@ -326,7 +331,7 @@ func (p *Peer) startProto(offset uint64, impl Protocol) *proto { err := impl.Run(p, rw) if err == nil { p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version) - err = newPeerError(errMisc, "protocol returned") + err = errors.New("protocol returned") } else { p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err) } -- cgit v1.2.3 From 5110f80bba13e3758ae1836a88afee123df81e3e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 14:44:00 +0100 Subject: p2p: improve read deadlines There are now two deadlines, frameReadTimeout and payloadReadTimeout. The frame timeout is longer and allows for connections that are idle. The message timeout is still short and ensures that we don't get stuck in the middle of a message. --- p2p/peer.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index b61cf96da..f779c1c02 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -15,22 +15,12 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -const ( - // maximum amount of time allowed for reading a message - msgReadTimeout = 5 * time.Second - // maximum amount of time allowed for writing a message - msgWriteTimeout = 5 * time.Second - // messages smaller than this many bytes will be read at - // once before passing them to a protocol. - wholePayloadSize = 64 * 1024 - - disconnectGracePeriod = 2 * time.Second -) - const ( baseProtocolVersion = 2 baseProtocolLength = uint64(16) baseProtocolMaxMsgSize = 10 * 1024 * 1024 + + disconnectGracePeriod = 2 * time.Second ) const ( -- cgit v1.2.3 From fd3e1061e01690c7046a0d80635284c1592b7699 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 15:06:06 +0100 Subject: p2p: handle disconnect before protocol handshake --- p2p/peer.go | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index f779c1c02..6aa78045b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -255,6 +255,13 @@ func readProtocolHandshake(p *Peer, rw MsgReadWriter) error { if err != nil { return err } + if msg.Code == discMsg { + // disconnect before protocol handshake is valid according to the + // spec and we send it ourself if Server.addPeer fails. + var reason DiscReason + rlp.Decode(msg.Payload, &reason) + return discRequestedError(reason) + } if msg.Code != handshakeMsg { return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code) } -- cgit v1.2.3 From 32a9c0ca809508c1648b8f44f3e09725af7a80d3 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Feb 2015 15:08:40 +0100 Subject: p2p: bump devp2p protcol version to 3 For compatibility with cpp-ethereum --- p2p/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index 6aa78045b..fd5bec7d5 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -16,7 +16,7 @@ import ( ) const ( - baseProtocolVersion = 2 + baseProtocolVersion = 3 baseProtocolLength = uint64(16) baseProtocolMaxMsgSize = 10 * 1024 * 1024 -- cgit v1.2.3 From 73f94f37559ca0c8739c7dddeaf46d36827fdf30 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 19 Feb 2015 01:52:03 +0100 Subject: p2p: disable encryption handshake The diff is a bit bigger than expected because the protocol handshake logic has moved out of Peer. This is necessary because the protocol handshake will have custom framing in the final protocol. --- p2p/peer.go | 229 ++++++++++++++++-------------------------------------------- 1 file changed, 59 insertions(+), 170 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index fd5bec7d5..b9bf0fd73 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -33,37 +33,14 @@ const ( peersMsg = 0x05 ) -// handshake is the RLP structure of the protocol handshake. -type handshake struct { - Version uint64 - Name string - Caps []Cap - ListenPort uint64 - NodeID discover.NodeID -} - // Peer represents a connected remote node. type Peer struct { // Peers have all the log methods. // Use them to display messages related to the peer. *logger.Logger - infoMu sync.Mutex - name string - caps []Cap - - ourID, remoteID *discover.NodeID - ourName string - - rw *frameRW - - // These fields maintain the running protocols. - protocols []Protocol - runlock sync.RWMutex // protects running - running map[string]*proto - - // disables protocol handshake, for testing - noHandshake bool + rw *conn + running map[string]*protoRW protoWG sync.WaitGroup protoErr chan error @@ -73,36 +50,27 @@ type Peer struct { // NewPeer returns a peer for testing purposes. func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer { - conn, _ := net.Pipe() - peer := newPeer(conn, nil, "", nil, &id) - peer.setHandshakeInfo(name, caps) + pipe, _ := net.Pipe() + conn := newConn(pipe, &protoHandshake{ID: id, Name: name, Caps: caps}) + peer := newPeer(conn, nil) close(peer.closed) // ensures Disconnect doesn't block return peer } // ID returns the node's public key. func (p *Peer) ID() discover.NodeID { - return *p.remoteID + return p.rw.ID } // Name returns the node name that the remote node advertised. func (p *Peer) Name() string { - // this needs a lock because the information is part of the - // protocol handshake. - p.infoMu.Lock() - name := p.name - p.infoMu.Unlock() - return name + return p.rw.Name } // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { - // this needs a lock because the information is part of the - // protocol handshake. - p.infoMu.Lock() - caps := p.caps - p.infoMu.Unlock() - return caps + // TODO: maybe return copy + return p.rw.Caps } // RemoteAddr returns the remote address of the network connection. @@ -126,30 +94,20 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - return fmt.Sprintf("Peer %.8x %v", p.remoteID[:], p.RemoteAddr()) + return fmt.Sprintf("Peer %.8x %v", p.rw.ID[:], p.RemoteAddr()) } -func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer { - logtag := fmt.Sprintf("Peer %.8x %v", remoteID[:], conn.RemoteAddr()) - return &Peer{ - Logger: logger.NewLogger(logtag), - rw: newFrameRW(conn, msgWriteTimeout), - ourID: ourID, - ourName: ourName, - remoteID: remoteID, - protocols: protocols, - running: make(map[string]*proto), - disc: make(chan DiscReason), - protoErr: make(chan error), - closed: make(chan struct{}), +func newPeer(conn *conn, protocols []Protocol) *Peer { + logtag := fmt.Sprintf("Peer %.8x %v", conn.ID[:], conn.RemoteAddr()) + p := &Peer{ + Logger: logger.NewLogger(logtag), + rw: conn, + running: matchProtocols(protocols, conn.Caps, conn), + disc: make(chan DiscReason), + protoErr: make(chan error), + closed: make(chan struct{}), } -} - -func (p *Peer) setHandshakeInfo(name string, caps []Cap) { - p.infoMu.Lock() - p.name = name - p.caps = caps - p.infoMu.Unlock() + return p } func (p *Peer) run() DiscReason { @@ -157,16 +115,9 @@ func (p *Peer) run() DiscReason { defer p.closeProtocols() defer close(p.closed) + p.startProtocols() go func() { readErr <- p.readLoop() }() - if !p.noHandshake { - if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil { - p.DebugDetailf("Protocol handshake error: %v\n", err) - p.rw.Close() - return DiscProtocolError - } - } - // Wait for an error or disconnect. var reason DiscReason select { @@ -206,11 +157,6 @@ func (p *Peer) politeDisconnect(reason DiscReason) { } func (p *Peer) readLoop() error { - if !p.noHandshake { - if err := readProtocolHandshake(p, p.rw); err != nil { - return err - } - } for { msg, err := p.rw.ReadMsg() if err != nil { @@ -249,105 +195,51 @@ func (p *Peer) handle(msg Msg) error { return nil } -func readProtocolHandshake(p *Peer, rw MsgReadWriter) error { - // read and handle remote handshake - msg, err := rw.ReadMsg() - if err != nil { - return err - } - if msg.Code == discMsg { - // disconnect before protocol handshake is valid according to the - // spec and we send it ourself if Server.addPeer fails. - var reason DiscReason - rlp.Decode(msg.Payload, &reason) - return discRequestedError(reason) - } - if msg.Code != handshakeMsg { - return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code) - } - if msg.Size > baseProtocolMaxMsgSize { - return newPeerError(errInvalidMsg, "message too big") - } - var hs handshake - if err := msg.Decode(&hs); err != nil { - return err - } - // validate handshake info - if hs.Version != baseProtocolVersion { - return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", - baseProtocolVersion, hs.Version) - } - if hs.NodeID == *p.remoteID { - return newPeerError(errPubkeyForbidden, "node ID mismatch") - } - // TODO: remove Caps with empty name - p.setHandshakeInfo(hs.Name, hs.Caps) - p.startSubprotocols(hs.Caps) - return nil -} - -func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error { - var caps []interface{} - for _, proto := range ps { - caps = append(caps, proto.cap()) - } - return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id) -} - -// startProtocols starts matching named subprotocols. -func (p *Peer) startSubprotocols(caps []Cap) { +// matchProtocols creates structures for matching named subprotocols. +func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { sort.Sort(capsByName(caps)) - p.runlock.Lock() - defer p.runlock.Unlock() offset := baseProtocolLength + result := make(map[string]*protoRW) outer: for _, cap := range caps { - for _, proto := range p.protocols { - if proto.Name == cap.Name && - proto.Version == cap.Version && - p.running[cap.Name] == nil { - p.running[cap.Name] = p.startProto(offset, proto) + for _, proto := range protocols { + if proto.Name == cap.Name && proto.Version == cap.Version && result[cap.Name] == nil { + result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} offset += proto.Length continue outer } } } + return result } -func (p *Peer) startProto(offset uint64, impl Protocol) *proto { - p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version) - rw := &proto{ - name: impl.Name, - in: make(chan Msg), - offset: offset, - maxcode: impl.Length, - w: p.rw, +func (p *Peer) startProtocols() { + for _, proto := range p.running { + proto := proto + p.DebugDetailf("Starting protocol %s/%d\n", proto.Name, proto.Version) + p.protoWG.Add(1) + go func() { + err := proto.Run(p, proto) + if err == nil { + p.DebugDetailf("Protocol %s/%d returned\n", proto.Name, proto.Version) + err = errors.New("protocol returned") + } else { + p.DebugDetailf("Protocol %s/%d error: %v\n", proto.Name, proto.Version, err) + } + select { + case p.protoErr <- err: + case <-p.closed: + } + p.protoWG.Done() + }() } - p.protoWG.Add(1) - go func() { - err := impl.Run(p, rw) - if err == nil { - p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version) - err = errors.New("protocol returned") - } else { - p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err) - } - select { - case p.protoErr <- err: - case <-p.closed: - } - p.protoWG.Done() - }() - return rw } // getProto finds the protocol responsible for handling // the given message code. -func (p *Peer) getProto(code uint64) (*proto, error) { - p.runlock.RLock() - defer p.runlock.RUnlock() +func (p *Peer) getProto(code uint64) (*protoRW, error) { for _, proto := range p.running { - if code >= proto.offset && code < proto.offset+proto.maxcode { + if code >= proto.offset && code < proto.offset+proto.Length { return proto, nil } } @@ -355,46 +247,43 @@ func (p *Peer) getProto(code uint64) (*proto, error) { } func (p *Peer) closeProtocols() { - p.runlock.RLock() for _, p := range p.running { close(p.in) } - p.runlock.RUnlock() p.protoWG.Wait() } // writeProtoMsg sends the given message on behalf of the given named protocol. // this exists because of Server.Broadcast. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { - p.runlock.RLock() proto, ok := p.running[protoName] - p.runlock.RUnlock() if !ok { return fmt.Errorf("protocol %s not handled by peer", protoName) } - if msg.Code >= proto.maxcode { + if msg.Code >= proto.Length { return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) } msg.Code += proto.offset return p.rw.WriteMsg(msg) } -type proto struct { - name string - in chan Msg - maxcode, offset uint64 - w MsgWriter +type protoRW struct { + Protocol + + in chan Msg + offset uint64 + w MsgWriter } -func (rw *proto) WriteMsg(msg Msg) error { - if msg.Code >= rw.maxcode { +func (rw *protoRW) WriteMsg(msg Msg) error { + if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset return rw.w.WriteMsg(msg) } -func (rw *proto) ReadMsg() (Msg, error) { +func (rw *protoRW) ReadMsg() (Msg, error) { msg, ok := <-rw.in if !ok { return msg, io.EOF -- cgit v1.2.3 From 3dbd32093cd1060c339b3351fbb676b8c7cc31f0 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 19 Feb 2015 16:53:52 +0100 Subject: p2p: enable devp2p ping This should prevent connection drops. --- p2p/peer.go | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) (limited to 'p2p/peer.go') diff --git a/p2p/peer.go b/p2p/peer.go index b9bf0fd73..fb027c834 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -21,6 +21,7 @@ const ( baseProtocolMaxMsgSize = 10 * 1024 * 1024 disconnectGracePeriod = 2 * time.Second + pingInterval = 15 * time.Second ) const ( @@ -118,19 +119,33 @@ func (p *Peer) run() DiscReason { p.startProtocols() go func() { readErr <- p.readLoop() }() + ping := time.NewTicker(pingInterval) + defer ping.Stop() + // Wait for an error or disconnect. var reason DiscReason - select { - case err := <-readErr: - // We rely on protocols to abort if there is a write error. It - // might be more robust to handle them here as well. - p.DebugDetailf("Read error: %v\n", err) - p.rw.Close() - return DiscNetworkError - - case err := <-p.protoErr: - reason = discReasonForError(err) - case reason = <-p.disc: +loop: + for { + select { + case <-ping.C: + go func() { + if err := EncodeMsg(p.rw, pingMsg, nil); err != nil { + p.protoErr <- err + return + } + }() + case err := <-readErr: + // We rely on protocols to abort if there is a write error. It + // might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + p.rw.Close() + return DiscNetworkError + case err := <-p.protoErr: + reason = discReasonForError(err) + break loop + case reason = <-p.disc: + break loop + } } p.politeDisconnect(reason) -- cgit v1.2.3