aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/messenger_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/messenger_test.go')
-rw-r--r--p2p/messenger_test.go224
1 files changed, 117 insertions, 107 deletions
diff --git a/p2p/messenger_test.go b/p2p/messenger_test.go
index 472d74515..f10469e2f 100644
--- a/p2p/messenger_test.go
+++ b/p2p/messenger_test.go
@@ -1,147 +1,157 @@
package p2p
import (
- // "fmt"
- "bytes"
+ "bufio"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "os"
+ "reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/ethutil"
)
-func setupMessenger(handlers Handlers) (*TestNetworkConnection, chan *PeerError, *Messenger) {
- errchan := NewPeerErrorChannel()
- addr := &TestAddr{"test:30303"}
- net := NewTestNetworkConnection(addr)
- conn := NewConnection(net, errchan)
- mess := NewMessenger(nil, conn, errchan, handlers)
- mess.Start()
- return net, errchan, mess
+func init() {
+ ethlog.AddLogSystem(ethlog.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlog.DebugLevel))
}
-type TestProtocol struct {
- Msgs []*Msg
+func setupMessenger(handlers Handlers) (net.Conn, *Peer, *messenger) {
+ conn1, conn2 := net.Pipe()
+ id := NewSimpleClientIdentity("test", "0", "0", "public key")
+ server := New(nil, conn1.LocalAddr(), id, handlers, 10, NewBlacklist())
+ peer := server.addPeer(conn1, conn1.RemoteAddr(), true, 0)
+ return conn2, peer, peer.messenger
}
-func (self *TestProtocol) Start() {
-}
-
-func (self *TestProtocol) Stop() {
-}
-
-func (self *TestProtocol) Offset() MsgCode {
- return MsgCode(5)
+func performTestHandshake(r *bufio.Reader, w io.Writer) error {
+ // read remote handshake
+ msg, err := readMsg(r)
+ if err != nil {
+ return fmt.Errorf("read error: %v", err)
+ }
+ if msg.Code != handshakeMsg {
+ return fmt.Errorf("first message should be handshake, got %x", msg.Code)
+ }
+ if err := msg.Discard(); err != nil {
+ return err
+ }
+ // send empty handshake
+ pubkey := make([]byte, 64)
+ msg = NewMsg(handshakeMsg, p2pVersion, "testid", nil, 9999, pubkey)
+ return writeMsg(w, msg)
}
-func (self *TestProtocol) HandleIn(msg *Msg, response chan *Msg) {
- self.Msgs = append(self.Msgs, msg)
- close(response)
+type testMsg struct {
+ code MsgCode
+ data *ethutil.Value
}
-func (self *TestProtocol) HandleOut(msg *Msg) bool {
- if msg.Code() > 3 {
- return false
- } else {
- return true
- }
+type testProto struct {
+ recv chan testMsg
}
-func (self *TestProtocol) Name() string {
- return "a"
-}
+func (*testProto) Offset() MsgCode { return 5 }
-func Packet(offset MsgCode, code MsgCode, params ...interface{}) []byte {
- msg, _ := NewMsg(code, params...)
- encoded := msg.Encode(offset)
- packet := []byte{34, 64, 8, 145}
- packet = append(packet, ethutil.NumberToBytes(uint32(len(encoded)), 32)...)
- return append(packet, encoded...)
+func (tp *testProto) Start(peer *Peer, rw MsgReadWriter) error {
+ return MsgLoop(rw, 1024, func(code MsgCode, data *ethutil.Value) error {
+ logger.Debugf("testprotocol got msg: %d\n", code)
+ tp.recv <- testMsg{code, data}
+ return nil
+ })
}
func TestRead(t *testing.T) {
- handlers := make(Handlers)
- testProtocol := &TestProtocol{Msgs: []*Msg{}}
- handlers["a"] = func(p *Peer) Protocol { return testProtocol }
- net, _, mess := setupMessenger(handlers)
- mess.AddProtocols([]string{"a"})
- defer mess.Stop()
- wait := 1 * time.Millisecond
- packet := Packet(16, 1, uint32(1), "000")
- go net.In(0, packet)
- time.Sleep(wait)
- if len(testProtocol.Msgs) != 1 {
- t.Errorf("msg not relayed to correct protocol")
- } else {
- if testProtocol.Msgs[0].Code() != 1 {
- t.Errorf("incorrect msg code relayed to protocol")
+ testProtocol := &testProto{make(chan testMsg)}
+ handlers := Handlers{"a": func() Protocol { return testProtocol }}
+ net, peer, mess := setupMessenger(handlers)
+ bufr := bufio.NewReader(net)
+ defer peer.Stop()
+ if err := performTestHandshake(bufr, net); err != nil {
+ t.Fatalf("handshake failed: %v", err)
+ }
+
+ mess.setRemoteProtocols([]string{"a"})
+ writeMsg(net, NewMsg(17, uint32(1), "000"))
+ select {
+ case msg := <-testProtocol.recv:
+ if msg.code != 1 {
+ t.Errorf("incorrect msg code %d relayed to protocol", msg.code)
+ }
+ expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}}
+ if !reflect.DeepEqual(msg.data.Slice(), expdata) {
+ t.Errorf("incorrect msg data %#v", msg.data.Slice())
}
+ case <-time.After(2 * time.Second):
+ t.Errorf("receive timeout")
}
}
-func TestWrite(t *testing.T) {
+func TestWriteProtoMsg(t *testing.T) {
handlers := make(Handlers)
- testProtocol := &TestProtocol{Msgs: []*Msg{}}
- handlers["a"] = func(p *Peer) Protocol { return testProtocol }
- net, _, mess := setupMessenger(handlers)
- mess.AddProtocols([]string{"a"})
- defer mess.Stop()
- wait := 1 * time.Millisecond
- msg, _ := NewMsg(3, uint32(1), "000")
- err := mess.Write("b", msg)
- if err == nil {
- t.Errorf("expect error for unknown protocol")
+ testProtocol := &testProto{recv: make(chan testMsg, 1)}
+ handlers["a"] = func() Protocol { return testProtocol }
+ net, peer, mess := setupMessenger(handlers)
+ defer peer.Stop()
+ bufr := bufio.NewReader(net)
+ if err := performTestHandshake(bufr, net); err != nil {
+ t.Fatalf("handshake failed: %v", err)
}
- err = mess.Write("a", msg)
- if err != nil {
- t.Errorf("expect no error for known protocol: %v", err)
- } else {
- time.Sleep(wait)
- if len(net.Out) != 1 {
- t.Errorf("msg not written")
+ mess.setRemoteProtocols([]string{"a"})
+
+ // test write errors
+ if err := mess.writeProtoMsg("b", NewMsg(3)); err == nil {
+ t.Errorf("expected error for unknown protocol, got nil")
+ }
+ if err := mess.writeProtoMsg("a", NewMsg(8)); err == nil {
+ t.Errorf("expected error for out-of-range msg code, got nil")
+ } else if perr, ok := err.(*PeerError); !ok || perr.Code != InvalidMsgCode {
+ t.Errorf("wrong error for out-of-range msg code, got %#v")
+ }
+
+ // test succcessful write
+ read, readerr := make(chan Msg), make(chan error)
+ go func() {
+ if msg, err := readMsg(bufr); err != nil {
+ readerr <- err
} else {
- out := net.Out[0]
- packet := Packet(16, 3, uint32(1), "000")
- if bytes.Compare(out, packet) != 0 {
- t.Errorf("incorrect packet %v", out)
- }
+ read <- msg
+ }
+ }()
+ if err := mess.writeProtoMsg("a", NewMsg(3)); err != nil {
+ t.Errorf("expect no error for known protocol: %v", err)
+ }
+ select {
+ case msg := <-read:
+ if msg.Code != 19 {
+ t.Errorf("wrong code, got %d, expected %d", msg.Code, 19)
}
+ msg.Discard()
+ case err := <-readerr:
+ t.Errorf("read error: %v", err)
}
}
func TestPulse(t *testing.T) {
- net, _, mess := setupMessenger(make(Handlers))
- defer mess.Stop()
- ping := false
- timeout := false
- pingTimeout := 10 * time.Millisecond
- gracePeriod := 200 * time.Millisecond
- go mess.PingPong(pingTimeout, gracePeriod, func() { ping = true }, func() { timeout = true })
- net.In(0, Packet(0, 1))
- if ping {
- t.Errorf("ping sent too early")
- }
- time.Sleep(pingTimeout + 100*time.Millisecond)
- if !ping {
- t.Errorf("no ping sent after timeout")
- }
- if timeout {
- t.Errorf("timeout too early")
+ net, peer, _ := setupMessenger(nil)
+ defer peer.Stop()
+ bufr := bufio.NewReader(net)
+ if err := performTestHandshake(bufr, net); err != nil {
+ t.Fatalf("handshake failed: %v", err)
}
- ping = false
- net.In(0, Packet(0, 1))
- time.Sleep(pingTimeout + 100*time.Millisecond)
- if !ping {
- t.Errorf("no ping sent after timeout")
- }
- if timeout {
- t.Errorf("timeout too early")
+
+ before := time.Now()
+ msg, err := readMsg(bufr)
+ if err != nil {
+ t.Fatalf("read error: %v", err)
}
- ping = false
- time.Sleep(gracePeriod)
- if ping {
- t.Errorf("ping called twice")
+ after := time.Now()
+ if msg.Code != pingMsg {
+ t.Errorf("expected ping message, got %x", msg.Code)
}
- if !timeout {
- t.Errorf("no timeout after grace period")
+ if d := after.Sub(before); d < pingTimeout {
+ t.Errorf("ping sent too early after %v, expected at least %v", d, pingTimeout)
}
}