From 4811f460e7aad37c6c6867df0461a5fa162b5f2c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 19 Mar 2015 15:08:04 +0100 Subject: p2p: export ExpectMsg (for eth protocol testing) --- p2p/message.go | 32 ++++++++++++++++++++++++++++++++ p2p/peer_test.go | 32 -------------------------------- 2 files changed, 32 insertions(+), 32 deletions(-) (limited to 'p2p') diff --git a/p2p/message.go b/p2p/message.go index 14e4404c9..e7e6af287 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -208,3 +208,35 @@ func (p *MsgPipeRW) Close() error { close(p.closing) return nil } + +// ExpectMsg reads a message from r and verifies that its +// code and encoded RLP content match the provided values. +// If content is nil, the payload is discarded and not verified. +func ExpectMsg(r MsgReader, code uint64, content interface{}) error { + msg, err := r.ReadMsg() + if err != nil { + return err + } + if msg.Code != code { + return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, code) + } + if content == nil { + return msg.Discard() + } else { + contentEnc, err := rlp.EncodeToBytes(content) + if err != nil { + panic("content encode error: " + err.Error()) + } + if int(msg.Size) != len(contentEnc) { + return fmt.Errorf("message size mismatch: got %d, want %d", msg.Size, len(contentEnc)) + } + actualContent, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return err + } + if !bytes.Equal(actualContent, contentEnc) { + return fmt.Errorf("message payload mismatch:\ngot: %x\nwant: %x", actualContent, contentEnc) + } + } + return nil +} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index cc9f1f0cd..626dfd00f 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -192,35 +192,3 @@ func TestNewPeer(t *testing.T) { p.Disconnect(DiscAlreadyConnected) // Should not hang } - -// expectMsg reads a message from r and verifies that its -// code and encoded RLP content match the provided values. -// If content is nil, the payload is discarded and not verified. -func expectMsg(r MsgReader, code uint64, content interface{}) error { - msg, err := r.ReadMsg() - if err != nil { - return err - } - if msg.Code != code { - return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, code) - } - if content == nil { - return msg.Discard() - } else { - contentEnc, err := rlp.EncodeToBytes(content) - if err != nil { - panic("content encode error: " + err.Error()) - } - if int(msg.Size) != len(contentEnc) { - return fmt.Errorf("message size mismatch: got %d, want %d", msg.Size, len(contentEnc)) - } - actualContent, err := ioutil.ReadAll(msg.Payload) - if err != nil { - return err - } - if !bytes.Equal(actualContent, contentEnc) { - return fmt.Errorf("message payload mismatch:\ngot: %x\nwant: %x", actualContent, contentEnc) - } - } - return nil -} -- cgit v1.2.3 From 5ba51594c7eb1f04b3636e6413de6d4eb70228d2 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 19 Mar 2015 15:11:02 +0100 Subject: p2p: use package rlp to encode messages Message encoding functions have been renamed to catch any uses. The switch to the new encoder can cause subtle incompatibilities. If there are any users outside of our tree, they will at least be alerted that there was a change. NewMsg no longer exists. The replacements for EncodeMsg are called Send and SendItems. --- p2p/handshake.go | 8 ++------ p2p/message.go | 36 +++++++++++++++++++++++------------- p2p/message_test.go | 24 ++++-------------------- p2p/peer.go | 6 +++--- p2p/peer_test.go | 41 +++++++++++++++++++++-------------------- p2p/rlpx_test.go | 4 ++-- p2p/server.go | 11 ++++++++--- p2p/server_test.go | 2 +- 8 files changed, 64 insertions(+), 68 deletions(-) (limited to 'p2p') diff --git a/p2p/handshake.go b/p2p/handshake.go index 7fc497517..031064407 100644 --- a/p2p/handshake.go +++ b/p2p/handshake.go @@ -92,7 +92,7 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) ( return nil, errors.New("node ID in protocol handshake does not match encryption handshake") } // TODO: validate that handshake node ID matches - if err := writeProtocolHandshake(rw, our); err != nil { + if err := Send(rw, handshakeMsg, our); err != nil { return nil, fmt.Errorf("protocol write error: %v", err) } return &conn{rw, rhs}, nil @@ -106,7 +106,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, // Run the protocol handshake using authenticated messages. rw := newRlpxFrameRW(fd, secrets) - if err := writeProtocolHandshake(rw, our); err != nil { + if err := Send(rw, handshakeMsg, our); err != nil { return nil, fmt.Errorf("protocol write error: %v", err) } rhs, err := readProtocolHandshake(rw, our) @@ -398,10 +398,6 @@ func xor(one, other []byte) (xor []byte) { return xor } -func writeProtocolHandshake(w MsgWriter, our *protoHandshake) error { - return EncodeMsg(w, handshakeMsg, our.Version, our.Name, our.Caps, our.ListenPort, our.ID[:]) -} - func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, error) { // read and handle remote handshake msg, err := r.ReadMsg() diff --git a/p2p/message.go b/p2p/message.go index e7e6af287..5dc7d5460 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" ) @@ -28,13 +27,7 @@ type Msg struct { Payload io.Reader } -// NewMsg creates an RLP-encoded message with the given code. -func NewMsg(code uint64, params ...interface{}) Msg { - p := bytes.NewReader(common.Encode(params)) - return Msg{Code: code, Size: uint32(p.Len()), Payload: p} -} - -// Decode parse the RLP content of a message into +// Decode parses the RLP content of a message into // the given value, which must be a pointer. // // For the decoding rules, please see package rlp. @@ -76,13 +69,30 @@ type MsgReadWriter interface { MsgWriter } -// EncodeMsg writes an RLP-encoded message with the given code and -// data elements. -func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { - return w.WriteMsg(NewMsg(code, data...)) +// Send writes an RLP-encoded message with the given code. +// data should encode as an RLP list. +func Send(w MsgWriter, msgcode uint64, data interface{}) error { + size, r, err := rlp.EncodeToReader(data) + if err != nil { + return err + } + return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r}) +} + +// SendItems writes an RLP with the given code and data elements. +// For a call such as: +// +// SendItems(w, code, e1, e2, e3) +// +// the message payload will be an RLP list containing the items: +// +// [e1, e2, e3] +// +func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error { + return Send(w, msgcode, elems) } -// netWrapper wrapsa MsgReadWriter with locks around +// netWrapper wraps a MsgReadWriter with locks around // ReadMsg/WriteMsg and applies read/write deadlines. type netWrapper struct { rmu, wmu sync.Mutex diff --git a/p2p/message_test.go b/p2p/message_test.go index 31ed61d87..9a93dd347 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -5,33 +5,17 @@ import ( "encoding/hex" "fmt" "io" - "io/ioutil" "runtime" "strings" "testing" "time" ) -func TestNewMsg(t *testing.T) { - msg := NewMsg(3, 1, "000") - if msg.Code != 3 { - t.Errorf("incorrect code %d, want %d", msg.Code) - } - expect := unhex("c50183303030") - if msg.Size != uint32(len(expect)) { - t.Errorf("incorrect size %d, want %d", msg.Size, len(expect)) - } - pl, _ := ioutil.ReadAll(msg.Payload) - if !bytes.Equal(pl, expect) { - t.Errorf("incorrect payload content, got %x, want %x", pl, expect) - } -} - func ExampleMsgPipe() { rw1, rw2 := MsgPipe() go func() { - EncodeMsg(rw1, 8, []byte{0, 0}) - EncodeMsg(rw1, 5, []byte{1, 1}) + Send(rw1, 8, [][]byte{{0, 0}}) + Send(rw1, 5, [][]byte{{1, 1}}) rw1.Close() }() @@ -40,7 +24,7 @@ func ExampleMsgPipe() { if err != nil { break } - var data [1][]byte + var data [][]byte msg.Decode(&data) fmt.Printf("msg: %d, %x\n", msg.Code, data[0]) } @@ -55,7 +39,7 @@ loop: rw1, rw2 := MsgPipe() done := make(chan struct{}) go func() { - if err := EncodeMsg(rw1, 1); err == nil { + if err := SendItems(rw1, 1); err == nil { t.Error("EncodeMsg returned nil error") } else if err != ErrPipeClosed { t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed) diff --git a/p2p/peer.go b/p2p/peer.go index c2c83abfc..ee5199d78 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -132,7 +132,7 @@ loop: select { case <-ping.C: go func() { - if err := EncodeMsg(p.rw, pingMsg, nil); err != nil { + if err := SendItems(p.rw, pingMsg); err != nil { p.protoErr <- err return } @@ -161,7 +161,7 @@ loop: func (p *Peer) politeDisconnect(reason DiscReason) { done := make(chan struct{}) go func() { - EncodeMsg(p.rw, discMsg, uint(reason)) + SendItems(p.rw, discMsg, uint(reason)) // Wait for the other side to close the connection. // Discard any data that they send until then. io.Copy(ioutil.Discard, p.conn) @@ -192,7 +192,7 @@ func (p *Peer) handle(msg Msg) error { switch { case msg.Code == pingMsg: msg.Discard() - go EncodeMsg(p.rw, pongMsg) + go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason // no need to discard or for error checking, we'll close the diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 626dfd00f..3c4c71c0c 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -4,13 +4,10 @@ import ( "bytes" "fmt" "io" - "io/ioutil" "net" "reflect" "testing" "time" - - "github.com/ethereum/go-ethereum/rlp" ) var discard = Protocol{ @@ -55,13 +52,13 @@ func TestPeerProtoReadMsg(t *testing.T) { Name: "a", Length: 5, Run: func(peer *Peer, rw MsgReadWriter) error { - if err := expectMsg(rw, 2, []uint{1}); err != nil { + if err := ExpectMsg(rw, 2, []uint{1}); err != nil { t.Error(err) } - if err := expectMsg(rw, 3, []uint{2}); err != nil { + if err := ExpectMsg(rw, 3, []uint{2}); err != nil { t.Error(err) } - if err := expectMsg(rw, 4, []uint{3}); err != nil { + if err := ExpectMsg(rw, 4, []uint{3}); err != nil { t.Error(err) } close(done) @@ -72,9 +69,9 @@ func TestPeerProtoReadMsg(t *testing.T) { closer, rw, _, errc := testPeer([]Protocol{proto}) defer closer.Close() - EncodeMsg(rw, baseProtocolLength+2, 1) - EncodeMsg(rw, baseProtocolLength+3, 2) - EncodeMsg(rw, baseProtocolLength+4, 3) + Send(rw, baseProtocolLength+2, []uint{1}) + Send(rw, baseProtocolLength+3, []uint{2}) + Send(rw, baseProtocolLength+4, []uint{3}) select { case <-done: @@ -92,10 +89,10 @@ func TestPeerProtoEncodeMsg(t *testing.T) { Name: "a", Length: 2, Run: func(peer *Peer, rw MsgReadWriter) error { - if err := EncodeMsg(rw, 2); err == nil { + if err := SendItems(rw, 2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } - if err := EncodeMsg(rw, 1, "foo", "bar"); err != nil { + if err := SendItems(rw, 1, "foo", "bar"); err != nil { t.Errorf("write error: %v", err) } return nil @@ -104,7 +101,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) { closer, rw, _, _ := testPeer([]Protocol{proto}) defer closer.Close() - if err := expectMsg(rw, 17, []string{"foo", "bar"}); err != nil { + if err := ExpectMsg(rw, 17, []string{"foo", "bar"}); err != nil { t.Error(err) } } @@ -115,11 +112,15 @@ func TestPeerWriteForBroadcast(t *testing.T) { closer, rw, peer, peerErr := testPeer([]Protocol{discard}) defer closer.Close() + emptymsg := func(code uint64) Msg { + return Msg{Code: code, Size: 0, Payload: bytes.NewReader(nil)} + } + // test write errors - if err := peer.writeProtoMsg("b", NewMsg(3)); err == nil { + if err := peer.writeProtoMsg("b", emptymsg(3)); err == nil { t.Errorf("expected error for unknown protocol, got nil") } - if err := peer.writeProtoMsg("discard", NewMsg(8)); err == nil { + if err := peer.writeProtoMsg("discard", emptymsg(8)); err == nil { t.Errorf("expected error for out-of-range msg code, got nil") } else if perr, ok := err.(*peerError); !ok || perr.Code != errInvalidMsgCode { t.Errorf("wrong error for out-of-range msg code, got %#v", err) @@ -128,14 +129,14 @@ func TestPeerWriteForBroadcast(t *testing.T) { // setup for reading the message on the other end read := make(chan struct{}) go func() { - if err := expectMsg(rw, 16, nil); err != nil { + if err := ExpectMsg(rw, 16, nil); err != nil { t.Error(err) } close(read) }() // test successful write - if err := peer.writeProtoMsg("discard", NewMsg(0)); err != nil { + if err := peer.writeProtoMsg("discard", emptymsg(0)); err != nil { t.Errorf("expect no error for known protocol: %v", err) } select { @@ -150,10 +151,10 @@ func TestPeerPing(t *testing.T) { closer, rw, _, _ := testPeer(nil) defer closer.Close() - if err := EncodeMsg(rw, pingMsg); err != nil { + if err := SendItems(rw, pingMsg); err != nil { t.Fatal(err) } - if err := expectMsg(rw, pongMsg, nil); err != nil { + if err := ExpectMsg(rw, pongMsg, nil); err != nil { t.Error(err) } } @@ -163,10 +164,10 @@ func TestPeerDisconnect(t *testing.T) { closer, rw, _, disc := testPeer(nil) defer closer.Close() - if err := EncodeMsg(rw, discMsg, DiscQuitting); err != nil { + if err := SendItems(rw, discMsg, DiscQuitting); err != nil { t.Fatal(err) } - if err := expectMsg(rw, discMsg, []interface{}{DiscRequested}); err != nil { + if err := ExpectMsg(rw, discMsg, []interface{}{DiscRequested}); err != nil { t.Error(err) } closer.Close() // make test end faster diff --git a/p2p/rlpx_test.go b/p2p/rlpx_test.go index 49354c7ed..d98f1c2cd 100644 --- a/p2p/rlpx_test.go +++ b/p2p/rlpx_test.go @@ -30,7 +30,7 @@ ba628a4ba590cb43f7848f41c4382885 `) // Check WriteMsg. This puts a message into the buffer. - if err := EncodeMsg(rw, 8, 1, 2, 3, 4); err != nil { + if err := Send(rw, 8, []uint{1, 2, 3, 4}); err != nil { t.Fatalf("WriteMsg error: %v", err) } written := buf.Bytes() @@ -102,7 +102,7 @@ func TestRlpxFrameRW(t *testing.T) { for i := 0; i < 10; i++ { // write message into conn buffer wmsg := []interface{}{"foo", "bar", strings.Repeat("test", i)} - err := EncodeMsg(rw1, uint64(i), wmsg...) + err := Send(rw1, uint64(i), wmsg) if err != nil { t.Fatalf("WriteMsg error (i=%d): %v", i, err) } diff --git a/p2p/server.go b/p2p/server.go index 9c148ba39..813b0676f 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -9,10 +9,10 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/nat" + "github.com/ethereum/go-ethereum/rlp" ) const ( @@ -129,10 +129,14 @@ func (srv *Server) SuggestPeer(n *discover.Node) { // Broadcast sends an RLP-encoded message to all connected peers. // This method is deprecated and will be removed later. -func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) { +func (srv *Server) Broadcast(protocol string, code uint64, data interface{}) error { var payload []byte if data != nil { - payload = common.Encode(data) + var err error + payload, err = rlp.EncodeToBytes(data) + if err != nil { + return err + } } srv.lock.RLock() defer srv.lock.RUnlock() @@ -146,6 +150,7 @@ func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) peer.writeProtoMsg(protocol, msg) } } + return nil } // Start starts running the server. diff --git a/p2p/server_test.go b/p2p/server_test.go index 30447050c..14e7c7de2 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -149,7 +149,7 @@ func TestServerBroadcast(t *testing.T) { connected.Wait() // broadcast one message - srv.Broadcast("discard", 0, "foo") + srv.Broadcast("discard", 0, []string{"foo"}) golden := unhex("66e94d166f0a2c3b884cfa59ca34") // check that the message has been written everywhere -- cgit v1.2.3 From a7bced779a599fb3f87a69a5f8bb8017b62dc0a3 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 19 Mar 2015 15:15:07 +0100 Subject: p2p: log disconnect requests This helps a lot with debugging. --- p2p/peer.go | 1 + 1 file changed, 1 insertion(+) (limited to 'p2p') diff --git a/p2p/peer.go b/p2p/peer.go index ee5199d78..6b97ea58d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -198,6 +198,7 @@ func (p *Peer) handle(msg Msg) error { // no need to discard or for error checking, we'll close the // connection after this. rlp.Decode(msg.Payload, &reason) + p.Debugf("Disconnect requested: %v\n", reason[0]) p.Disconnect(DiscRequested) return discRequestedError(reason[0]) case msg.Code < baseProtocolLength: -- cgit v1.2.3 From b9e0b11e7da387bbb426d25c36d495c4f099722f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 19 Mar 2015 15:16:06 +0100 Subject: p2p: interrupt MsgPipe payload read/write This is better because protocols might not actually read the payload for some errors (msg too big, etc.) which can be a pain to test with the old behaviour. --- p2p/message.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'p2p') diff --git a/p2p/message.go b/p2p/message.go index 5dc7d5460..ef3630a90 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -185,7 +185,10 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { case p.w <- msg: if msg.Size > 0 { // wait for payload read or discard - <-consumed + select { + case <-consumed: + case <-p.closing: + } } return nil case <-p.closing: @@ -207,8 +210,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) { } // Close unblocks any pending ReadMsg and WriteMsg calls on both ends -// of the pipe. They will return ErrPipeClosed. Note that Close does -// not interrupt any reads from a message payload. +// of the pipe. They will return ErrPipeClosed. Close also +// interrupts any reads from a message payload. func (p *MsgPipeRW) Close() error { if atomic.AddInt32(p.closed, 1) != 1 { // someone else is already closing -- cgit v1.2.3