diff options
author | Felix Lange <fjl@twurst.com> | 2014-11-22 04:48:49 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2014-11-22 04:52:45 +0800 |
commit | 59b63caf5e4de64ceb7dcdf01551a080f53b1672 (patch) | |
tree | a4e79590284c5afe4d6927b422a5092b074e7938 /p2p/peer_test.go | |
parent | e4a601c6444afdc11ce0cb80d7fd83116de2c8b9 (diff) | |
download | go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.gz go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.bz2 go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.lz go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.xz go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.zst go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.zip |
p2p: API cleanup and PoC 7 compatibility
Whoa, one more big commit. I didn't manage to untangle the
changes while working towards compatibility.
Diffstat (limited to 'p2p/peer_test.go')
-rw-r--r-- | p2p/peer_test.go | 308 |
1 files changed, 220 insertions, 88 deletions
diff --git a/p2p/peer_test.go b/p2p/peer_test.go index da62cc380..1afa0ab17 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -1,90 +1,222 @@ package p2p -// "net" - -// func TestPeer(t *testing.T) { -// handlers := make(Handlers) -// testProtocol := &TestProtocol{recv: make(chan testMsg)} -// handlers["aaa"] = func(p *Peer) Protocol { return testProtocol } -// handlers["ccc"] = func(p *Peer) Protocol { return testProtocol } -// addr := &TestAddr{"test:30"} -// conn := NewTestNetworkConnection(addr) -// _, server := SetupTestServer(handlers) -// server.Handshake() -// peer := NewPeer(conn, addr, true, server) -// // peer.Messenger().AddProtocols([]string{"aaa", "ccc"}) -// peer.Start() -// defer peer.Stop() -// time.Sleep(2 * time.Millisecond) -// if len(conn.Out) != 1 { -// t.Errorf("handshake not sent") -// } else { -// out := conn.Out[0] -// packet := Packet(0, HandshakeMsg, P2PVersion, []byte(peer.server.identity.String()), []interface{}{peer.server.protocols}, peer.server.port, peer.server.identity.Pubkey()[1:]) -// if bytes.Compare(out, packet) != 0 { -// t.Errorf("incorrect handshake packet %v != %v", out, packet) -// } -// } - -// packet := Packet(0, HandshakeMsg, P2PVersion, []byte("peer"), []interface{}{"bbb", "aaa", "ccc"}, 30, []byte("0000000000000000000000000000000000000000000000000000000000000000")) -// conn.In(0, packet) -// time.Sleep(10 * time.Millisecond) - -// pro, _ := peer.Messenger().protocols[0].(*BaseProtocol) -// if pro.state != handshakeReceived { -// t.Errorf("handshake not received") -// } -// if peer.Port != 30 { -// t.Errorf("port incorrectly set") -// } -// if peer.Id != "peer" { -// t.Errorf("id incorrectly set") -// } -// if string(peer.Pubkey) != "0000000000000000000000000000000000000000000000000000000000000000" { -// t.Errorf("pubkey incorrectly set") -// } -// fmt.Println(peer.Caps) -// if len(peer.Caps) != 3 || peer.Caps[0] != "aaa" || peer.Caps[1] != "bbb" || peer.Caps[2] != "ccc" { -// t.Errorf("protocols incorrectly set") -// } - -// msg := NewMsg(3) -// err := peer.Write("aaa", msg) -// if err != nil { -// t.Errorf("expect no error for known protocol: %v", err) -// } else { -// time.Sleep(1 * time.Millisecond) -// if len(conn.Out) != 2 { -// t.Errorf("msg not written") -// } else { -// out := conn.Out[1] -// packet := Packet(16, 3) -// if bytes.Compare(out, packet) != 0 { -// t.Errorf("incorrect packet %v != %v", out, packet) -// } -// } -// } - -// msg = NewMsg(2) -// err = peer.Write("ccc", msg) -// if err != nil { -// t.Errorf("expect no error for known protocol: %v", err) -// } else { -// time.Sleep(1 * time.Millisecond) -// if len(conn.Out) != 3 { -// t.Errorf("msg not written") -// } else { -// out := conn.Out[2] -// packet := Packet(21, 2) -// if bytes.Compare(out, packet) != 0 { -// t.Errorf("incorrect packet %v != %v", out, packet) -// } -// } -// } - -// err = peer.Write("bbb", msg) -// time.Sleep(1 * time.Millisecond) -// if err == nil { -// t.Errorf("expect error for unknown protocol") -// } -// } +import ( + "bufio" + "net" + "reflect" + "testing" + "time" +) + +var discard = Protocol{ + Name: "discard", + Length: 1, + Run: func(p *Peer, rw MsgReadWriter) error { + for { + msg, err := rw.ReadMsg() + if err != nil { + return err + } + if err = msg.Discard(); err != nil { + return err + } + } + }, +} + +func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) { + conn1, conn2 := net.Pipe() + id := NewSimpleClientIdentity("test", "0", "0", "public key") + peer := newPeer(conn1, protos, nil) + peer.ourID = id + peer.pubkeyHook = func(*peerAddr) error { return nil } + errc := make(chan error, 1) + go func() { + _, err := peer.loop() + errc <- err + }() + return conn2, peer, errc +} + +func TestPeerProtoReadMsg(t *testing.T) { + defer testlog(t).detach() + + done := make(chan struct{}) + proto := Protocol{ + Name: "a", + Length: 5, + Run: func(peer *Peer, rw MsgReadWriter) error { + msg, err := rw.ReadMsg() + if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Code != 2 { + t.Errorf("incorrect msg code %d relayed to protocol", msg.Code) + } + data, err := msg.Data() + if err != nil { + t.Errorf("data decoding error: %v", err) + } + expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}} + if !reflect.DeepEqual(data.Slice(), expdata) { + t.Errorf("incorrect msg data %#v", data.Slice()) + } + close(done) + return nil + }, + } + + net, peer, errc := testPeer([]Protocol{proto}) + defer net.Close() + peer.startSubprotocols([]Cap{proto.cap()}) + + writeMsg(net, NewMsg(18, 1, "000")) + select { + case <-done: + case err := <-errc: + t.Errorf("peer returned: %v", err) + case <-time.After(2 * time.Second): + t.Errorf("receive timeout") + } +} + +func TestPeerProtoReadLargeMsg(t *testing.T) { + defer testlog(t).detach() + + msgsize := uint32(10 * 1024 * 1024) + done := make(chan struct{}) + proto := Protocol{ + Name: "a", + Length: 5, + Run: func(peer *Peer, rw MsgReadWriter) error { + msg, err := rw.ReadMsg() + if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Size != msgsize+4 { + t.Errorf("incorrect msg.Size, got %d, expected %d", msg.Size, msgsize) + } + msg.Discard() + close(done) + return nil + }, + } + + net, peer, errc := testPeer([]Protocol{proto}) + defer net.Close() + peer.startSubprotocols([]Cap{proto.cap()}) + + writeMsg(net, NewMsg(18, make([]byte, msgsize))) + select { + case <-done: + case err := <-errc: + t.Errorf("peer returned: %v", err) + case <-time.After(2 * time.Second): + t.Errorf("receive timeout") + } +} + +func TestPeerProtoEncodeMsg(t *testing.T) { + defer testlog(t).detach() + + proto := Protocol{ + Name: "a", + Length: 2, + Run: func(peer *Peer, rw MsgReadWriter) error { + if err := rw.EncodeMsg(2); err == nil { + t.Error("expected error for out-of-range msg code, got nil") + } + if err := rw.EncodeMsg(1); err != nil { + t.Errorf("write error: %v", err) + } + return nil + }, + } + net, peer, _ := testPeer([]Protocol{proto}) + defer net.Close() + peer.startSubprotocols([]Cap{proto.cap()}) + + bufr := bufio.NewReader(net) + msg, err := readMsg(bufr) + if err != nil { + t.Errorf("read error: %v", err) + } + if msg.Code != 17 { + t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17) + } +} + +func TestPeerWrite(t *testing.T) { + defer testlog(t).detach() + + net, peer, peerErr := testPeer([]Protocol{discard}) + defer net.Close() + peer.startSubprotocols([]Cap{discard.cap()}) + + // test write errors + if err := peer.writeProtoMsg("b", NewMsg(3)); err == nil { + t.Errorf("expected error for unknown protocol, got nil") + } + if err := peer.writeProtoMsg("discard", NewMsg(8)); err == nil { + t.Errorf("expected error for out-of-range msg code, got nil") + } else if perr, ok := err.(*peerError); !ok || perr.Code != errInvalidMsgCode { + t.Errorf("wrong error for out-of-range msg code, got %#v", err) + } + + // setup for reading the message on the other end + read := make(chan struct{}) + go func() { + bufr := bufio.NewReader(net) + msg, err := readMsg(bufr) + if err != nil { + t.Errorf("read error: %v", err) + } else if msg.Code != 16 { + t.Errorf("wrong code, got %d, expected %d", msg.Code, 16) + } + msg.Discard() + close(read) + }() + + // test succcessful write + if err := peer.writeProtoMsg("discard", NewMsg(0)); err != nil { + t.Errorf("expect no error for known protocol: %v", err) + } + select { + case <-read: + case err := <-peerErr: + t.Fatalf("peer stopped: %v", err) + } +} + +func TestPeerActivity(t *testing.T) { + // shorten inactivityTimeout while this test is running + oldT := inactivityTimeout + defer func() { inactivityTimeout = oldT }() + inactivityTimeout = 20 * time.Millisecond + + net, peer, peerErr := testPeer([]Protocol{discard}) + defer net.Close() + peer.startSubprotocols([]Cap{discard.cap()}) + + sub := peer.activity.Subscribe(time.Time{}) + defer sub.Unsubscribe() + + for i := 0; i < 6; i++ { + writeMsg(net, NewMsg(16)) + select { + case <-sub.Chan(): + case <-time.After(inactivityTimeout / 2): + t.Fatal("no event within ", inactivityTimeout/2) + case err := <-peerErr: + t.Fatal("peer error", err) + } + } + + select { + case <-time.After(inactivityTimeout * 2): + case <-sub.Chan(): + t.Fatal("got activity event while connection was inactive") + case err := <-peerErr: + t.Fatal("peer error", err) + } +} |