diff options
Diffstat (limited to 'p2p/peer_test.go')
-rw-r--r-- | p2p/peer_test.go | 283 |
1 files changed, 213 insertions, 70 deletions
diff --git a/p2p/peer_test.go b/p2p/peer_test.go index c37540bef..d9640292f 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -1,96 +1,239 @@ package p2p import ( + "bufio" "bytes" - "fmt" - // "net" + "encoding/hex" + "io/ioutil" + "net" + "reflect" "testing" "time" ) -func TestPeer(t *testing.T) { - handlers := make(Handlers) - testProtocol := &TestProtocol{Msgs: []*Msg{}} - handlers["aaa"] = func(p *Peer) Protocol { return testProtocol } - handlers["ccc"] = func(p *Peer) Protocol { return testProtocol } - addr := &TestAddr{"test:30"} - conn := NewTestNetworkConnection(addr) - _, server := SetupTestServer(handlers) - server.Handshake() - peer := NewPeer(conn, addr, true, server) - // peer.Messenger().AddProtocols([]string{"aaa", "ccc"}) - peer.Start() - defer peer.Stop() - time.Sleep(2 * time.Millisecond) - if len(conn.Out) != 1 { - t.Errorf("handshake not sent") - } else { - out := conn.Out[0] - packet := Packet(0, HandshakeMsg, P2PVersion, []byte(peer.server.identity.String()), []interface{}{peer.server.protocols}, peer.server.port, peer.server.identity.Pubkey()[1:]) - if bytes.Compare(out, packet) != 0 { - t.Errorf("incorrect handshake packet %v != %v", out, packet) +var discard = Protocol{ + Name: "discard", + Length: 1, + Run: func(p *Peer, rw MsgReadWriter) error { + for { + msg, err := rw.ReadMsg() + if err != nil { + return err + } + if err = msg.Discard(); err != nil { + return err + } } - } + }, +} - packet := Packet(0, HandshakeMsg, P2PVersion, []byte("peer"), []interface{}{"bbb", "aaa", "ccc"}, 30, []byte("0000000000000000000000000000000000000000000000000000000000000000")) - conn.In(0, packet) - time.Sleep(10 * time.Millisecond) +func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) { + conn1, conn2 := net.Pipe() + id := NewSimpleClientIdentity("test", "0", "0", "public key") + peer := newPeer(conn1, protos, nil) + peer.ourID = id + peer.pubkeyHook = func(*peerAddr) error { return nil } + errc := make(chan error, 1) + go func() { + _, err := peer.loop() + errc <- err + }() + return conn2, peer, errc +} - pro, _ := peer.Messenger().protocols[0].(*BaseProtocol) - if pro.state != handshakeReceived { - t.Errorf("handshake not received") - } - if peer.Port != 30 { - t.Errorf("port incorrectly set") +func TestPeerProtoReadMsg(t *testing.T) { + defer testlog(t).detach() + + done := make(chan struct{}) + proto := Protocol{ + Name: "a", + Length: 5, + Run: func(peer *Peer, rw MsgReadWriter) error { + msg, err := rw.ReadMsg() + if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Code != 2 { + t.Errorf("incorrect msg code %d relayed to protocol", msg.Code) + } + data, err := ioutil.ReadAll(msg.Payload) + if err != nil { + t.Errorf("payload read error: %v", err) + } + expdata, _ := hex.DecodeString("0183303030") + if !bytes.Equal(expdata, data) { + t.Errorf("incorrect msg data %x", data) + } + close(done) + return nil + }, } - if peer.Id != "peer" { - t.Errorf("id incorrectly set") + + net, peer, errc := testPeer([]Protocol{proto}) + defer net.Close() + peer.startSubprotocols([]Cap{proto.cap()}) + + writeMsg(net, NewMsg(18, 1, "000")) + select { + case <-done: + case err := <-errc: + t.Errorf("peer returned: %v", err) + case <-time.After(2 * time.Second): + t.Errorf("receive timeout") } - if string(peer.Pubkey) != "0000000000000000000000000000000000000000000000000000000000000000" { - t.Errorf("pubkey incorrectly set") +} + +func TestPeerProtoReadLargeMsg(t *testing.T) { + defer testlog(t).detach() + + msgsize := uint32(10 * 1024 * 1024) + done := make(chan struct{}) + proto := Protocol{ + Name: "a", + Length: 5, + Run: func(peer *Peer, rw MsgReadWriter) error { + msg, err := rw.ReadMsg() + if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Size != msgsize+4 { + t.Errorf("incorrect msg.Size, got %d, expected %d", msg.Size, msgsize) + } + msg.Discard() + close(done) + return nil + }, } - fmt.Println(peer.Caps) - if len(peer.Caps) != 3 || peer.Caps[0] != "aaa" || peer.Caps[1] != "bbb" || peer.Caps[2] != "ccc" { - t.Errorf("protocols incorrectly set") + + net, peer, errc := testPeer([]Protocol{proto}) + defer net.Close() + peer.startSubprotocols([]Cap{proto.cap()}) + + writeMsg(net, NewMsg(18, make([]byte, msgsize))) + select { + case <-done: + case err := <-errc: + t.Errorf("peer returned: %v", err) + case <-time.After(2 * time.Second): + t.Errorf("receive timeout") } +} - msg, _ := NewMsg(3) - err := peer.Write("aaa", msg) - if err != nil { - t.Errorf("expect no error for known protocol: %v", err) - } else { - time.Sleep(1 * time.Millisecond) - if len(conn.Out) != 2 { - t.Errorf("msg not written") - } else { - out := conn.Out[1] - packet := Packet(16, 3) - if bytes.Compare(out, packet) != 0 { - t.Errorf("incorrect packet %v != %v", out, packet) +func TestPeerProtoEncodeMsg(t *testing.T) { + defer testlog(t).detach() + + proto := Protocol{ + Name: "a", + Length: 2, + Run: func(peer *Peer, rw MsgReadWriter) error { + if err := rw.EncodeMsg(2); err == nil { + t.Error("expected error for out-of-range msg code, got nil") } - } + if err := rw.EncodeMsg(1); err != nil { + t.Errorf("write error: %v", err) + } + return nil + }, } + net, peer, _ := testPeer([]Protocol{proto}) + defer net.Close() + peer.startSubprotocols([]Cap{proto.cap()}) - msg, _ = NewMsg(2) - err = peer.Write("ccc", msg) + bufr := bufio.NewReader(net) + msg, err := readMsg(bufr) if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Code != 17 { + t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17) + } +} + +func TestPeerWrite(t *testing.T) { + defer testlog(t).detach() + + net, peer, peerErr := testPeer([]Protocol{discard}) + defer net.Close() + peer.startSubprotocols([]Cap{discard.cap()}) + + // test write errors + if err := peer.writeProtoMsg("b", NewMsg(3)); err == nil { + t.Errorf("expected error for unknown protocol, got nil") + } + if err := peer.writeProtoMsg("discard", NewMsg(8)); err == nil { + t.Errorf("expected error for out-of-range msg code, got nil") + } else if perr, ok := err.(*peerError); !ok || perr.Code != errInvalidMsgCode { + t.Errorf("wrong error for out-of-range msg code, got %#v", err) + } + + // setup for reading the message on the other end + read := make(chan struct{}) + go func() { + bufr := bufio.NewReader(net) + msg, err := readMsg(bufr) + if err != nil { + t.Errorf("read error: %v", err) + } else if msg.Code != 16 { + t.Errorf("wrong code, got %d, expected %d", msg.Code, 16) + } + msg.Discard() + close(read) + }() + + // test succcessful write + if err := peer.writeProtoMsg("discard", NewMsg(0)); err != nil { t.Errorf("expect no error for known protocol: %v", err) - } else { - time.Sleep(1 * time.Millisecond) - if len(conn.Out) != 3 { - t.Errorf("msg not written") - } else { - out := conn.Out[2] - packet := Packet(21, 2) - if bytes.Compare(out, packet) != 0 { - t.Errorf("incorrect packet %v != %v", out, packet) - } + } + select { + case <-read: + case err := <-peerErr: + t.Fatalf("peer stopped: %v", err) + } +} + +func TestPeerActivity(t *testing.T) { + // shorten inactivityTimeout while this test is running + oldT := inactivityTimeout + defer func() { inactivityTimeout = oldT }() + inactivityTimeout = 20 * time.Millisecond + + net, peer, peerErr := testPeer([]Protocol{discard}) + defer net.Close() + peer.startSubprotocols([]Cap{discard.cap()}) + + sub := peer.activity.Subscribe(time.Time{}) + defer sub.Unsubscribe() + + for i := 0; i < 6; i++ { + writeMsg(net, NewMsg(16)) + select { + case <-sub.Chan(): + case <-time.After(inactivityTimeout / 2): + t.Fatal("no event within ", inactivityTimeout/2) + case err := <-peerErr: + t.Fatal("peer error", err) } } - err = peer.Write("bbb", msg) - time.Sleep(1 * time.Millisecond) - if err == nil { - t.Errorf("expect error for unknown protocol") + select { + case <-time.After(inactivityTimeout * 2): + case <-sub.Chan(): + t.Fatal("got activity event while connection was inactive") + case err := <-peerErr: + t.Fatal("peer error", err) + } +} + +func TestNewPeer(t *testing.T) { + id := NewSimpleClientIdentity("clientid", "version", "customid", "pubkey") + caps := []Cap{{"foo", 2}, {"bar", 3}} + p := NewPeer(id, caps) + if !reflect.DeepEqual(p.Caps(), caps) { + t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps) + } + if p.Identity() != id { + t.Errorf("Identity mismatch: got %v, expected %v", p.Identity(), id) } + // Should not hang. + p.Disconnect(DiscAlreadyConnected) } |