aboutsummaryrefslogblamecommitdiffstats
path: root/whisper/peer_test.go
blob: 9008cdc593405f6780f32f847ff7df86632214e8 (plain) (tree)
1
2
3
4
5
6
7
8






                                             
                                                      









                                                                    
                                                         













                                                                         
 






                               













                                                                                                 































































                                                                                                   



                                                                     

                                   


























                                                                                           



                                                                     

                                   




























                                                                                            
                                                 



                                                     

















































                                                                                                   
package whisper

import (
    "testing"
    "time"

    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/p2p/discover"
)

type testPeer struct {
    client *Whisper
    stream *p2p.MsgPipeRW
    termed chan struct{}
}

func startTestPeer() *testPeer {
    // Create a simulated P2P remote peer and data streams to it
    remote := p2p.NewPeer(discover.NodeID{}, "", nil)
    tester, tested := p2p.MsgPipe()

    // Create a whisper client and connect with it to the tester peer
    client := New()
    client.Start()

    termed := make(chan struct{})
    go func() {
        defer client.Stop()
        defer close(termed)
        defer tested.Close()

        client.handlePeer(remote, tested)
    }()

    return &testPeer{
        client: client,
        stream: tester,
        termed: termed,
    }
}

func startTestPeerInited() (*testPeer, error) {
    peer := startTestPeer()

    if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
        peer.stream.Close()
        return nil, err
    }
    if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
        peer.stream.Close()
        return nil, err
    }
    return peer, nil
}

func TestPeerStatusMessage(t *testing.T) {
    tester := startTestPeer()

    // Wait for the handshake status message and check it
    if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
        t.Fatalf("status message mismatch: %v", err)
    }
    // Terminate the node
    tester.stream.Close()

    select {
    case <-tester.termed:
    case <-time.After(time.Second):
        t.Fatalf("local close timed out")
    }
}

func TestPeerHandshakeFail(t *testing.T) {
    tester := startTestPeer()

    // Wait for and check the handshake
    if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
        t.Fatalf("status message mismatch: %v", err)
    }
    // Send an invalid handshake status and verify disconnect
    if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
        t.Fatalf("failed to send malformed status: %v", err)
    }
    select {
    case <-tester.termed:
    case <-time.After(time.Second):
        t.Fatalf("remote close timed out")
    }
}

func TestPeerHandshakeSuccess(t *testing.T) {
    tester := startTestPeer()

    // Wait for and check the handshake
    if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
        t.Fatalf("status message mismatch: %v", err)
    }
    // Send a valid handshake status and make sure connection stays live
    if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
        t.Fatalf("failed to send status: %v", err)
    }
    select {
    case <-tester.termed:
        t.Fatalf("valid handshake disconnected")

    case <-time.After(100 * time.Millisecond):
    }
    // Clean up the test
    tester.stream.Close()

    select {
    case <-tester.termed:
    case <-time.After(time.Second):
        t.Fatalf("local close timed out")
    }
}

func TestPeerSend(t *testing.T) {
    // Start a tester and execute the handshake
    tester, err := startTestPeerInited()
    if err != nil {
        t.Fatalf("failed to start initialized peer: %v", err)
    }
    defer tester.stream.Close()

    // Construct a message and inject into the tester
    message := NewMessage([]byte("peer broadcast test message"))
    envelope, err := message.Wrap(DefaultPoW, Options{
        TTL: DefaultTTL,
    })
    if err != nil {
        t.Fatalf("failed to wrap message: %v", err)
    }
    if err := tester.client.Send(envelope); err != nil {
        t.Fatalf("failed to send message: %v", err)
    }
    // Check that the message is eventually forwarded
    payload := []interface{}{envelope}
    if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
        t.Fatalf("message mismatch: %v", err)
    }
    // Make sure that even with a re-insert, an empty batch is received
    if err := tester.client.Send(envelope); err != nil {
        t.Fatalf("failed to send message: %v", err)
    }
    if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
        t.Fatalf("message mismatch: %v", err)
    }
}

func TestPeerDeliver(t *testing.T) {
    // Start a tester and execute the handshake
    tester, err := startTestPeerInited()
    if err != nil {
        t.Fatalf("failed to start initialized peer: %v", err)
    }
    defer tester.stream.Close()

    // Watch for all inbound messages
    arrived := make(chan struct{}, 1)
    tester.client.Watch(Filter{
        Fn: func(message *Message) {
            arrived <- struct{}{}
        },
    })
    // Construct a message and deliver it to the tester peer
    message := NewMessage([]byte("peer broadcast test message"))
    envelope, err := message.Wrap(DefaultPoW, Options{
        TTL: DefaultTTL,
    })
    if err != nil {
        t.Fatalf("failed to wrap message: %v", err)
    }
    if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
        t.Fatalf("failed to transfer message: %v", err)
    }
    // Check that the message is delivered upstream
    select {
    case <-arrived:
    case <-time.After(time.Second):
        t.Fatalf("message delivery timeout")
    }
    // Check that a resend is not delivered
    if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
        t.Fatalf("failed to transfer message: %v", err)
    }
    select {
    case <-time.After(2 * transmissionCycle):
    case <-arrived:
        t.Fatalf("repeating message arrived")
    }
}

func TestPeerMessageExpiration(t *testing.T) {
    // Start a tester and execute the handshake
    tester, err := startTestPeerInited()
    if err != nil {
        t.Fatalf("failed to start initialized peer: %v", err)
    }
    defer tester.stream.Close()

    // Fetch the peer instance for later inspection
    tester.client.peerMu.RLock()
    if peers := len(tester.client.peers); peers != 1 {
        t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
    }
    var peer *peer
    for peer, _ = range tester.client.peers {
        break
    }
    tester.client.peerMu.RUnlock()

    // Construct a message and pass it through the tester
    message := NewMessage([]byte("peer test message"))
    envelope, err := message.Wrap(DefaultPoW, Options{
        TTL: time.Second,
    })
    if err != nil {
        t.Fatalf("failed to wrap message: %v", err)
    }
    if err := tester.client.Send(envelope); err != nil {
        t.Fatalf("failed to send message: %v", err)
    }
    payload := []interface{}{envelope}
    if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
        t.Fatalf("message mismatch: %v", err)
    }
    // Check that the message is inside the cache
    if !peer.known.Has(envelope.Hash()) {
        t.Fatalf("message not found in cache")
    }
    // Discard messages until expiration and check cache again
    exp := time.Now().Add(time.Second + expirationCycle)
    for time.Now().Before(exp) {
        if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
            t.Fatalf("message mismatch: %v", err)
        }
    }
    if peer.known.Has(envelope.Hash()) {
        t.Fatalf("message not expired from cache")
    }
}