aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server_test.go')
-rw-r--r--p2p/server_test.go584
1 files changed, 270 insertions, 314 deletions
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 6f7aaf8e1..01448cc7b 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -2,8 +2,10 @@ package p2p
import (
"crypto/ecdsa"
+ "errors"
"math/rand"
"net"
+ "reflect"
"testing"
"time"
@@ -12,29 +14,50 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
)
-func startTestServer(t *testing.T, pf newPeerHook) *Server {
+func init() {
+ // glog.SetV(6)
+ // glog.SetToStderr(true)
+}
+
+type testTransport struct {
+ id discover.NodeID
+ *rlpx
+
+ closeErr error
+}
+
+func newTestTransport(id discover.NodeID, fd net.Conn) transport {
+ wrapped := newRLPX(fd).(*rlpx)
+ wrapped.rw = newRLPXFrameRW(fd, secrets{
+ MAC: zero16,
+ AES: zero16,
+ IngressMAC: sha3.NewKeccak256(),
+ EgressMAC: sha3.NewKeccak256(),
+ })
+ return &testTransport{id: id, rlpx: wrapped}
+}
+
+func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error) {
+ return c.id, nil
+}
+
+func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) {
+ return &protoHandshake{ID: c.id, Name: "test"}, nil
+}
+
+func (c *testTransport) close(err error) {
+ c.rlpx.fd.Close()
+ c.closeErr = err
+}
+
+func startTestServer(t *testing.T, id discover.NodeID, pf func(*Peer)) *Server {
server := &Server{
- Name: "test",
- MaxPeers: 10,
- ListenAddr: "127.0.0.1:0",
- PrivateKey: newkey(),
- newPeerHook: pf,
- setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
- id := randomID()
- if !keepconn(id) {
- return nil, DiscAlreadyConnected
- }
- rw := newRlpxFrameRW(fd, secrets{
- MAC: zero16,
- AES: zero16,
- IngressMAC: sha3.NewKeccak256(),
- EgressMAC: sha3.NewKeccak256(),
- })
- return &conn{
- MsgReadWriter: rw,
- protoHandshake: &protoHandshake{ID: id, Version: baseProtocolVersion},
- }, nil
- },
+ Name: "test",
+ MaxPeers: 10,
+ ListenAddr: "127.0.0.1:0",
+ PrivateKey: newkey(),
+ newPeerHook: pf,
+ newTransport: func(fd net.Conn) transport { return newTestTransport(id, fd) },
}
if err := server.Start(); err != nil {
t.Fatalf("Could not start server: %v", err)
@@ -45,7 +68,11 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
func TestServerListen(t *testing.T) {
// start the test server
connected := make(chan *Peer)
- srv := startTestServer(t, func(p *Peer) {
+ remid := randomID()
+ srv := startTestServer(t, remid, func(p *Peer) {
+ if p.ID() != remid {
+ t.Error("peer func called with wrong node id")
+ }
if p == nil {
t.Error("peer func called with nil conn")
}
@@ -67,6 +94,10 @@ func TestServerListen(t *testing.T) {
t.Errorf("peer started with wrong conn: got %v, want %v",
peer.LocalAddr(), conn.RemoteAddr())
}
+ peers := srv.Peers()
+ if !reflect.DeepEqual(peers, []*Peer{peer}) {
+ t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
+ }
case <-time.After(1 * time.Second):
t.Error("server did not accept within one second")
}
@@ -92,23 +123,33 @@ func TestServerDial(t *testing.T) {
// start the server
connected := make(chan *Peer)
- srv := startTestServer(t, func(p *Peer) { connected <- p })
+ remid := randomID()
+ srv := startTestServer(t, remid, func(p *Peer) { connected <- p })
defer close(connected)
defer srv.Stop()
// tell the server to connect
tcpAddr := listener.Addr().(*net.TCPAddr)
- srv.staticDial <- &discover.Node{IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
+ srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)})
select {
case conn := <-accepted:
select {
case peer := <-connected:
+ if peer.ID() != remid {
+ t.Errorf("peer has wrong id")
+ }
+ if peer.Name() != "test" {
+ t.Errorf("peer has wrong name")
+ }
if peer.RemoteAddr().String() != conn.LocalAddr().String() {
t.Errorf("peer started with wrong conn: got %v, want %v",
peer.RemoteAddr(), conn.LocalAddr())
}
- // TODO: validate more fields
+ peers := srv.Peers()
+ if !reflect.DeepEqual(peers, []*Peer{peer}) {
+ t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
+ }
case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second")
}
@@ -118,331 +159,250 @@ func TestServerDial(t *testing.T) {
}
}
-// This test checks that connections are disconnected
-// just after the encryption handshake when the server is
-// at capacity.
-//
-// It also serves as a light-weight integration test.
-func TestServerDisconnectAtCap(t *testing.T) {
- started := make(chan *Peer)
- srv := &Server{
- ListenAddr: "127.0.0.1:0",
- PrivateKey: newkey(),
- MaxPeers: 10,
- NoDial: true,
- // This hook signals that the peer was actually started. We
- // need to wait for the peer to be started before dialing the
- // next connection to get a deterministic peer count.
- newPeerHook: func(p *Peer) { started <- p },
- }
- if err := srv.Start(); err != nil {
- t.Fatal(err)
- }
- defer srv.Stop()
-
- nconns := srv.MaxPeers + 1
- dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
- for i := 0; i < nconns; i++ {
- conn, err := dialer.Dial("tcp", srv.ListenAddr)
- if err != nil {
- t.Fatalf("conn %d: dial error: %v", i, err)
- }
- // Close the connection when the test ends, before
- // shutting down the server.
- defer conn.Close()
- // Run the handshakes just like a real peer would.
- key := newkey()
- hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
- _, err = setupConn(conn, key, hs, srv.Self(), keepalways)
- if i == nconns-1 {
- // When handling the last connection, the server should
- // disconnect immediately instead of running the protocol
- // handshake.
- if err != DiscTooManyPeers {
- t.Errorf("conn %d: got error %q, expected %q", i, err, DiscTooManyPeers)
- }
- } else {
- // For all earlier connections, the handshake should go through.
- if err != nil {
- t.Fatalf("conn %d: unexpected error: %v", i, err)
- }
- // Wait for runPeer to be started.
- <-started
+// This test checks that tasks generated by dialstate are
+// actually executed and taskdone is called for them.
+func TestServerTaskScheduling(t *testing.T) {
+ var (
+ done = make(chan *testTask)
+ quit, returned = make(chan struct{}), make(chan struct{})
+ tc = 0
+ tg = taskgen{
+ newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
+ tc++
+ return []task{&testTask{index: tc - 1}}
+ },
+ doneFunc: func(t task) {
+ select {
+ case done <- t.(*testTask):
+ case <-quit:
+ }
+ },
}
- }
-}
+ )
-// Tests that static peers are (re)connected, and done so even above max peers.
-func TestServerStaticPeers(t *testing.T) {
- // Create a test server with limited connection slots
- started := make(chan *Peer)
- server := &Server{
- ListenAddr: "127.0.0.1:0",
- PrivateKey: newkey(),
- MaxPeers: 3,
- newPeerHook: func(p *Peer) { started <- p },
- staticCycle: time.Second,
- }
- if err := server.Start(); err != nil {
- t.Fatal(err)
+ // The Server in this test isn't actually running
+ // because we're only interested in what run does.
+ srv := &Server{
+ MaxPeers: 10,
+ quit: make(chan struct{}),
+ ntab: fakeTable{},
+ running: true,
}
- defer server.Stop()
-
- // Fill up all the slots on the server
- dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
- for i := 0; i < server.MaxPeers; i++ {
- // Establish a new connection
- conn, err := dialer.Dial("tcp", server.ListenAddr)
- if err != nil {
- t.Fatalf("conn %d: dial error: %v", i, err)
- }
- defer conn.Close()
+ srv.loopWG.Add(1)
+ go func() {
+ srv.run(tg)
+ close(returned)
+ }()
- // Run the handshakes just like a real peer would, and wait for completion
- key := newkey()
- shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
- if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
- t.Fatalf("conn %d: unexpected error: %v", i, err)
- }
- <-started
+ var gotdone []*testTask
+ for i := 0; i < 100; i++ {
+ gotdone = append(gotdone, <-done)
}
- // Open a TCP listener to accept static connections
- listener, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatalf("failed to setup listener: %v", err)
- }
- defer listener.Close()
-
- connected := make(chan net.Conn)
- go func() {
- for i := 0; i < 3; i++ {
- conn, err := listener.Accept()
- if err == nil {
- connected <- conn
- }
+ for i, task := range gotdone {
+ if task.index != i {
+ t.Errorf("task %d has wrong index, got %d", i, task.index)
+ break
+ }
+ if !task.called {
+ t.Errorf("task %d was not called", i)
+ break
}
- }()
- // Inject a static node and wait for a remote dial, then redial, then nothing
- addr := listener.Addr().(*net.TCPAddr)
- static := &discover.Node{
- ID: discover.PubkeyID(&newkey().PublicKey),
- IP: addr.IP,
- TCP: uint16(addr.Port),
}
- server.AddPeer(static)
+ close(quit)
+ srv.Stop()
select {
- case conn := <-connected:
- // Close the first connection, expect redial
- conn.Close()
-
- case <-time.After(2 * server.staticCycle):
- t.Fatalf("remote dial timeout")
+ case <-returned:
+ case <-time.After(500 * time.Millisecond):
+ t.Error("Server.run did not return within 500ms")
}
+}
- select {
- case conn := <-connected:
- // Keep the second connection, don't expect redial
- defer conn.Close()
-
- case <-time.After(2 * server.staticCycle):
- t.Fatalf("remote re-dial timeout")
- }
+type taskgen struct {
+ newFunc func(running int, peers map[discover.NodeID]*Peer) []task
+ doneFunc func(task)
+}
- select {
- case <-time.After(2 * server.staticCycle):
- // Timeout as no dial occurred
+func (tg taskgen) newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task {
+ return tg.newFunc(running, peers)
+}
+func (tg taskgen) taskDone(t task, now time.Time) {
+ tg.doneFunc(t)
+}
+func (tg taskgen) addStatic(*discover.Node) {
+}
- case <-connected:
- t.Fatalf("connected node dialed")
- }
+type testTask struct {
+ index int
+ called bool
}
-// Tests that trusted peers and can connect above max peer caps.
-func TestServerTrustedPeers(t *testing.T) {
+func (t *testTask) Do(srv *Server) {
+ t.called = true
+}
- // Create a trusted peer to accept connections from
- key := newkey()
- trusted := &discover.Node{
- ID: discover.PubkeyID(&key.PublicKey),
- }
- // Create a test server with limited connection slots
- started := make(chan *Peer)
- server := &Server{
- ListenAddr: "127.0.0.1:0",
+// This test checks that connections are disconnected
+// just after the encryption handshake when the server is
+// at capacity. Trusted connections should still be accepted.
+func TestServerAtCap(t *testing.T) {
+ trustedID := randomID()
+ srv := &Server{
PrivateKey: newkey(),
- MaxPeers: 3,
+ MaxPeers: 10,
NoDial: true,
- TrustedNodes: []*discover.Node{trusted},
- newPeerHook: func(p *Peer) { started <- p },
+ TrustedNodes: []*discover.Node{{ID: trustedID}},
}
- if err := server.Start(); err != nil {
- t.Fatal(err)
+ if err := srv.Start(); err != nil {
+ t.Fatalf("could not start: %v", err)
}
- defer server.Stop()
+ defer srv.Stop()
- // Fill up all the slots on the server
- dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
- for i := 0; i < server.MaxPeers; i++ {
- // Establish a new connection
- conn, err := dialer.Dial("tcp", server.ListenAddr)
- if err != nil {
- t.Fatalf("conn %d: dial error: %v", i, err)
- }
- defer conn.Close()
+ newconn := func(id discover.NodeID) *conn {
+ fd, _ := net.Pipe()
+ tx := newTestTransport(id, fd)
+ return &conn{fd: fd, transport: tx, flags: inboundConn, id: id, cont: make(chan error)}
+ }
- // Run the handshakes just like a real peer would, and wait for completion
- key := newkey()
- shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
- if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
- t.Fatalf("conn %d: unexpected error: %v", i, err)
+ // Inject a few connections to fill up the peer set.
+ for i := 0; i < 10; i++ {
+ c := newconn(randomID())
+ if err := srv.checkpoint(c, srv.addpeer); err != nil {
+ t.Fatalf("could not add conn %d: %v", i, err)
}
- <-started
}
- // Dial from the trusted peer, ensure connection is accepted
- conn, err := dialer.Dial("tcp", server.ListenAddr)
- if err != nil {
- t.Fatalf("trusted node: dial error: %v", err)
+ // Try inserting a non-trusted connection.
+ c := newconn(randomID())
+ if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
+ t.Error("wrong error for insert:", err)
}
- defer conn.Close()
-
- shake := &protoHandshake{Version: baseProtocolVersion, ID: trusted.ID}
- if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
- t.Fatalf("trusted node: unexpected error: %v", err)
+ // Try inserting a trusted connection.
+ c = newconn(trustedID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ t.Error("unexpected error for trusted conn @posthandshake:", err)
}
- select {
- case <-started:
- // Ok, trusted peer accepted
-
- case <-time.After(100 * time.Millisecond):
- t.Fatalf("trusted node timeout")
+ if !c.is(trustedConn) {
+ t.Error("Server did not set trusted flag")
}
+
}
-// Tests that a failed dial will temporarily throttle a peer.
-func TestServerMaxPendingDials(t *testing.T) {
- // Start a simple test server
- server := &Server{
- ListenAddr: "127.0.0.1:0",
- PrivateKey: newkey(),
- MaxPeers: 10,
- MaxPendingPeers: 1,
- }
- if err := server.Start(); err != nil {
- t.Fatal("failed to start test server: %v", err)
+func TestServerSetupConn(t *testing.T) {
+ id := randomID()
+ srvkey := newkey()
+ srvid := discover.PubkeyID(&srvkey.PublicKey)
+ tests := []struct {
+ dontstart bool
+ tt *setupTransport
+ flags connFlag
+ dialDest *discover.Node
+
+ wantCloseErr error
+ wantCalls string
+ }{
+ {
+ dontstart: true,
+ tt: &setupTransport{id: id},
+ wantCalls: "close,",
+ wantCloseErr: errServerStopped,
+ },
+ {
+ tt: &setupTransport{id: id, encHandshakeErr: errors.New("read error")},
+ flags: inboundConn,
+ wantCalls: "doEncHandshake,close,",
+ wantCloseErr: errors.New("read error"),
+ },
+ {
+ tt: &setupTransport{id: id},
+ dialDest: &discover.Node{ID: randomID()},
+ flags: dynDialedConn,
+ wantCalls: "doEncHandshake,close,",
+ wantCloseErr: DiscUnexpectedIdentity,
+ },
+ {
+ tt: &setupTransport{id: id, phs: &protoHandshake{ID: randomID()}},
+ dialDest: &discover.Node{ID: id},
+ flags: dynDialedConn,
+ wantCalls: "doEncHandshake,doProtoHandshake,close,",
+ wantCloseErr: DiscUnexpectedIdentity,
+ },
+ {
+ tt: &setupTransport{id: id, protoHandshakeErr: errors.New("foo")},
+ dialDest: &discover.Node{ID: id},
+ flags: dynDialedConn,
+ wantCalls: "doEncHandshake,doProtoHandshake,close,",
+ wantCloseErr: errors.New("foo"),
+ },
+ {
+ tt: &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}},
+ flags: inboundConn,
+ wantCalls: "doEncHandshake,close,",
+ wantCloseErr: DiscSelf,
+ },
+ {
+ tt: &setupTransport{id: id, phs: &protoHandshake{ID: id}},
+ flags: inboundConn,
+ wantCalls: "doEncHandshake,doProtoHandshake,close,",
+ wantCloseErr: DiscUselessPeer,
+ },
}
- defer server.Stop()
- // Simulate two separate remote peers
- peers := make(chan *discover.Node, 2)
- conns := make(chan net.Conn, 2)
- for i := 0; i < 2; i++ {
- listener, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatalf("listener %d: failed to setup: %v", i, err)
- }
- defer listener.Close()
-
- addr := listener.Addr().(*net.TCPAddr)
- peers <- &discover.Node{
- ID: discover.PubkeyID(&newkey().PublicKey),
- IP: addr.IP,
- TCP: uint16(addr.Port),
+ for i, test := range tests {
+ srv := &Server{
+ PrivateKey: srvkey,
+ MaxPeers: 10,
+ NoDial: true,
+ Protocols: []Protocol{discard},
+ newTransport: func(fd net.Conn) transport { return test.tt },
}
- go func() {
- conn, err := listener.Accept()
- if err == nil {
- conns <- conn
+ if !test.dontstart {
+ if err := srv.Start(); err != nil {
+ t.Fatalf("couldn't start server: %v", err)
}
- }()
- }
- // Request a dial for both peers
- go func() {
- for i := 0; i < 2; i++ {
- server.staticDial <- <-peers // hack piggybacking the static implementation
}
- }()
-
- // Make sure only one outbound connection goes through
- var conn net.Conn
-
- select {
- case conn = <-conns:
- case <-time.After(100 * time.Millisecond):
- t.Fatalf("first dial timeout")
- }
- select {
- case conn = <-conns:
- t.Fatalf("second dial completed prematurely")
- case <-time.After(100 * time.Millisecond):
- }
- // Finish the first dial, check the second
- conn.Close()
- select {
- case conn = <-conns:
- conn.Close()
-
- case <-time.After(100 * time.Millisecond):
- t.Fatalf("second dial timeout")
+ p1, _ := net.Pipe()
+ srv.setupConn(p1, test.flags, test.dialDest)
+ if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) {
+ t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr)
+ }
+ if test.tt.calls != test.wantCalls {
+ t.Errorf("test %d: calls mismatch: got %q, want %q", i, test.tt.calls, test.wantCalls)
+ }
}
}
-func TestServerMaxPendingAccepts(t *testing.T) {
- // Start a test server and a peer sink for synchronization
- started := make(chan *Peer)
- server := &Server{
- ListenAddr: "127.0.0.1:0",
- PrivateKey: newkey(),
- MaxPeers: 10,
- MaxPendingPeers: 1,
- NoDial: true,
- newPeerHook: func(p *Peer) { started <- p },
- }
- if err := server.Start(); err != nil {
- t.Fatal("failed to start test server: %v", err)
- }
- defer server.Stop()
+type setupTransport struct {
+ id discover.NodeID
+ encHandshakeErr error
- // Try and connect to the server on multiple threads concurrently
- conns := make([]net.Conn, 2)
- for i := 0; i < 2; i++ {
- dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
+ phs *protoHandshake
+ protoHandshakeErr error
- conn, err := dialer.Dial("tcp", server.ListenAddr)
- if err != nil {
- t.Fatalf("failed to dial server: %v", err)
- }
- conns[i] = conn
- }
- // Check that a handshake on the second doesn't pass
- go func() {
- key := newkey()
- shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
- if _, err := setupConn(conns[1], key, shake, server.Self(), keepalways); err != nil {
- t.Fatalf("failed to run handshake: %v", err)
- }
- }()
- select {
- case <-started:
- t.Fatalf("handshake on second connection accepted")
+ calls string
+ closeErr error
+}
- case <-time.After(time.Second):
- }
- // Shake on first, check that both go through
- go func() {
- key := newkey()
- shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
- if _, err := setupConn(conns[0], key, shake, server.Self(), keepalways); err != nil {
- t.Fatalf("failed to run handshake: %v", err)
- }
- }()
- for i := 0; i < 2; i++ {
- select {
- case <-started:
- case <-time.After(time.Second):
- t.Fatalf("peer %d: handshake timeout", i)
- }
+func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error) {
+ c.calls += "doEncHandshake,"
+ return c.id, c.encHandshakeErr
+}
+func (c *setupTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) {
+ c.calls += "doProtoHandshake,"
+ if c.protoHandshakeErr != nil {
+ return nil, c.protoHandshakeErr
}
+ return c.phs, nil
+}
+func (c *setupTransport) close(err error) {
+ c.calls += "close,"
+ c.closeErr = err
+}
+
+// setupConn shouldn't write to/read from the connection.
+func (c *setupTransport) WriteMsg(Msg) error {
+ panic("WriteMsg called on setupTransport")
+}
+func (c *setupTransport) ReadMsg() (Msg, error) {
+ panic("ReadMsg called on setupTransport")
}
func newkey() *ecdsa.PrivateKey {
@@ -459,7 +419,3 @@ func randomID() (id discover.NodeID) {
}
return id
}
-
-func keepalways(id discover.NodeID) bool {
- return true
-}