From f38052c499c1fee61423efeddb1f52677f1442e9 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 4 Nov 2014 13:21:44 +0100 Subject: p2p: rework protocol API --- p2p/messenger_test.go | 224 ++++++++++++++++++++++++++------------------------ 1 file changed, 117 insertions(+), 107 deletions(-) (limited to 'p2p/messenger_test.go') diff --git a/p2p/messenger_test.go b/p2p/messenger_test.go index 472d74515..f10469e2f 100644 --- a/p2p/messenger_test.go +++ b/p2p/messenger_test.go @@ -1,147 +1,157 @@ package p2p import ( - // "fmt" - "bytes" + "bufio" + "fmt" + "io" + "log" + "net" + "os" + "reflect" "testing" "time" "github.com/ethereum/go-ethereum/ethutil" ) -func setupMessenger(handlers Handlers) (*TestNetworkConnection, chan *PeerError, *Messenger) { - errchan := NewPeerErrorChannel() - addr := &TestAddr{"test:30303"} - net := NewTestNetworkConnection(addr) - conn := NewConnection(net, errchan) - mess := NewMessenger(nil, conn, errchan, handlers) - mess.Start() - return net, errchan, mess +func init() { + ethlog.AddLogSystem(ethlog.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlog.DebugLevel)) } -type TestProtocol struct { - Msgs []*Msg +func setupMessenger(handlers Handlers) (net.Conn, *Peer, *messenger) { + conn1, conn2 := net.Pipe() + id := NewSimpleClientIdentity("test", "0", "0", "public key") + server := New(nil, conn1.LocalAddr(), id, handlers, 10, NewBlacklist()) + peer := server.addPeer(conn1, conn1.RemoteAddr(), true, 0) + return conn2, peer, peer.messenger } -func (self *TestProtocol) Start() { -} - -func (self *TestProtocol) Stop() { -} - -func (self *TestProtocol) Offset() MsgCode { - return MsgCode(5) +func performTestHandshake(r *bufio.Reader, w io.Writer) error { + // read remote handshake + msg, err := readMsg(r) + if err != nil { + return fmt.Errorf("read error: %v", err) + } + if msg.Code != handshakeMsg { + return fmt.Errorf("first message should be handshake, got %x", msg.Code) + } + if err := msg.Discard(); err != nil { + return err + } + // send empty handshake + pubkey := make([]byte, 64) + msg = NewMsg(handshakeMsg, p2pVersion, "testid", nil, 9999, pubkey) + return writeMsg(w, msg) } -func (self *TestProtocol) HandleIn(msg *Msg, response chan *Msg) { - self.Msgs = append(self.Msgs, msg) - close(response) +type testMsg struct { + code MsgCode + data *ethutil.Value } -func (self *TestProtocol) HandleOut(msg *Msg) bool { - if msg.Code() > 3 { - return false - } else { - return true - } +type testProto struct { + recv chan testMsg } -func (self *TestProtocol) Name() string { - return "a" -} +func (*testProto) Offset() MsgCode { return 5 } -func Packet(offset MsgCode, code MsgCode, params ...interface{}) []byte { - msg, _ := NewMsg(code, params...) - encoded := msg.Encode(offset) - packet := []byte{34, 64, 8, 145} - packet = append(packet, ethutil.NumberToBytes(uint32(len(encoded)), 32)...) - return append(packet, encoded...) +func (tp *testProto) Start(peer *Peer, rw MsgReadWriter) error { + return MsgLoop(rw, 1024, func(code MsgCode, data *ethutil.Value) error { + logger.Debugf("testprotocol got msg: %d\n", code) + tp.recv <- testMsg{code, data} + return nil + }) } func TestRead(t *testing.T) { - handlers := make(Handlers) - testProtocol := &TestProtocol{Msgs: []*Msg{}} - handlers["a"] = func(p *Peer) Protocol { return testProtocol } - net, _, mess := setupMessenger(handlers) - mess.AddProtocols([]string{"a"}) - defer mess.Stop() - wait := 1 * time.Millisecond - packet := Packet(16, 1, uint32(1), "000") - go net.In(0, packet) - time.Sleep(wait) - if len(testProtocol.Msgs) != 1 { - t.Errorf("msg not relayed to correct protocol") - } else { - if testProtocol.Msgs[0].Code() != 1 { - t.Errorf("incorrect msg code relayed to protocol") + testProtocol := &testProto{make(chan testMsg)} + handlers := Handlers{"a": func() Protocol { return testProtocol }} + net, peer, mess := setupMessenger(handlers) + bufr := bufio.NewReader(net) + defer peer.Stop() + if err := performTestHandshake(bufr, net); err != nil { + t.Fatalf("handshake failed: %v", err) + } + + mess.setRemoteProtocols([]string{"a"}) + writeMsg(net, NewMsg(17, uint32(1), "000")) + select { + case msg := <-testProtocol.recv: + if msg.code != 1 { + t.Errorf("incorrect msg code %d relayed to protocol", msg.code) + } + expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}} + if !reflect.DeepEqual(msg.data.Slice(), expdata) { + t.Errorf("incorrect msg data %#v", msg.data.Slice()) } + case <-time.After(2 * time.Second): + t.Errorf("receive timeout") } } -func TestWrite(t *testing.T) { +func TestWriteProtoMsg(t *testing.T) { handlers := make(Handlers) - testProtocol := &TestProtocol{Msgs: []*Msg{}} - handlers["a"] = func(p *Peer) Protocol { return testProtocol } - net, _, mess := setupMessenger(handlers) - mess.AddProtocols([]string{"a"}) - defer mess.Stop() - wait := 1 * time.Millisecond - msg, _ := NewMsg(3, uint32(1), "000") - err := mess.Write("b", msg) - if err == nil { - t.Errorf("expect error for unknown protocol") + testProtocol := &testProto{recv: make(chan testMsg, 1)} + handlers["a"] = func() Protocol { return testProtocol } + net, peer, mess := setupMessenger(handlers) + defer peer.Stop() + bufr := bufio.NewReader(net) + if err := performTestHandshake(bufr, net); err != nil { + t.Fatalf("handshake failed: %v", err) } - err = mess.Write("a", msg) - if err != nil { - t.Errorf("expect no error for known protocol: %v", err) - } else { - time.Sleep(wait) - if len(net.Out) != 1 { - t.Errorf("msg not written") + mess.setRemoteProtocols([]string{"a"}) + + // test write errors + if err := mess.writeProtoMsg("b", NewMsg(3)); err == nil { + t.Errorf("expected error for unknown protocol, got nil") + } + if err := mess.writeProtoMsg("a", NewMsg(8)); err == nil { + t.Errorf("expected error for out-of-range msg code, got nil") + } else if perr, ok := err.(*PeerError); !ok || perr.Code != InvalidMsgCode { + t.Errorf("wrong error for out-of-range msg code, got %#v") + } + + // test succcessful write + read, readerr := make(chan Msg), make(chan error) + go func() { + if msg, err := readMsg(bufr); err != nil { + readerr <- err } else { - out := net.Out[0] - packet := Packet(16, 3, uint32(1), "000") - if bytes.Compare(out, packet) != 0 { - t.Errorf("incorrect packet %v", out) - } + read <- msg + } + }() + if err := mess.writeProtoMsg("a", NewMsg(3)); err != nil { + t.Errorf("expect no error for known protocol: %v", err) + } + select { + case msg := <-read: + if msg.Code != 19 { + t.Errorf("wrong code, got %d, expected %d", msg.Code, 19) } + msg.Discard() + case err := <-readerr: + t.Errorf("read error: %v", err) } } func TestPulse(t *testing.T) { - net, _, mess := setupMessenger(make(Handlers)) - defer mess.Stop() - ping := false - timeout := false - pingTimeout := 10 * time.Millisecond - gracePeriod := 200 * time.Millisecond - go mess.PingPong(pingTimeout, gracePeriod, func() { ping = true }, func() { timeout = true }) - net.In(0, Packet(0, 1)) - if ping { - t.Errorf("ping sent too early") - } - time.Sleep(pingTimeout + 100*time.Millisecond) - if !ping { - t.Errorf("no ping sent after timeout") - } - if timeout { - t.Errorf("timeout too early") + net, peer, _ := setupMessenger(nil) + defer peer.Stop() + bufr := bufio.NewReader(net) + if err := performTestHandshake(bufr, net); err != nil { + t.Fatalf("handshake failed: %v", err) } - ping = false - net.In(0, Packet(0, 1)) - time.Sleep(pingTimeout + 100*time.Millisecond) - if !ping { - t.Errorf("no ping sent after timeout") - } - if timeout { - t.Errorf("timeout too early") + + before := time.Now() + msg, err := readMsg(bufr) + if err != nil { + t.Fatalf("read error: %v", err) } - ping = false - time.Sleep(gracePeriod) - if ping { - t.Errorf("ping called twice") + after := time.Now() + if msg.Code != pingMsg { + t.Errorf("expected ping message, got %x", msg.Code) } - if !timeout { - t.Errorf("no timeout after grace period") + if d := after.Sub(before); d < pingTimeout { + t.Errorf("ping sent too early after %v, expected at least %v", d, pingTimeout) } } -- cgit v1.2.3 From 7149191dd999f4d192398e4b0821b656e62f3345 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 5 Nov 2014 01:28:46 +0100 Subject: p2p: fix issues found during review --- p2p/messenger_test.go | 128 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 41 deletions(-) (limited to 'p2p/messenger_test.go') diff --git a/p2p/messenger_test.go b/p2p/messenger_test.go index f10469e2f..2264e10d3 100644 --- a/p2p/messenger_test.go +++ b/p2p/messenger_test.go @@ -11,14 +11,14 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/ethutil" + logpkg "github.com/ethereum/go-ethereum/logger" ) func init() { - ethlog.AddLogSystem(ethlog.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlog.DebugLevel)) + logpkg.AddLogSystem(logpkg.NewStdLogSystem(os.Stdout, log.LstdFlags, logpkg.DebugLevel)) } -func setupMessenger(handlers Handlers) (net.Conn, *Peer, *messenger) { +func testMessenger(handlers Handlers) (net.Conn, *Peer, *messenger) { conn1, conn2 := net.Pipe() id := NewSimpleClientIdentity("test", "0", "0", "public key") server := New(nil, conn1.LocalAddr(), id, handlers, 10, NewBlacklist()) @@ -33,7 +33,7 @@ func performTestHandshake(r *bufio.Reader, w io.Writer) error { return fmt.Errorf("read error: %v", err) } if msg.Code != handshakeMsg { - return fmt.Errorf("first message should be handshake, got %x", msg.Code) + return fmt.Errorf("first message should be handshake, got %d", msg.Code) } if err := msg.Discard(); err != nil { return err @@ -44,56 +44,102 @@ func performTestHandshake(r *bufio.Reader, w io.Writer) error { return writeMsg(w, msg) } -type testMsg struct { - code MsgCode - data *ethutil.Value +type testProtocol struct { + offset MsgCode + f func(MsgReadWriter) } -type testProto struct { - recv chan testMsg +func (p *testProtocol) Offset() MsgCode { + return p.offset } -func (*testProto) Offset() MsgCode { return 5 } - -func (tp *testProto) Start(peer *Peer, rw MsgReadWriter) error { - return MsgLoop(rw, 1024, func(code MsgCode, data *ethutil.Value) error { - logger.Debugf("testprotocol got msg: %d\n", code) - tp.recv <- testMsg{code, data} - return nil - }) +func (p *testProtocol) Start(peer *Peer, rw MsgReadWriter) error { + p.f(rw) + return nil } func TestRead(t *testing.T) { - testProtocol := &testProto{make(chan testMsg)} - handlers := Handlers{"a": func() Protocol { return testProtocol }} - net, peer, mess := setupMessenger(handlers) - bufr := bufio.NewReader(net) + done := make(chan struct{}) + handlers := Handlers{ + "a": &testProtocol{5, func(rw MsgReadWriter) { + msg, err := rw.ReadMsg() + if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Code != 2 { + t.Errorf("incorrect msg code %d relayed to protocol", msg.Code) + } + data, err := msg.Data() + if err != nil { + t.Errorf("data decoding error: %v", err) + } + expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}} + if !reflect.DeepEqual(data.Slice(), expdata) { + t.Errorf("incorrect msg data %#v", data.Slice()) + } + close(done) + }}, + } + + net, peer, m := testMessenger(handlers) defer peer.Stop() + bufr := bufio.NewReader(net) if err := performTestHandshake(bufr, net); err != nil { t.Fatalf("handshake failed: %v", err) } + m.setRemoteProtocols([]string{"a"}) - mess.setRemoteProtocols([]string{"a"}) - writeMsg(net, NewMsg(17, uint32(1), "000")) + writeMsg(net, NewMsg(18, 1, "000")) select { - case msg := <-testProtocol.recv: - if msg.code != 1 { - t.Errorf("incorrect msg code %d relayed to protocol", msg.code) - } - expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}} - if !reflect.DeepEqual(msg.data.Slice(), expdata) { - t.Errorf("incorrect msg data %#v", msg.data.Slice()) - } + case <-done: case <-time.After(2 * time.Second): t.Errorf("receive timeout") } } -func TestWriteProtoMsg(t *testing.T) { - handlers := make(Handlers) - testProtocol := &testProto{recv: make(chan testMsg, 1)} - handlers["a"] = func() Protocol { return testProtocol } - net, peer, mess := setupMessenger(handlers) +func TestWriteFromProto(t *testing.T) { + handlers := Handlers{ + "a": &testProtocol{2, func(rw MsgReadWriter) { + if err := rw.WriteMsg(NewMsg(2)); err == nil { + t.Error("expected error for out-of-range msg code, got nil") + } + if err := rw.WriteMsg(NewMsg(1)); err != nil { + t.Errorf("write error: %v", err) + } + }}, + } + net, peer, mess := testMessenger(handlers) + defer peer.Stop() + bufr := bufio.NewReader(net) + if err := performTestHandshake(bufr, net); err != nil { + t.Fatalf("handshake failed: %v", err) + } + mess.setRemoteProtocols([]string{"a"}) + + msg, err := readMsg(bufr) + if err != nil { + t.Errorf("read error: %v") + } + if msg.Code != 17 { + t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17) + } +} + +var discardProto = &testProtocol{1, func(rw MsgReadWriter) { + for { + msg, err := rw.ReadMsg() + if err != nil { + return + } + if err = msg.Discard(); err != nil { + return + } + } +}} + +func TestMessengerWriteProtoMsg(t *testing.T) { + handlers := Handlers{"a": discardProto} + net, peer, mess := testMessenger(handlers) defer peer.Stop() bufr := bufio.NewReader(net) if err := performTestHandshake(bufr, net); err != nil { @@ -120,13 +166,13 @@ func TestWriteProtoMsg(t *testing.T) { read <- msg } }() - if err := mess.writeProtoMsg("a", NewMsg(3)); err != nil { + if err := mess.writeProtoMsg("a", NewMsg(0)); err != nil { t.Errorf("expect no error for known protocol: %v", err) } select { case msg := <-read: - if msg.Code != 19 { - t.Errorf("wrong code, got %d, expected %d", msg.Code, 19) + if msg.Code != 16 { + t.Errorf("wrong code, got %d, expected %d", msg.Code, 16) } msg.Discard() case err := <-readerr: @@ -135,7 +181,7 @@ func TestWriteProtoMsg(t *testing.T) { } func TestPulse(t *testing.T) { - net, peer, _ := setupMessenger(nil) + net, peer, _ := testMessenger(nil) defer peer.Stop() bufr := bufio.NewReader(net) if err := performTestHandshake(bufr, net); err != nil { @@ -149,7 +195,7 @@ func TestPulse(t *testing.T) { } after := time.Now() if msg.Code != pingMsg { - t.Errorf("expected ping message, got %x", msg.Code) + t.Errorf("expected ping message, got %d", msg.Code) } if d := after.Sub(before); d < pingTimeout { t.Errorf("ping sent too early after %v, expected at least %v", d, pingTimeout) -- cgit v1.2.3 From 59b63caf5e4de64ceb7dcdf01551a080f53b1672 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 21 Nov 2014 21:48:49 +0100 Subject: p2p: API cleanup and PoC 7 compatibility Whoa, one more big commit. I didn't manage to untangle the changes while working towards compatibility. --- p2p/messenger_test.go | 203 -------------------------------------------------- 1 file changed, 203 deletions(-) delete mode 100644 p2p/messenger_test.go (limited to 'p2p/messenger_test.go') diff --git a/p2p/messenger_test.go b/p2p/messenger_test.go deleted file mode 100644 index 2264e10d3..000000000 --- a/p2p/messenger_test.go +++ /dev/null @@ -1,203 +0,0 @@ -package p2p - -import ( - "bufio" - "fmt" - "io" - "log" - "net" - "os" - "reflect" - "testing" - "time" - - logpkg "github.com/ethereum/go-ethereum/logger" -) - -func init() { - logpkg.AddLogSystem(logpkg.NewStdLogSystem(os.Stdout, log.LstdFlags, logpkg.DebugLevel)) -} - -func testMessenger(handlers Handlers) (net.Conn, *Peer, *messenger) { - conn1, conn2 := net.Pipe() - id := NewSimpleClientIdentity("test", "0", "0", "public key") - server := New(nil, conn1.LocalAddr(), id, handlers, 10, NewBlacklist()) - peer := server.addPeer(conn1, conn1.RemoteAddr(), true, 0) - return conn2, peer, peer.messenger -} - -func performTestHandshake(r *bufio.Reader, w io.Writer) error { - // read remote handshake - msg, err := readMsg(r) - if err != nil { - return fmt.Errorf("read error: %v", err) - } - if msg.Code != handshakeMsg { - return fmt.Errorf("first message should be handshake, got %d", msg.Code) - } - if err := msg.Discard(); err != nil { - return err - } - // send empty handshake - pubkey := make([]byte, 64) - msg = NewMsg(handshakeMsg, p2pVersion, "testid", nil, 9999, pubkey) - return writeMsg(w, msg) -} - -type testProtocol struct { - offset MsgCode - f func(MsgReadWriter) -} - -func (p *testProtocol) Offset() MsgCode { - return p.offset -} - -func (p *testProtocol) Start(peer *Peer, rw MsgReadWriter) error { - p.f(rw) - return nil -} - -func TestRead(t *testing.T) { - done := make(chan struct{}) - handlers := Handlers{ - "a": &testProtocol{5, func(rw MsgReadWriter) { - msg, err := rw.ReadMsg() - if err != nil { - t.Errorf("read error: %v", err) - } - if msg.Code != 2 { - t.Errorf("incorrect msg code %d relayed to protocol", msg.Code) - } - data, err := msg.Data() - if err != nil { - t.Errorf("data decoding error: %v", err) - } - expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}} - if !reflect.DeepEqual(data.Slice(), expdata) { - t.Errorf("incorrect msg data %#v", data.Slice()) - } - close(done) - }}, - } - - net, peer, m := testMessenger(handlers) - defer peer.Stop() - bufr := bufio.NewReader(net) - if err := performTestHandshake(bufr, net); err != nil { - t.Fatalf("handshake failed: %v", err) - } - m.setRemoteProtocols([]string{"a"}) - - writeMsg(net, NewMsg(18, 1, "000")) - select { - case <-done: - case <-time.After(2 * time.Second): - t.Errorf("receive timeout") - } -} - -func TestWriteFromProto(t *testing.T) { - handlers := Handlers{ - "a": &testProtocol{2, func(rw MsgReadWriter) { - if err := rw.WriteMsg(NewMsg(2)); err == nil { - t.Error("expected error for out-of-range msg code, got nil") - } - if err := rw.WriteMsg(NewMsg(1)); err != nil { - t.Errorf("write error: %v", err) - } - }}, - } - net, peer, mess := testMessenger(handlers) - defer peer.Stop() - bufr := bufio.NewReader(net) - if err := performTestHandshake(bufr, net); err != nil { - t.Fatalf("handshake failed: %v", err) - } - mess.setRemoteProtocols([]string{"a"}) - - msg, err := readMsg(bufr) - if err != nil { - t.Errorf("read error: %v") - } - if msg.Code != 17 { - t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17) - } -} - -var discardProto = &testProtocol{1, func(rw MsgReadWriter) { - for { - msg, err := rw.ReadMsg() - if err != nil { - return - } - if err = msg.Discard(); err != nil { - return - } - } -}} - -func TestMessengerWriteProtoMsg(t *testing.T) { - handlers := Handlers{"a": discardProto} - net, peer, mess := testMessenger(handlers) - defer peer.Stop() - bufr := bufio.NewReader(net) - if err := performTestHandshake(bufr, net); err != nil { - t.Fatalf("handshake failed: %v", err) - } - mess.setRemoteProtocols([]string{"a"}) - - // test write errors - if err := mess.writeProtoMsg("b", NewMsg(3)); err == nil { - t.Errorf("expected error for unknown protocol, got nil") - } - if err := mess.writeProtoMsg("a", NewMsg(8)); err == nil { - t.Errorf("expected error for out-of-range msg code, got nil") - } else if perr, ok := err.(*PeerError); !ok || perr.Code != InvalidMsgCode { - t.Errorf("wrong error for out-of-range msg code, got %#v") - } - - // test succcessful write - read, readerr := make(chan Msg), make(chan error) - go func() { - if msg, err := readMsg(bufr); err != nil { - readerr <- err - } else { - read <- msg - } - }() - if err := mess.writeProtoMsg("a", NewMsg(0)); err != nil { - t.Errorf("expect no error for known protocol: %v", err) - } - select { - case msg := <-read: - if msg.Code != 16 { - t.Errorf("wrong code, got %d, expected %d", msg.Code, 16) - } - msg.Discard() - case err := <-readerr: - t.Errorf("read error: %v", err) - } -} - -func TestPulse(t *testing.T) { - net, peer, _ := testMessenger(nil) - defer peer.Stop() - bufr := bufio.NewReader(net) - if err := performTestHandshake(bufr, net); err != nil { - t.Fatalf("handshake failed: %v", err) - } - - before := time.Now() - msg, err := readMsg(bufr) - if err != nil { - t.Fatalf("read error: %v", err) - } - after := time.Now() - if msg.Code != pingMsg { - t.Errorf("expected ping message, got %d", msg.Code) - } - if d := after.Sub(before); d < pingTimeout { - t.Errorf("ping sent too early after %v, expected at least %v", d, pingTimeout) - } -} -- cgit v1.2.3