diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/message.go | 20 | ||||
-rw-r--r-- | p2p/message_test.go | 6 | ||||
-rw-r--r-- | p2p/peer.go | 22 | ||||
-rw-r--r-- | p2p/peer_test.go | 4 | ||||
-rw-r--r-- | p2p/protocol.go | 34 | ||||
-rw-r--r-- | p2p/protocol_test.go | 68 |
6 files changed, 87 insertions, 67 deletions
diff --git a/p2p/message.go b/p2p/message.go index a6f62ec4c..daf2bf05c 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -71,14 +71,11 @@ type MsgReader interface { } type MsgWriter interface { - // WriteMsg sends an existing message. - // The Payload reader of the message is consumed. + // WriteMsg sends a message. It will block until the message's + // Payload has been consumed by the other end. + // // Note that messages can be sent only once. WriteMsg(Msg) error - - // EncodeMsg writes an RLP-encoded message with the given - // code and data elements. - EncodeMsg(code uint64, data ...interface{}) error } // MsgReadWriter provides reading and writing of encoded messages. @@ -87,6 +84,12 @@ type MsgReadWriter interface { MsgWriter } +// EncodeMsg writes an RLP-encoded message with the given code and +// data elements. +func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { + return w.WriteMsg(NewMsg(code, data...)) +} + var magicToken = []byte{34, 64, 8, 145} func writeMsg(w io.Writer, msg Msg) error { @@ -209,11 +212,6 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { return ErrPipeClosed } -// EncodeMsg is a convenient shorthand for sending an RLP-encoded message. -func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error { - return p.WriteMsg(NewMsg(code, data...)) -} - // ReadMsg returns a message sent on the other end of the pipe. func (p *MsgPipeRW) ReadMsg() (Msg, error) { if atomic.LoadInt32(p.closed) == 0 { diff --git a/p2p/message_test.go b/p2p/message_test.go index 066d2516d..5cde9abf5 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -75,8 +75,8 @@ func TestDecodeRealMsg(t *testing.T) { func ExampleMsgPipe() { rw1, rw2 := MsgPipe() go func() { - rw1.EncodeMsg(8, []byte{0, 0}) - rw1.EncodeMsg(5, []byte{1, 1}) + EncodeMsg(rw1, 8, []byte{0, 0}) + EncodeMsg(rw1, 5, []byte{1, 1}) rw1.Close() }() @@ -100,7 +100,7 @@ loop: rw1, rw2 := MsgPipe() done := make(chan struct{}) go func() { - if err := rw1.EncodeMsg(1); err == nil { + if err := EncodeMsg(rw1, 1); err == nil { t.Error("EncodeMsg returned nil error") } else if err != ErrPipeClosed { t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed) diff --git a/p2p/peer.go b/p2p/peer.go index 0d7eec9f4..2380a3285 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -460,25 +460,3 @@ func (r *eofSignal) Read(buf []byte) (int, error) { } return n, err } - -func (peer *Peer) PeerList() []interface{} { - peers := peer.otherPeers() - ds := make([]interface{}, 0, len(peers)) - for _, p := range peers { - p.infolock.Lock() - addr := p.listenAddr - p.infolock.Unlock() - // filter out this peer and peers that are not listening or - // have not completed the handshake. - // TODO: track previously sent peers and exclude them as well. - if p == peer || addr == nil { - continue - } - ds = append(ds, addr) - } - ourAddr := peer.ourListenAddr - if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { - ds = append(ds, ourAddr) - } - return ds -} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 5b9e9e784..4ee88f112 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -126,10 +126,10 @@ func TestPeerProtoEncodeMsg(t *testing.T) { Name: "a", Length: 2, Run: func(peer *Peer, rw MsgReadWriter) error { - if err := rw.EncodeMsg(2); err == nil { + if err := EncodeMsg(rw, 2); err == nil { t.Error("expected error for out-of-range msg code, got nil") } - if err := rw.EncodeMsg(1, "foo", "bar"); err != nil { + if err := EncodeMsg(rw, 1, "foo", "bar"); err != nil { t.Errorf("write error: %v", err) } return nil diff --git a/p2p/protocol.go b/p2p/protocol.go index dd8cbc4ec..1d121a885 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -119,14 +119,14 @@ func (bp *baseProtocol) loop(quit <-chan error) error { getPeersTick := time.NewTicker(10 * time.Second) defer getPeersTick.Stop() - err := bp.rw.EncodeMsg(getPeersMsg) + err := EncodeMsg(bp.rw, getPeersMsg) for err == nil { select { case err = <-quit: return err case <-getPeersTick.C: - err = bp.rw.EncodeMsg(getPeersMsg) + err = EncodeMsg(bp.rw, getPeersMsg) case event := <-activity.Chan(): ping.Reset(pingTimeout) lastActive = event.(time.Time) @@ -134,7 +134,7 @@ func (bp *baseProtocol) loop(quit <-chan error) error { if lastActive.Add(pingTimeout * 2).Before(t) { err = newPeerError(errPingTimeout, "") } else if lastActive.Add(pingTimeout).Before(t) { - err = bp.rw.EncodeMsg(pingMsg) + err = EncodeMsg(bp.rw, pingMsg) } } } @@ -164,12 +164,12 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { return discRequestedError(reason[0]) case pingMsg: - return bp.rw.EncodeMsg(pongMsg) + return EncodeMsg(bp.rw, pongMsg) case pongMsg: case getPeersMsg: - peers := bp.peer.PeerList() + peers := bp.peerList() // this is dangerous. the spec says that we should _delay_ // sending the response if no new information is available. // this means that would need to send a response later when @@ -177,7 +177,7 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error { // // TODO: add event mechanism to notify baseProtocol for new peers if len(peers) > 0 { - return bp.rw.EncodeMsg(peersMsg, peers...) + return EncodeMsg(bp.rw, peersMsg, peers...) } case peersMsg: @@ -264,3 +264,25 @@ func (bp *baseProtocol) handshakeMsg() Msg { bp.peer.ourID.Pubkey()[1:], ) } + +func (bp *baseProtocol) peerList() []interface{} { + peers := bp.peer.otherPeers() + ds := make([]interface{}, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue + } + ds = append(ds, addr) + } + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) + } + return ds +} diff --git a/p2p/protocol_test.go b/p2p/protocol_test.go index ce25b3e1b..b1d10ac53 100644 --- a/p2p/protocol_test.go +++ b/p2p/protocol_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "reflect" + "sync" "testing" "github.com/ethereum/go-ethereum/crypto" @@ -36,50 +37,71 @@ func newTestPeer() (peer *Peer) { } func TestBaseProtocolPeers(t *testing.T) { - cannedPeerList := []*peerAddr{ + peerList := []*peerAddr{ {IP: net.ParseIP("1.2.3.4"), Port: 2222, Pubkey: []byte{}}, {IP: net.ParseIP("5.6.7.8"), Port: 3333, Pubkey: []byte{}}, } - var ownAddr *peerAddr = &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} + listenAddr := &peerAddr{IP: net.ParseIP("1.3.5.7"), Port: 1111, Pubkey: []byte{}} rw1, rw2 := MsgPipe() + defer rw1.Close() + wg := new(sync.WaitGroup) + // run matcher, close pipe when addresses have arrived - addrChan := make(chan *peerAddr, len(cannedPeerList)) + numPeers := len(peerList) + 1 + addrChan := make(chan *peerAddr) + wg.Add(1) go func() { - for _, want := range cannedPeerList { - got := <-addrChan - t.Logf("got peer: %+v", got) + i := 0 + for got := range addrChan { + var want *peerAddr + switch { + case i < len(peerList): + want = peerList[i] + case i == len(peerList): + want = listenAddr // listenAddr should be the last thing sent + } + t.Logf("got peer %d/%d: %v", i+1, numPeers, got) if !reflect.DeepEqual(want, got) { - t.Errorf("mismatch: got %#v, want %#v", got, want) + t.Errorf("mismatch: got %+v, want %+v", got, want) + } + i++ + if i == numPeers { + break } } - close(addrChan) - var own []*peerAddr - var got *peerAddr - for got = range addrChan { - own = append(own, got) - } - if len(own) != 1 || !reflect.DeepEqual(ownAddr, own[0]) { - t.Errorf("mismatch: peers own address is incorrectly or not given, got %v, want %#v", ownAddr) + if i != numPeers { + t.Errorf("wrong number of peers received: got %d, want %d", i, numPeers) } - rw2.Close() + rw1.Close() + wg.Done() }() - // run first peer + + // run first peer (in background) peer1 := newTestPeer() - peer1.ourListenAddr = ownAddr + peer1.ourListenAddr = listenAddr peer1.otherPeers = func() []*Peer { - pl := make([]*Peer, len(cannedPeerList)) - for i, addr := range cannedPeerList { + pl := make([]*Peer, len(peerList)) + for i, addr := range peerList { pl[i] = &Peer{listenAddr: addr} } return pl } - go runBaseProtocol(peer1, rw1) + wg.Add(1) + go func() { + runBaseProtocol(peer1, rw1) + wg.Done() + }() + // run second peer peer2 := newTestPeer() peer2.newPeerAddr = addrChan // feed peer suggestions into matcher if err := runBaseProtocol(peer2, rw2); err != ErrPipeClosed { t.Errorf("peer2 terminated with unexpected error: %v", err) } + + // terminate matcher + close(addrChan) + wg.Wait() } func TestBaseProtocolDisconnect(t *testing.T) { @@ -93,7 +115,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, handshakeMsg); err != nil { t.Error(err) } - err := rw2.EncodeMsg(handshakeMsg, + err := EncodeMsg(rw2, handshakeMsg, baseProtocolVersion, "", []interface{}{}, @@ -106,7 +128,7 @@ func TestBaseProtocolDisconnect(t *testing.T) { if err := expectMsg(rw2, getPeersMsg); err != nil { t.Error(err) } - if err := rw2.EncodeMsg(discMsg, DiscQuitting); err != nil { + if err := EncodeMsg(rw2, discMsg, DiscQuitting); err != nil { t.Error(err) } |