From eb0e7b1b8120852a1d56aa0ebd3a98e652965635 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 11:35:09 +0100 Subject: eth, p2p: remove EncodeMsg from p2p.MsgWriter ...and make it a top-level function instead. The original idea behind having EncodeMsg in the interface was that implementations might be able to encode RLP data to their underlying writer directly instead of buffering the encoded data. The encoder will buffer anyway, so that doesn't matter anymore. Given the recent problems with EncodeMsg (copy-pasted implementation bug) I'd rather implement once, correctly. --- p2p/message.go | 20 +++++++++----------- p2p/message_test.go | 6 +++--- p2p/peer_test.go | 4 ++-- p2p/protocol.go | 10 +++++----- p2p/protocol_test.go | 4 ++-- 5 files changed, 21 insertions(+), 23 deletions(-) (limited to 'p2p') diff --git a/p2p/message.go b/p2p/message.go index a6f62ec4c..daf2bf05c 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -71,14 +71,11 @@ type MsgReader interface { } type MsgWriter interface { - // WriteMsg sends an existing message. - // The Payload reader of the message is consumed. + // WriteMsg sends a message. It will block until the message's + // Payload has been consumed by the other end. + // // Note that messages can be sent only once. WriteMsg(Msg) error - - // EncodeMsg writes an RLP-encoded message with the given - // code and data elements. - EncodeMsg(code uint64, data ...interface{}) error } // MsgReadWriter provides reading and writing of encoded messages. @@ -87,6 +84,12 @@ type MsgReadWriter interface { MsgWriter } +// EncodeMsg writes an RLP-encoded message with the given code and +// data elements. +func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { + return w.WriteMsg(NewMsg(code, data...)) +} + var magicToken = []byte{34, 64, 8, 145} func writeMsg(w io.Writer, msg Msg) error { @@ -209,11 +212,6 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { return ErrPipeClosed } -// EncodeMsg is a convenient shorthand for sending an RLP-encoded message. -func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error { - return p.WriteMsg(NewMsg(code, data...)) -} - // ReadMsg returns a message sent on the other end of the pipe. func (p *MsgPipeRW) ReadMsg() (Msg, error) { if atomic.LoadInt32(p.closed) == 0 { diff --git a/p2p/message_test.go b/p2p/message_test.go index 066d2516d..5cde9abf5 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -75,8 +75,8 @@ func TestDecodeRealMsg(t *testing.T) { func ExampleMsgPipe() { rw1, rw2 := MsgPipe() go func() { - rw1.EncodeMsg(8, []byte{0, 0}) - rw1.EncodeMsg(5, []byte{1, 1}) + EncodeMsg(rw1, 8, []byte{0, 0}) + EncodeMsg(rw1, 5, []byte{1, 1}) rw1.Close() }() @@ -100,7 +100,7 @@ loop: rw1, rw2 := MsgPipe() done := make(chan struct{}) go func() { - if err := rw1.EncodeMsg(1); err == nil { + if err := EncodeMsg(rw1, 1); err == nil { t.Error("EncodeMsg returned nil error") } else if err != ErrPipeClosed { t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed) diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 5b9e9e784..4ee88f112 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -126,10 +126,10 @@ func TestPeerProtoEncodeMsg(t *testing.T) { Name: "a", Length: 2, Run: func(peer *Peer, rw MsgReadWriter) error { - if err := rw.EncodeMsg(2); err == nil { + if err := EncodeMsg(rw, 2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } - if err := rw.EncodeMsg(1, "foo", "bar"); err != nil { + if err := EncodeMsg(rw, 1, "foo", "bar"); err != nil { t.Errorf("write error: %v", err) } return nil diff --git a/p2p/protocol.go b/p2p/protocol.go index dd8cbc4ec..969937076 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -119,14 +119,14 @@ func (bp *baseProtocol) loop(quit <-chan error) error { getPeersTick := time.NewTicker(10 * time.Second) defer getPeersTick.Stop() - err := bp.rw.EncodeMsg(getPeersMsg) + err := EncodeMsg(bp.rw, getPeersMsg) for err == nil { select { case err = <-quit: return err case <-getPeersTick.C: - err = bp.rw.EncodeMsg(getPeersMsg) + err = EncodeMsg(bp.rw, getPeersMsg) case event := <-activity.Chan(): ping.Reset(pingTimeout) lastActive = event.(time.Time) @@ -134,7 +134,7 @@ func (bp *baseProtocol) loop(quit <-chan error) error { if lastActive.Add(pingTimeout * 2).Before(t) { err = newPeerError(errPingTimeout, "") } else if lastActive.Add(pingTimeout).Before(t) { - err = bp.rw.EncodeMsg(pingMsg) + err = EncodeMsg(bp.rw, pingMsg) } } } @@ -164,7 +164,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return discRequestedError(reason[0]) case pingMsg: - return bp.rw.EncodeMsg(pongMsg) + return EncodeMsg(bp.rw, pongMsg) case pongMsg: @@ -177,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers...) + return EncodeMsg(bp.rw, peersMsg, peers...) } case peersMsg: diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ce25b3e1b..ba5e95c02 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -93,7 +93,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, handshakeMsg); err != nil { t.Error(err) } - err := rw2.EncodeMsg(handshakeMsg, + err := EncodeMsg(rw2, handshakeMsg, baseProtocolVersion, "", []interface{}{}, @@ -106,7 +106,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, getPeersMsg); err != nil { t.Error(err) } - if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil { + if err := EncodeMsg(rw2, discMsg, DiscQuitting); err != nil { t.Error(err) } -- cgit v1.2.3 From b0ff946b55c23f0fffc50a700bcb255f95855afc Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 12:14:29 +0100 Subject: p2p: move peerList back into baseProtocol It had been moved to Peer, probably for debugging. --- p2p/peer.go | 22 ---------------------- p2p/protocol.go | 24 +++++++++++++++++++++++- 2 files changed, 23 insertions(+), 23 deletions(-) (limited to 'p2p') diff --git a/p2p/peer.go b/p2p/peer.go index 0d7eec9f4..2380a3285 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -460,25 +460,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } - -func (peer *Peer) PeerList() []interface{} { - peers := peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} diff --git a/p2p/protocol.go b/p2p/protocol.go index 969937076..1d121a885 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -169,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { case pongMsg: case getPeersMsg: - peers := bp.peer.PeerList() + peers := bp.peerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -264,3 +264,25 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } + +func (bp *baseProtocol) peerList() []interface{} { + peers := bp.peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} -- cgit v1.2.3 From 3caa4ad1baba3019c06733e1a80d78d9a57137bb Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Jan 2015 12:15:51 +0100 Subject: p2p: improve test for peers message The test now checks that the number of of addresses is correct and terminates cleanly. --- p2p/protocol_test.go | 64 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 21 deletions(-) (limited to 'p2p') diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ba5e95c02..b1d10ac53 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "reflect" + "sync" "testing" "github.com/ethereum/go-ethereum/crypto" @@ -36,50 +37,71 @@ func newTestPeer() (peer *Peer) { } func TestBaseProtocolPeers(t *testing.T) { - cannedPeerList := []*peerAddr{ + peerList := []*peerAddr{ {IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}}, {IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}}, } - var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} + listenAddr := &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} rw1, rw2 := MsgPipe() + defer rw1.Close() + wg := new(sync.WaitGroup) + // run matcher, close pipe when addresses have arrived - addrChan := make(chan *peerAddr, len(cannedPeerList)) + numPeers := len(peerList) + 1 + addrChan := make(chan *peerAddr) + wg.Add(1) go func() { - for _, want := range cannedPeerList { - got := <-addrChan - t.Logf("got peer: %+v", got) + i := 0 + for got := range addrChan { + var want *peerAddr + switch { + case i < len(peerList): + want = peerList[i] + case i == len(peerList): + want = listenAddr // listenAddr should be the last thing sent + } + t.Logf("got peer %d/%d: %v", i+1, numPeers, got) if !reflect.DeepEqual(want, got) { - t.Errorf("mismatch: got %#v, want %#v", got, want) + t.Errorf("mismatch: got %+v, want %+v", got, want) + } + i++ + if i == numPeers { + break } } - close(addrChan) - var own []*peerAddr - var got *peerAddr - for got = range addrChan { - own = append(own, got) - } - if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) { - t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr) + if i != numPeers { + t.Errorf("wrong number of peers received: got %d, want %d", i, numPeers) } - rw2.Close() + rw1.Close() + wg.Done() }() - // run first peer + + // run first peer (in background) peer1 := newTestPeer() - peer1.ourListenAddr = ownAddr + peer1.ourListenAddr = listenAddr peer1.otherPeers = func() []*Peer { - pl := make([]*Peer, len(cannedPeerList)) - for i, addr := range cannedPeerList { + pl := make([]*Peer, len(peerList)) + for i, addr := range peerList { pl[i] = &Peer{listenAddr: addr} } return pl } - go runBaseProtocol(peer1, rw1) + wg.Add(1) + go func() { + runBaseProtocol(peer1, rw1) + wg.Done() + }() + // run second peer peer2 := newTestPeer() peer2.newPeerAddr = addrChan // feed peer suggestions into matcher if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed { t.Errorf("peer2 terminated with unexpected error: %v", err) } + + // terminate matcher + close(addrChan) + wg.Wait() } func TestBaseProtocolDisconnect(t *testing.T) { -- cgit v1.2.3 From 8d1637f5677304b3d540648cf9b4ff9f92ad7f1f Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 19 Jan 2015 11:21:46 +0100 Subject: Moved connection errors to DebugDetail level --- p2p/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'p2p') diff --git a/p2p/server.go b/p2p/server.go index cfff442f7..4fd1f7d03 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -333,7 +333,7 @@ func (srv *Server) dialLoop() { case desc := <-suggest: // candidate peer found, will dial out asyncronously // if connection fails slot will be released - srvlog.Infof("dial %v (%v)", desc, *slot) + srvlog.DebugDetailf("dial %v (%v)", desc, *slot) go srv.dialPeer(desc, *slot) // we can watch if more peers needed in the next loop slots = srv.peerSlots @@ -355,7 +355,7 @@ func (srv *Server) dialPeer(desc *peerAddr, slot int) { srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot) conn, err := srv.Dialer.Dial(desc.Network(), desc.String()) if err != nil { - srvlog.Errorf("Dial error: %v", err) + srvlog.DebugDetailf("dial error: %v", err) srv.peerSlots <- slot return } -- cgit v1.2.3