aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-01-06 00:10:42 +0800
committerobscuren <geffobscura@gmail.com>2015-01-06 00:10:42 +0800
commit6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2 (patch)
treeb898711590694cfaa6f10dbc2a4c8591232954ef /p2p
parentb0854fbff5c3d588134f577918a39d08002235dc (diff)
downloaddexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.tar
dexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.tar.gz
dexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.tar.bz2
dexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.tar.lz
dexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.tar.xz
dexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.tar.zst
dexon-6abf8ef78f1474fdeb7a6a6ce084bf994cc055f2.zip
Merge
Diffstat (limited to 'p2p')
-rw-r--r--p2p/client_identity_test.go2
-rw-r--r--p2p/message.go10
-rw-r--r--p2p/peer.go28
-rw-r--r--p2p/peer_test.go14
-rw-r--r--p2p/protocol.go54
-rw-r--r--p2p/protocol_test.go82
-rw-r--r--p2p/server.go6
-rw-r--r--p2p/server_test.go2
8 files changed, 144 insertions, 54 deletions
diff --git a/p2p/client_identity_test.go b/p2p/client_identity_test.go
index 40b0e6f5e..7248a7b1a 100644
--- a/p2p/client_identity_test.go
+++ b/p2p/client_identity_test.go
@@ -7,7 +7,7 @@ import (
)
func TestClientIdentity(t *testing.T) {
- clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", "pubkey")
+ clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", []byte("pubkey"))
clientString := clientIdentity.String()
expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version())
if clientString != expected {
diff --git a/p2p/message.go b/p2p/message.go
index f5418ff47..a6f62ec4c 100644
--- a/p2p/message.go
+++ b/p2p/message.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
+ "fmt"
"io"
"io/ioutil"
"math/big"
@@ -49,7 +50,14 @@ func encodePayload(params ...interface{}) []byte {
// For the decoding rules, please see package rlp.
func (msg Msg) Decode(val interface{}) error {
s := rlp.NewListStream(msg.Payload, uint64(msg.Size))
- return s.Decode(val)
+ if err := s.Decode(val); err != nil {
+ return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err)
+ }
+ return nil
+}
+
+func (msg Msg) String() string {
+ return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
}
// Discard reads any remaining payload data into a black hole.
diff --git a/p2p/peer.go b/p2p/peer.go
index 86c4d7ab5..0d7eec9f4 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -45,8 +45,8 @@ func (d peerAddr) String() string {
return fmt.Sprintf("%v:%d", d.IP, d.Port)
}
-func (d peerAddr) RlpData() interface{} {
- return []interface{}{d.IP, d.Port, d.Pubkey}
+func (d *peerAddr) RlpData() interface{} {
+ return []interface{}{string(d.IP), d.Port, d.Pubkey}
}
// Peer represents a remote peer.
@@ -426,7 +426,7 @@ func (rw *proto) WriteMsg(msg Msg) error {
}
func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error {
- return rw.WriteMsg(NewMsg(code, data))
+ return rw.WriteMsg(NewMsg(code, data...))
}
func (rw *proto) ReadMsg() (Msg, error) {
@@ -460,3 +460,25 @@ func (r *eofSignal) Read(buf []byte) (int, error) {
}
return n, err
}
+
+func (peer *Peer) PeerList() []interface{} {
+ peers := peer.otherPeers()
+ ds := make([]interface{}, 0, len(peers))
+ for _, p := range peers {
+ p.infolock.Lock()
+ addr := p.listenAddr
+ p.infolock.Unlock()
+ // filter out this peer and peers that are not listening or
+ // have not completed the handshake.
+ // TODO: track previously sent peers and exclude them as well.
+ if p == peer || addr == nil {
+ continue
+ }
+ ds = append(ds, addr)
+ }
+ ourAddr := peer.ourListenAddr
+ if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
+ ds = append(ds, ourAddr)
+ }
+ return ds
+}
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index f7759786e..5b9e9e784 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -30,9 +30,8 @@ var discard = Protocol{
func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) {
conn1, conn2 := net.Pipe()
- id := NewSimpleClientIdentity("test", "0", "0", "public key")
peer := newPeer(conn1, protos, nil)
- peer.ourID = id
+ peer.ourID = &peerId{}
peer.pubkeyHook = func(*peerAddr) error { return nil }
errc := make(chan error, 1)
go func() {
@@ -130,7 +129,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
if err := rw.EncodeMsg(2); err == nil {
t.Error("expected error for out-of-range msg code, got nil")
}
- if err := rw.EncodeMsg(1); err != nil {
+ if err := rw.EncodeMsg(1, "foo", "bar"); err != nil {
t.Errorf("write error: %v", err)
}
return nil
@@ -148,6 +147,13 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
if msg.Code != 17 {
t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17)
}
+ var data []string
+ if err := msg.Decode(&data); err != nil {
+ t.Errorf("payload decode error: %v", err)
+ }
+ if !reflect.DeepEqual(data, []string{"foo", "bar"}) {
+ t.Errorf("payload RLP mismatch, got %#v, want %#v", data, []string{"foo", "bar"})
+ }
}
func TestPeerWrite(t *testing.T) {
@@ -226,8 +232,8 @@ func TestPeerActivity(t *testing.T) {
}
func TestNewPeer(t *testing.T) {
- id := NewSimpleClientIdentity("clientid", "version", "customid", "pubkey")
caps := []Cap{{"foo", 2}, {"bar", 3}}
+ id := &peerId{}
p := NewPeer(id, caps)
if !reflect.DeepEqual(p.Caps(), caps) {
t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps)
diff --git a/p2p/protocol.go b/p2p/protocol.go
index 3f52205f5..dd8cbc4ec 100644
--- a/p2p/protocol.go
+++ b/p2p/protocol.go
@@ -3,8 +3,6 @@ package p2p
import (
"bytes"
"time"
-
- "github.com/ethereum/go-ethereum/ethutil"
)
// Protocol represents a P2P subprotocol implementation.
@@ -89,20 +87,25 @@ type baseProtocol struct {
func runBaseProtocol(peer *Peer, rw MsgReadWriter) error {
bp := &baseProtocol{rw, peer}
- if err := bp.doHandshake(rw); err != nil {
+ errc := make(chan error, 1)
+ go func() { errc <- rw.WriteMsg(bp.handshakeMsg()) }()
+ if err := bp.readHandshake(); err != nil {
+ return err
+ }
+ // handle write error
+ if err := <-errc; err != nil {
return err
}
// run main loop
- quit := make(chan error, 1)
go func() {
for {
if err := bp.handle(rw); err != nil {
- quit <- err
+ errc <- err
break
}
}
}()
- return bp.loop(quit)
+ return bp.loop(errc)
}
var pingTimeout = 2 * time.Second
@@ -166,7 +169,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
case pongMsg:
case getPeersMsg:
- peers := bp.peerList()
+ peers := bp.peer.PeerList()
// this is dangerous. the spec says that we should _delay_
// sending the response if no new information is available.
// this means that would need to send a response later when
@@ -174,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
//
// TODO: add event mechanism to notify baseProtocol for new peers
if len(peers) > 0 {
- return bp.rw.EncodeMsg(peersMsg, peers)
+ return bp.rw.EncodeMsg(peersMsg, peers...)
}
case peersMsg:
@@ -193,14 +196,9 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
return nil
}
-func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
- // send our handshake
- if err := rw.WriteMsg(bp.handshakeMsg()); err != nil {
- return err
- }
-
+func (bp *baseProtocol) readHandshake() error {
// read and handle remote handshake
- msg, err := rw.ReadMsg()
+ msg, err := bp.rw.ReadMsg()
if err != nil {
return err
}
@@ -210,12 +208,10 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
if msg.Size > baseProtocolMaxMsgSize {
return newPeerError(errMisc, "message too big")
}
-
var hs handshake
if err := msg.Decode(&hs); err != nil {
return err
}
-
// validate handshake info
if hs.Version != baseProtocolVersion {
return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n",
@@ -238,9 +234,7 @@ func (bp *baseProtocol) doHandshake(rw MsgReadWriter) error {
if err := bp.peer.pubkeyHook(pa); err != nil {
return newPeerError(errPubkeyForbidden, "%v", err)
}
-
// TODO: remove Caps with empty name
-
var addr *peerAddr
if hs.ListenPort != 0 {
addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID)
@@ -270,25 +264,3 @@ func (bp *baseProtocol) handshakeMsg() Msg {
bp.peer.ourID.Pubkey()[1:],
)
}
-
-func (bp *baseProtocol) peerList() []ethutil.RlpEncodable {
- peers := bp.peer.otherPeers()
- ds := make([]ethutil.RlpEncodable, 0, len(peers))
- for _, p := range peers {
- p.infolock.Lock()
- addr := p.listenAddr
- p.infolock.Unlock()
- // filter out this peer and peers that are not listening or
- // have not completed the handshake.
- // TODO: track previously sent peers and exclude them as well.
- if p == bp.peer || addr == nil {
- continue
- }
- ds = append(ds, addr)
- }
- ourAddr := bp.peer.ourListenAddr
- if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() {
- ds = append(ds, ourAddr)
- }
- return ds
-}
diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go
index 65f26fb12..ce25b3e1b 100644
--- a/p2p/protocol_test.go
+++ b/p2p/protocol_test.go
@@ -2,12 +2,89 @@ package p2p
import (
"fmt"
+ "net"
+ "reflect"
"testing"
+
+ "github.com/ethereum/go-ethereum/crypto"
)
+type peerId struct {
+ pubkey []byte
+}
+
+func (self *peerId) String() string {
+ return fmt.Sprintf("test peer %x", self.Pubkey()[:4])
+}
+
+func (self *peerId) Pubkey() (pubkey []byte) {
+ pubkey = self.pubkey
+ if len(pubkey) == 0 {
+ pubkey = crypto.GenerateNewKeyPair().PublicKey
+ self.pubkey = pubkey
+ }
+ return
+}
+
+func newTestPeer() (peer *Peer) {
+ peer = NewPeer(&peerId{}, []Cap{})
+ peer.pubkeyHook = func(*peerAddr) error { return nil }
+ peer.ourID = &peerId{}
+ peer.listenAddr = &peerAddr{}
+ peer.otherPeers = func() []*Peer { return nil }
+ return
+}
+
+func TestBaseProtocolPeers(t *testing.T) {
+ cannedPeerList := []*peerAddr{
+ {IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}},
+ {IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}},
+ }
+ var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}}
+ rw1, rw2 := MsgPipe()
+ // run matcher, close pipe when addresses have arrived
+ addrChan := make(chan *peerAddr, len(cannedPeerList))
+ go func() {
+ for _, want := range cannedPeerList {
+ got := <-addrChan
+ t.Logf("got peer: %+v", got)
+ if !reflect.DeepEqual(want, got) {
+ t.Errorf("mismatch: got %#v, want %#v", got, want)
+ }
+ }
+ close(addrChan)
+ var own []*peerAddr
+ var got *peerAddr
+ for got = range addrChan {
+ own = append(own, got)
+ }
+ if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) {
+ t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr)
+ }
+ rw2.Close()
+ }()
+ // run first peer
+ peer1 := newTestPeer()
+ peer1.ourListenAddr = ownAddr
+ peer1.otherPeers = func() []*Peer {
+ pl := make([]*Peer, len(cannedPeerList))
+ for i, addr := range cannedPeerList {
+ pl[i] = &Peer{listenAddr: addr}
+ }
+ return pl
+ }
+ go runBaseProtocol(peer1, rw1)
+ // run second peer
+ peer2 := newTestPeer()
+ peer2.newPeerAddr = addrChan // feed peer suggestions into matcher
+ if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed {
+ t.Errorf("peer2 terminated with unexpected error: %v", err)
+ }
+}
+
func TestBaseProtocolDisconnect(t *testing.T) {
- peer := NewPeer(NewSimpleClientIdentity("p1", "", "", "foo"), nil)
- peer.ourID = NewSimpleClientIdentity("p2", "", "", "bar")
+ peer := NewPeer(&peerId{}, nil)
+ peer.ourID = &peerId{}
peer.pubkeyHook = func(*peerAddr) error { return nil }
rw1, rw2 := MsgPipe()
@@ -32,6 +109,7 @@ func TestBaseProtocolDisconnect(t *testing.T) {
if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil {
t.Error(err)
}
+
close(done)
}()
diff --git a/p2p/server.go b/p2p/server.go
index 326781234..cfff442f7 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -113,9 +113,11 @@ func (srv *Server) PeerCount() int {
// SuggestPeer injects an address into the outbound address pool.
func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
+ addr := &peerAddr{ip, uint64(port), nodeID}
select {
- case srv.peerConnect <- &peerAddr{ip, uint64(port), nodeID}:
+ case srv.peerConnect <- addr:
default: // don't block
+ srvlog.Warnf("peer suggestion %v ignored", addr)
}
}
@@ -258,6 +260,7 @@ func (srv *Server) listenLoop() {
for {
select {
case slot := <-srv.peerSlots:
+ srvlog.Debugf("grabbed slot %v for listening", slot)
conn, err := srv.listener.Accept()
if err != nil {
srv.peerSlots <- slot
@@ -330,6 +333,7 @@ func (srv *Server) dialLoop() {
case desc := <-suggest:
// candidate peer found, will dial out asyncronously
// if connection fails slot will be released
+ srvlog.Infof("dial %v (%v)", desc, *slot)
go srv.dialPeer(desc, *slot)
// we can watch if more peers needed in the next loop
slots = srv.peerSlots
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 5c0d08d39..ceb89e3f7 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -11,7 +11,7 @@ import (
func startTestServer(t *testing.T, pf peerFunc) *Server {
server := &Server{
- Identity: NewSimpleClientIdentity("clientIdentifier", "version", "customIdentifier", "pubkey"),
+ Identity: &peerId{},
MaxPeers: 10,
ListenAddr: "127.0.0.1:0",
newPeerFunc: pf,