From 6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 5 Jan 2015 17:10:42 +0100 Subject: Merge --- p2p/client_identity_test.go | 2 +- p2p/message.go | 10 +++++- p2p/peer.go | 28 ++++++++++++++-- p2p/peer_test.go | 14 +++++--- p2p/protocol.go | 54 +++++++---------------------- p2p/protocol_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++-- p2p/server.go | 6 +++- p2p/server_test.go | 2 +- 8 files changed, 144 insertions(+), 54 deletions(-) (limited to 'p2p') diff --git a/p2p/client_identity_test.go b/p2p/client_identity_test.go index 40b0e6f5e..7248a7b1a 100644 --- a/p2p/client_identity_test.go +++ b/p2p/client_identity_test.go @@ -7,7 +7,7 @@ import ( ) func TestClientIdentity(t *testing.T) { - clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", "pubkey") + clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", []byte("pubkey")) clientString := clientIdentity.String() expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version()) if clientString != expected { diff --git a/p2p/message.go b/p2p/message.go index f5418ff47..a6f62ec4c 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "io/ioutil" "math/big" @@ -49,7 +50,14 @@ func encodePayload(params ...interface{}) []byte { // For the decoding rules, please see package rlp. func (msg Msg) Decode(val interface{}) error { s := rlp.NewListStream(msg.Payload, uint64(msg.Size)) - return s.Decode(val) + if err := s.Decode(val); err != nil { + return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err) + } + return nil +} + +func (msg Msg) String() string { + return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size) } // Discard reads any remaining payload data into a black hole. diff --git a/p2p/peer.go b/p2p/peer.go index 86c4d7ab5..0d7eec9f4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -45,8 +45,8 @@ func (d peerAddr) String() string { return fmt.Sprintf("%v:%d", d.IP, d.Port) } -func (d peerAddr) RlpData() interface{} { - return []interface{}{d.IP, d.Port, d.Pubkey} +func (d *peerAddr) RlpData() interface{} { + return []interface{}{string(d.IP), d.Port, d.Pubkey} } // Peer represents a remote peer. @@ -426,7 +426,7 @@ func (rw *proto) WriteMsg(msg Msg) error { } func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error { - return rw.WriteMsg(NewMsg(code, data)) + return rw.WriteMsg(NewMsg(code, data...)) } func (rw *proto) ReadMsg() (Msg, error) { @@ -460,3 +460,25 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } + +func (peer *Peer) PeerList() []interface{} { + peers := peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index f7759786e..5b9e9e784 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -30,9 +30,8 @@ var discard = Protocol{ func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) { conn1, conn2 := net.Pipe() - id := NewSimpleClientIdentity("test", "0", "0", "public key") peer := newPeer(conn1, protos, nil) - peer.ourID = id + peer.ourID = &peerId{} peer.pubkeyHook = func(*peerAddr) error { return nil } errc := make(chan error, 1) go func() { @@ -130,7 +129,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) { if err := rw.EncodeMsg(2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } - if err := rw.EncodeMsg(1); err != nil { + if err := rw.EncodeMsg(1, "foo", "bar"); err != nil { t.Errorf("write error: %v", err) } return nil @@ -148,6 +147,13 @@ func TestPeerProtoEncodeMsg(t *testing.T) { if msg.Code != 17 { t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17) } + var data []string + if err := msg.Decode(&data); err != nil { + t.Errorf("payload decode error: %v", err) + } + if !reflect.DeepEqual(data, []string{"foo", "bar"}) { + t.Errorf("payload RLP mismatch, got %#v, want %#v", data, []string{"foo", "bar"}) + } } func TestPeerWrite(t *testing.T) { @@ -226,8 +232,8 @@ func TestPeerActivity(t *testing.T) { } func TestNewPeer(t *testing.T) { - id := NewSimpleClientIdentity("clientid", "version", "customid", "pubkey") caps := []Cap{{"foo", 2}, {"bar", 3}} + id := &peerId{} p := NewPeer(id, caps) if !reflect.DeepEqual(p.Caps(), caps) { t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps) diff --git a/p2p/protocol.go b/p2p/protocol.go index 3f52205f5..dd8cbc4ec 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -3,8 +3,6 @@ package p2p import ( "bytes" "time" - - "github.com/ethereum/go-ethereum/ethutil" ) // Protocol represents a P2P subprotocol implementation. @@ -89,20 +87,25 @@ type baseProtocol struct { func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { bp := &baseProtocol{rw, peer} - if err := bp.doHandshake(rw); err != nil { + errc := make(chan error, 1) + go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }() + if err := bp.readHandshake(); err != nil { + return err + } + // handle write error + if err := <-errc; err != nil { return err } // run main loop - quit := make(chan error, 1) go func() { for { if err := bp.handle(rw); err != nil { - quit <- err + errc <- err break } } }() - return bp.loop(quit) + return bp.loop(errc) } var pingTimeout = 2 * time.Second @@ -166,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { case pongMsg: case getPeersMsg: - peers := bp.peerList() + peers := bp.peer.PeerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -174,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers) + return bp.rw.EncodeMsg(peersMsg, peers...) } case peersMsg: @@ -193,14 +196,9 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return nil } -func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { - // send our handshake - if err := rw.WriteMsg(bp.handshakeMsg()); err != nil { - return err - } - +func (bp *baseProtocol) readHandshake() error { // read and handle remote handshake - msg, err := rw.ReadMsg() + msg, err := bp.rw.ReadMsg() if err != nil { return err } @@ -210,12 +208,10 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { if msg.Size > baseProtocolMaxMsgSize { return newPeerError(errMisc, "message too big") } - var hs handshake if err := msg.Decode(&hs); err != nil { return err } - // validate handshake info if hs.Version != baseProtocolVersion { return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n", @@ -238,9 +234,7 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error { if err := bp.peer.pubkeyHook(pa); err != nil { return newPeerError(errPubkeyForbidden, "%v", err) } - // TODO: remove Caps with empty name - var addr *peerAddr if hs.ListenPort != 0 { addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) @@ -270,25 +264,3 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } - -func (bp *baseProtocol) peerList() []ethutil.RlpEncodable { - peers := bp.peer.otherPeers() - ds := make([]ethutil.RlpEncodable, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == bp.peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := bp.peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index 65f26fb12..ce25b3e1b 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -2,12 +2,89 @@ package p2p import ( "fmt" + "net" + "reflect" "testing" + + "github.com/ethereum/go-ethereum/crypto" ) +type peerId struct { + pubkey []byte +} + +func (self *peerId) String() string { + return fmt.Sprintf("test peer %x", self.Pubkey()[:4]) +} + +func (self *peerId) Pubkey() (pubkey []byte) { + pubkey = self.pubkey + if len(pubkey) == 0 { + pubkey = crypto.GenerateNewKeyPair().PublicKey + self.pubkey = pubkey + } + return +} + +func newTestPeer() (peer *Peer) { + peer = NewPeer(&peerId{}, []Cap{}) + peer.pubkeyHook = func(*peerAddr) error { return nil } + peer.ourID = &peerId{} + peer.listenAddr = &peerAddr{} + peer.otherPeers = func() []*Peer { return nil } + return +} + +func TestBaseProtocolPeers(t *testing.T) { + cannedPeerList := []*peerAddr{ + {IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}}, + {IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}}, + } + var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} + rw1, rw2 := MsgPipe() + // run matcher, close pipe when addresses have arrived + addrChan := make(chan *peerAddr, len(cannedPeerList)) + go func() { + for _, want := range cannedPeerList { + got := <-addrChan + t.Logf("got peer: %+v", got) + if !reflect.DeepEqual(want, got) { + t.Errorf("mismatch: got %#v, want %#v", got, want) + } + } + close(addrChan) + var own []*peerAddr + var got *peerAddr + for got = range addrChan { + own = append(own, got) + } + if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) { + t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr) + } + rw2.Close() + }() + // run first peer + peer1 := newTestPeer() + peer1.ourListenAddr = ownAddr + peer1.otherPeers = func() []*Peer { + pl := make([]*Peer, len(cannedPeerList)) + for i, addr := range cannedPeerList { + pl[i] = &Peer{listenAddr: addr} + } + return pl + } + go runBaseProtocol(peer1, rw1) + // run second peer + peer2 := newTestPeer() + peer2.newPeerAddr = addrChan // feed peer suggestions into matcher + if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed { + t.Errorf("peer2 terminated with unexpected error: %v", err) + } +} + func TestBaseProtocolDisconnect(t *testing.T) { - peer := NewPeer(NewSimpleClientIdentity("p1", "", "", "foo"), nil) - peer.ourID = NewSimpleClientIdentity("p2", "", "", "bar") + peer := NewPeer(&peerId{}, nil) + peer.ourID = &peerId{} peer.pubkeyHook = func(*peerAddr) error { return nil } rw1, rw2 := MsgPipe() @@ -32,6 +109,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil { t.Error(err) } + close(done) }() diff --git a/p2p/server.go b/p2p/server.go index 326781234..cfff442f7 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -113,9 +113,11 @@ func (srv *Server) PeerCount() int { // SuggestPeer injects an address into the outbound address pool. func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) { + addr := &peerAddr{ip, uint64(port), nodeID} select { - case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}: + case srv.peerConnect <- addr: default: // don't block + srvlog.Warnf("peer suggestion %v ignored", addr) } } @@ -258,6 +260,7 @@ func (srv *Server) listenLoop() { for { select { case slot := <-srv.peerSlots: + srvlog.Debugf("grabbed slot %v for listening", slot) conn, err := srv.listener.Accept() if err != nil { srv.peerSlots <- slot @@ -330,6 +333,7 @@ func (srv *Server) dialLoop() { case desc := <-suggest: // candidate peer found, will dial out asyncronously // if connection fails slot will be released + srvlog.Infof("dial %v (%v)", desc, *slot) go srv.dialPeer(desc, *slot) // we can watch if more peers needed in the next loop slots = srv.peerSlots diff --git a/p2p/server_test.go b/p2p/server_test.go index 5c0d08d39..ceb89e3f7 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -11,7 +11,7 @@ import ( func startTestServer(t *testing.T, pf peerFunc) *Server { server := &Server{ - Identity: NewSimpleClientIdentity("clientIdentifier", "version", "customIdentifier", "pubkey"), + Identity: &peerId{}, MaxPeers: 10, ListenAddr: "127.0.0.1:0", newPeerFunc: pf, -- cgit v1.2.3