aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/peer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/peer_test.go')
-rw-r--r--p2p/peer_test.go283
1 files changed, 213 insertions, 70 deletions
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index c37540bef..d9640292f 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -1,96 +1,239 @@
package p2p
import (
+ "bufio"
"bytes"
- "fmt"
- // "net"
+ "encoding/hex"
+ "io/ioutil"
+ "net"
+ "reflect"
"testing"
"time"
)
-func TestPeer(t *testing.T) {
- handlers := make(Handlers)
- testProtocol := &TestProtocol{Msgs: []*Msg{}}
- handlers["aaa"] = func(p *Peer) Protocol { return testProtocol }
- handlers["ccc"] = func(p *Peer) Protocol { return testProtocol }
- addr := &TestAddr{"test:30"}
- conn := NewTestNetworkConnection(addr)
- _, server := SetupTestServer(handlers)
- server.Handshake()
- peer := NewPeer(conn, addr, true, server)
- // peer.Messenger().AddProtocols([]string{"aaa", "ccc"})
- peer.Start()
- defer peer.Stop()
- time.Sleep(2 * time.Millisecond)
- if len(conn.Out) != 1 {
- t.Errorf("handshake not sent")
- } else {
- out := conn.Out[0]
- packet := Packet(0, HandshakeMsg, P2PVersion, []byte(peer.server.identity.String()), []interface{}{peer.server.protocols}, peer.server.port, peer.server.identity.Pubkey()[1:])
- if bytes.Compare(out, packet) != 0 {
- t.Errorf("incorrect handshake packet %v != %v", out, packet)
+var discard = Protocol{
+ Name: "discard",
+ Length: 1,
+ Run: func(p *Peer, rw MsgReadWriter) error {
+ for {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if err = msg.Discard(); err != nil {
+ return err
+ }
}
- }
+ },
+}
- packet := Packet(0, HandshakeMsg, P2PVersion, []byte("peer"), []interface{}{"bbb", "aaa", "ccc"}, 30, []byte("0000000000000000000000000000000000000000000000000000000000000000"))
- conn.In(0, packet)
- time.Sleep(10 * time.Millisecond)
+func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) {
+ conn1, conn2 := net.Pipe()
+ id := NewSimpleClientIdentity("test", "0", "0", "public key")
+ peer := newPeer(conn1, protos, nil)
+ peer.ourID = id
+ peer.pubkeyHook = func(*peerAddr) error { return nil }
+ errc := make(chan error, 1)
+ go func() {
+ _, err := peer.loop()
+ errc <- err
+ }()
+ return conn2, peer, errc
+}
- pro, _ := peer.Messenger().protocols[0].(*BaseProtocol)
- if pro.state != handshakeReceived {
- t.Errorf("handshake not received")
- }
- if peer.Port != 30 {
- t.Errorf("port incorrectly set")
+func TestPeerProtoReadMsg(t *testing.T) {
+ defer testlog(t).detach()
+
+ done := make(chan struct{})
+ proto := Protocol{
+ Name: "a",
+ Length: 5,
+ Run: func(peer *Peer, rw MsgReadWriter) error {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ }
+ if msg.Code != 2 {
+ t.Errorf("incorrect msg code %d relayed to protocol", msg.Code)
+ }
+ data, err := ioutil.ReadAll(msg.Payload)
+ if err != nil {
+ t.Errorf("payload read error: %v", err)
+ }
+ expdata, _ := hex.DecodeString("0183303030")
+ if !bytes.Equal(expdata, data) {
+ t.Errorf("incorrect msg data %x", data)
+ }
+ close(done)
+ return nil
+ },
}
- if peer.Id != "peer" {
- t.Errorf("id incorrectly set")
+
+ net, peer, errc := testPeer([]Protocol{proto})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{proto.cap()})
+
+ writeMsg(net, NewMsg(18, 1, "000"))
+ select {
+ case <-done:
+ case err := <-errc:
+ t.Errorf("peer returned: %v", err)
+ case <-time.After(2 * time.Second):
+ t.Errorf("receive timeout")
}
- if string(peer.Pubkey) != "0000000000000000000000000000000000000000000000000000000000000000" {
- t.Errorf("pubkey incorrectly set")
+}
+
+func TestPeerProtoReadLargeMsg(t *testing.T) {
+ defer testlog(t).detach()
+
+ msgsize := uint32(10 * 1024 * 1024)
+ done := make(chan struct{})
+ proto := Protocol{
+ Name: "a",
+ Length: 5,
+ Run: func(peer *Peer, rw MsgReadWriter) error {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ }
+ if msg.Size != msgsize+4 {
+ t.Errorf("incorrect msg.Size, got %d, expected %d", msg.Size, msgsize)
+ }
+ msg.Discard()
+ close(done)
+ return nil
+ },
}
- fmt.Println(peer.Caps)
- if len(peer.Caps) != 3 || peer.Caps[0] != "aaa" || peer.Caps[1] != "bbb" || peer.Caps[2] != "ccc" {
- t.Errorf("protocols incorrectly set")
+
+ net, peer, errc := testPeer([]Protocol{proto})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{proto.cap()})
+
+ writeMsg(net, NewMsg(18, make([]byte, msgsize)))
+ select {
+ case <-done:
+ case err := <-errc:
+ t.Errorf("peer returned: %v", err)
+ case <-time.After(2 * time.Second):
+ t.Errorf("receive timeout")
}
+}
- msg, _ := NewMsg(3)
- err := peer.Write("aaa", msg)
- if err != nil {
- t.Errorf("expect no error for known protocol: %v", err)
- } else {
- time.Sleep(1 * time.Millisecond)
- if len(conn.Out) != 2 {
- t.Errorf("msg not written")
- } else {
- out := conn.Out[1]
- packet := Packet(16, 3)
- if bytes.Compare(out, packet) != 0 {
- t.Errorf("incorrect packet %v != %v", out, packet)
+func TestPeerProtoEncodeMsg(t *testing.T) {
+ defer testlog(t).detach()
+
+ proto := Protocol{
+ Name: "a",
+ Length: 2,
+ Run: func(peer *Peer, rw MsgReadWriter) error {
+ if err := rw.EncodeMsg(2); err == nil {
+ t.Error("expected error for out-of-range msg code, got nil")
}
- }
+ if err := rw.EncodeMsg(1); err != nil {
+ t.Errorf("write error: %v", err)
+ }
+ return nil
+ },
}
+ net, peer, _ := testPeer([]Protocol{proto})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{proto.cap()})
- msg, _ = NewMsg(2)
- err = peer.Write("ccc", msg)
+ bufr := bufio.NewReader(net)
+ msg, err := readMsg(bufr)
if err != nil {
+ t.Errorf("read error: %v", err)
+ }
+ if msg.Code != 17 {
+ t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17)
+ }
+}
+
+func TestPeerWrite(t *testing.T) {
+ defer testlog(t).detach()
+
+ net, peer, peerErr := testPeer([]Protocol{discard})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{discard.cap()})
+
+ // test write errors
+ if err := peer.writeProtoMsg("b", NewMsg(3)); err == nil {
+ t.Errorf("expected error for unknown protocol, got nil")
+ }
+ if err := peer.writeProtoMsg("discard", NewMsg(8)); err == nil {
+ t.Errorf("expected error for out-of-range msg code, got nil")
+ } else if perr, ok := err.(*peerError); !ok || perr.Code != errInvalidMsgCode {
+ t.Errorf("wrong error for out-of-range msg code, got %#v", err)
+ }
+
+ // setup for reading the message on the other end
+ read := make(chan struct{})
+ go func() {
+ bufr := bufio.NewReader(net)
+ msg, err := readMsg(bufr)
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ } else if msg.Code != 16 {
+ t.Errorf("wrong code, got %d, expected %d", msg.Code, 16)
+ }
+ msg.Discard()
+ close(read)
+ }()
+
+ // test succcessful write
+ if err := peer.writeProtoMsg("discard", NewMsg(0)); err != nil {
t.Errorf("expect no error for known protocol: %v", err)
- } else {
- time.Sleep(1 * time.Millisecond)
- if len(conn.Out) != 3 {
- t.Errorf("msg not written")
- } else {
- out := conn.Out[2]
- packet := Packet(21, 2)
- if bytes.Compare(out, packet) != 0 {
- t.Errorf("incorrect packet %v != %v", out, packet)
- }
+ }
+ select {
+ case <-read:
+ case err := <-peerErr:
+ t.Fatalf("peer stopped: %v", err)
+ }
+}
+
+func TestPeerActivity(t *testing.T) {
+ // shorten inactivityTimeout while this test is running
+ oldT := inactivityTimeout
+ defer func() { inactivityTimeout = oldT }()
+ inactivityTimeout = 20 * time.Millisecond
+
+ net, peer, peerErr := testPeer([]Protocol{discard})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{discard.cap()})
+
+ sub := peer.activity.Subscribe(time.Time{})
+ defer sub.Unsubscribe()
+
+ for i := 0; i < 6; i++ {
+ writeMsg(net, NewMsg(16))
+ select {
+ case <-sub.Chan():
+ case <-time.After(inactivityTimeout / 2):
+ t.Fatal("no event within ", inactivityTimeout/2)
+ case err := <-peerErr:
+ t.Fatal("peer error", err)
}
}
- err = peer.Write("bbb", msg)
- time.Sleep(1 * time.Millisecond)
- if err == nil {
- t.Errorf("expect error for unknown protocol")
+ select {
+ case <-time.After(inactivityTimeout * 2):
+ case <-sub.Chan():
+ t.Fatal("got activity event while connection was inactive")
+ case err := <-peerErr:
+ t.Fatal("peer error", err)
+ }
+}
+
+func TestNewPeer(t *testing.T) {
+ id := NewSimpleClientIdentity("clientid", "version", "customid", "pubkey")
+ caps := []Cap{{"foo", 2}, {"bar", 3}}
+ p := NewPeer(id, caps)
+ if !reflect.DeepEqual(p.Caps(), caps) {
+ t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps)
+ }
+ if p.Identity() != id {
+ t.Errorf("Identity mismatch: got %v, expected %v", p.Identity(), id)
}
+ // Should not hang.
+ p.Disconnect(DiscAlreadyConnected)
}