aboutsummaryrefslogblamecommitdiffstats
path: root/p2p/simulations/network_test.go
blob: 968700174c68d123b82f401fa33901152dec7772 (plain) (tree)



















                                                                                  
                       
             
                 

                 


                 



                                                                    

 






















































































































































































































































                                                                                                                                                         












                                                                               
                                          
                                        

                                                            




















                                                                              
                                                                       




































                                                                                             
                                      




































                                                                                                           
                                                                                                        
















                                                   










































































                                                                                                                         





































































































































                                                                                           
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package simulations

import (
    "context"
    "encoding/json"
    "fmt"
    "reflect"
    "strconv"
    "strings"
    "testing"
    "time"

    "github.com/dexon-foundation/dexon/log"
    "github.com/dexon-foundation/dexon/node"
    "github.com/dexon-foundation/dexon/p2p/enode"
    "github.com/dexon-foundation/dexon/p2p/simulations/adapters"
)

// Tests that a created snapshot with a minimal service only contains the expected connections
// and that a network when loaded with this snapshot only contains those same connections
func TestSnapshot(t *testing.T) {

    // PART I
    // create snapshot from ring network

    // this is a minimal service, whose protocol will take exactly one message OR close of connection before quitting
    adapter := adapters.NewSimAdapter(adapters.Services{
        "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
            return NewNoopService(nil), nil
        },
    })

    // create network
    network := NewNetwork(adapter, &NetworkConfig{
        DefaultService: "noopwoop",
    })
    // \todo consider making a member of network, set to true threadsafe when shutdown
    runningOne := true
    defer func() {
        if runningOne {
            network.Shutdown()
        }
    }()

    // create and start nodes
    nodeCount := 20
    ids := make([]enode.ID, nodeCount)
    for i := 0; i < nodeCount; i++ {
        conf := adapters.RandomNodeConfig()
        node, err := network.NewNodeWithConfig(conf)
        if err != nil {
            t.Fatalf("error creating node: %s", err)
        }
        if err := network.Start(node.ID()); err != nil {
            t.Fatalf("error starting node: %s", err)
        }
        ids[i] = node.ID()
    }

    // subscribe to peer events
    evC := make(chan *Event)
    sub := network.Events().Subscribe(evC)
    defer sub.Unsubscribe()

    // connect nodes in a ring
    // spawn separate thread to avoid deadlock in the event listeners
    go func() {
        for i, id := range ids {
            peerID := ids[(i+1)%len(ids)]
            if err := network.Connect(id, peerID); err != nil {
                t.Fatal(err)
            }
        }
    }()

    // collect connection events up to expected number
    ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
    defer cancel()
    checkIds := make(map[enode.ID][]enode.ID)
    connEventCount := nodeCount
OUTER:
    for {
        select {
        case <-ctx.Done():
            t.Fatal(ctx.Err())
        case ev := <-evC:
            if ev.Type == EventTypeConn && !ev.Control {

                // fail on any disconnect
                if !ev.Conn.Up {
                    t.Fatalf("unexpected disconnect: %v -> %v", ev.Conn.One, ev.Conn.Other)
                }
                checkIds[ev.Conn.One] = append(checkIds[ev.Conn.One], ev.Conn.Other)
                checkIds[ev.Conn.Other] = append(checkIds[ev.Conn.Other], ev.Conn.One)
                connEventCount--
                log.Debug("ev", "count", connEventCount)
                if connEventCount == 0 {
                    break OUTER
                }
            }
        }
    }

    // create snapshot of current network
    snap, err := network.Snapshot()
    if err != nil {
        t.Fatal(err)
    }
    j, err := json.Marshal(snap)
    if err != nil {
        t.Fatal(err)
    }
    log.Debug("snapshot taken", "nodes", len(snap.Nodes), "conns", len(snap.Conns), "json", string(j))

    // verify that the snap element numbers check out
    if len(checkIds) != len(snap.Conns) || len(checkIds) != len(snap.Nodes) {
        t.Fatalf("snapshot wrong node,conn counts %d,%d != %d", len(snap.Nodes), len(snap.Conns), len(checkIds))
    }

    // shut down sim network
    runningOne = false
    sub.Unsubscribe()
    network.Shutdown()

    // check that we have all the expected connections in the snapshot
    for nodid, nodConns := range checkIds {
        for _, nodConn := range nodConns {
            var match bool
            for _, snapConn := range snap.Conns {
                if snapConn.One == nodid && snapConn.Other == nodConn {
                    match = true
                    break
                } else if snapConn.Other == nodid && snapConn.One == nodConn {
                    match = true
                    break
                }
            }
            if !match {
                t.Fatalf("snapshot missing conn %v -> %v", nodid, nodConn)
            }
        }
    }
    log.Info("snapshot checked")

    // PART II
    // load snapshot and verify that exactly same connections are formed

    adapter = adapters.NewSimAdapter(adapters.Services{
        "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
            return NewNoopService(nil), nil
        },
    })
    network = NewNetwork(adapter, &NetworkConfig{
        DefaultService: "noopwoop",
    })
    defer func() {
        network.Shutdown()
    }()

    // subscribe to peer events
    // every node up and conn up event will generate one additional control event
    // therefore multiply the count by two
    evC = make(chan *Event, (len(snap.Conns)*2)+(len(snap.Nodes)*2))
    sub = network.Events().Subscribe(evC)
    defer sub.Unsubscribe()

    // load the snapshot
    // spawn separate thread to avoid deadlock in the event listeners
    err = network.Load(snap)
    if err != nil {
        t.Fatal(err)
    }

    // collect connection events up to expected number
    ctx, cancel = context.WithTimeout(context.TODO(), time.Second*3)
    defer cancel()

    connEventCount = nodeCount

OUTER_TWO:
    for {
        select {
        case <-ctx.Done():
            t.Fatal(ctx.Err())
        case ev := <-evC:
            if ev.Type == EventTypeConn && !ev.Control {

                // fail on any disconnect
                if !ev.Conn.Up {
                    t.Fatalf("unexpected disconnect: %v -> %v", ev.Conn.One, ev.Conn.Other)
                }
                log.Debug("conn", "on", ev.Conn.One, "other", ev.Conn.Other)
                checkIds[ev.Conn.One] = append(checkIds[ev.Conn.One], ev.Conn.Other)
                checkIds[ev.Conn.Other] = append(checkIds[ev.Conn.Other], ev.Conn.One)
                connEventCount--
                log.Debug("ev", "count", connEventCount)
                if connEventCount == 0 {
                    break OUTER_TWO
                }
            }
        }
    }

    // check that we have all expected connections in the network
    for _, snapConn := range snap.Conns {
        var match bool
        for nodid, nodConns := range checkIds {
            for _, nodConn := range nodConns {
                if snapConn.One == nodid && snapConn.Other == nodConn {
                    match = true
                    break
                } else if snapConn.Other == nodid && snapConn.One == nodConn {
                    match = true
                    break
                }
            }
        }
        if !match {
            t.Fatalf("network missing conn %v -> %v", snapConn.One, snapConn.Other)
        }
    }

    // verify that network didn't generate any other additional connection events after the ones we have collected within a reasonable period of time
    ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
    defer cancel()
    select {
    case <-ctx.Done():
    case ev := <-evC:
        if ev.Type == EventTypeConn {
            t.Fatalf("Superfluous conn found %v -> %v", ev.Conn.One, ev.Conn.Other)
        }
    }

    // This test validates if all connections from the snapshot
    // are created in the network.
    t.Run("conns after load", func(t *testing.T) {
        // Create new network.
        n := NewNetwork(
            adapters.NewSimAdapter(adapters.Services{
                "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
                    return NewNoopService(nil), nil
                },
            }),
            &NetworkConfig{
                DefaultService: "noopwoop",
            },
        )
        defer n.Shutdown()

        // Load the same snapshot.
        err := n.Load(snap)
        if err != nil {
            t.Fatal(err)
        }

        // Check every connection from the snapshot
        // if it is in the network, too.
        for _, c := range snap.Conns {
            if n.GetConn(c.One, c.Other) == nil {
                t.Errorf("missing connection: %s -> %s", c.One, c.Other)
            }
        }
    })
}

// TestNetworkSimulation creates a multi-node simulation network with each node
// connected in a ring topology, checks that all nodes successfully handshake
// with each other and that a snapshot fully represents the desired topology
func TestNetworkSimulation(t *testing.T) {
    // create simulation network with 20 testService nodes
    adapter := adapters.NewSimAdapter(adapters.Services{
        "test": newTestService,
    })
    network := NewNetwork(adapter, &NetworkConfig{
        DefaultService: "test",
    })
    defer network.Shutdown()
    nodeCount := 20
    ids := make([]enode.ID, nodeCount)
    for i := 0; i < nodeCount; i++ {
        conf := adapters.RandomNodeConfig()
        node, err := network.NewNodeWithConfig(conf)
        if err != nil {
            t.Fatalf("error creating node: %s", err)
        }
        if err := network.Start(node.ID()); err != nil {
            t.Fatalf("error starting node: %s", err)
        }
        ids[i] = node.ID()
    }

    // perform a check which connects the nodes in a ring (so each node is
    // connected to exactly two peers) and then checks that all nodes
    // performed two handshakes by checking their peerCount
    action := func(_ context.Context) error {
        for i, id := range ids {
            peerID := ids[(i+1)%len(ids)]
            if err := network.Connect(id, peerID); err != nil {
                return err
            }
        }
        return nil
    }
    check := func(ctx context.Context, id enode.ID) (bool, error) {
        // check we haven't run out of time
        select {
        case <-ctx.Done():
            return false, ctx.Err()
        default:
        }

        // get the node
        node := network.GetNode(id)
        if node == nil {
            return false, fmt.Errorf("unknown node: %s", id)
        }

        // check it has exactly two peers
        client, err := node.Client()
        if err != nil {
            return false, err
        }
        var peerCount int64
        if err := client.CallContext(ctx, &peerCount, "test_peerCount"); err != nil {
            return false, err
        }
        switch {
        case peerCount < 2:
            return false, nil
        case peerCount == 2:
            return true, nil
        default:
            return false, fmt.Errorf("unexpected peerCount: %d", peerCount)
        }
    }

    timeout := 30 * time.Second
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // trigger a check every 100ms
    trigger := make(chan enode.ID)
    go triggerChecks(ctx, ids, trigger, 100*time.Millisecond)

    result := NewSimulation(network).Run(ctx, &Step{
        Action:  action,
        Trigger: trigger,
        Expect: &Expectation{
            Nodes: ids,
            Check: check,
        },
    })
    if result.Error != nil {
        t.Fatalf("simulation failed: %s", result.Error)
    }

    // take a network snapshot and check it contains the correct topology
    snap, err := network.Snapshot()
    if err != nil {
        t.Fatal(err)
    }
    if len(snap.Nodes) != nodeCount {
        t.Fatalf("expected snapshot to contain %d nodes, got %d", nodeCount, len(snap.Nodes))
    }
    if len(snap.Conns) != nodeCount {
        t.Fatalf("expected snapshot to contain %d connections, got %d", nodeCount, len(snap.Conns))
    }
    for i, id := range ids {
        conn := snap.Conns[i]
        if conn.One != id {
            t.Fatalf("expected conn[%d].One to be %s, got %s", i, id, conn.One)
        }
        peerID := ids[(i+1)%len(ids)]
        if conn.Other != peerID {
            t.Fatalf("expected conn[%d].Other to be %s, got %s", i, peerID, conn.Other)
        }
    }
}

func triggerChecks(ctx context.Context, ids []enode.ID, trigger chan enode.ID, interval time.Duration) {
    tick := time.NewTicker(interval)
    defer tick.Stop()
    for {
        select {
        case <-tick.C:
            for _, id := range ids {
                select {
                case trigger <- id:
                case <-ctx.Done():
                    return
                }
            }
        case <-ctx.Done():
            return
        }
    }
}

// \todo: refactor to implement shapshots
// and connect configuration methods once these are moved from
// swarm/network/simulations/connect.go
func BenchmarkMinimalService(b *testing.B) {
    b.Run("ring/32", benchmarkMinimalServiceTmp)
}

func benchmarkMinimalServiceTmp(b *testing.B) {

    // stop timer to discard setup time pollution
    args := strings.Split(b.Name(), "/")
    nodeCount, err := strconv.ParseInt(args[2], 10, 16)
    if err != nil {
        b.Fatal(err)
    }

    for i := 0; i < b.N; i++ {
        // this is a minimal service, whose protocol will close a channel upon run of protocol
        // making it possible to bench the time it takes for the service to start and protocol actually to be run
        protoCMap := make(map[enode.ID]map[enode.ID]chan struct{})
        adapter := adapters.NewSimAdapter(adapters.Services{
            "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) {
                protoCMap[ctx.Config.ID] = make(map[enode.ID]chan struct{})
                svc := NewNoopService(protoCMap[ctx.Config.ID])
                return svc, nil
            },
        })

        // create network
        network := NewNetwork(adapter, &NetworkConfig{
            DefaultService: "noopwoop",
        })
        defer network.Shutdown()

        // create and start nodes
        ids := make([]enode.ID, nodeCount)
        for i := 0; i < int(nodeCount); i++ {
            conf := adapters.RandomNodeConfig()
            node, err := network.NewNodeWithConfig(conf)
            if err != nil {
                b.Fatalf("error creating node: %s", err)
            }
            if err := network.Start(node.ID()); err != nil {
                b.Fatalf("error starting node: %s", err)
            }
            ids[i] = node.ID()
        }

        // ready, set, go
        b.ResetTimer()

        // connect nodes in a ring
        for i, id := range ids {
            peerID := ids[(i+1)%len(ids)]
            if err := network.Connect(id, peerID); err != nil {
                b.Fatal(err)
            }
        }

        // wait for all protocols to signal to close down
        ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
        defer cancel()
        for nodid, peers := range protoCMap {
            for peerid, peerC := range peers {
                log.Debug("getting ", "node", nodid, "peer", peerid)
                select {
                case <-ctx.Done():
                    b.Fatal(ctx.Err())
                case <-peerC:
                }
            }
        }
    }
}

func TestNode_UnmarshalJSON(t *testing.T) {
    t.Run(
        "test unmarshal of Node up field",
        func(t *testing.T) {
            runNodeUnmarshalJSON(t, casesNodeUnmarshalJSONUpField())
        },
    )
    t.Run(
        "test unmarshal of Node Config field",
        func(t *testing.T) {
            runNodeUnmarshalJSON(t, casesNodeUnmarshalJSONConfigField())
        },
    )
}

func runNodeUnmarshalJSON(t *testing.T, tests []nodeUnmarshalTestCase) {
    t.Helper()
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            var got Node
            if err := got.UnmarshalJSON([]byte(tt.marshaled)); err != nil {
                expectErrorMessageToContain(t, err, tt.wantErr)
            }
            expectNodeEquality(t, got, tt.want)
        })
    }
}

type nodeUnmarshalTestCase struct {
    name      string
    marshaled string
    want      Node
    wantErr   string
}

func expectErrorMessageToContain(t *testing.T, got error, want string) {
    t.Helper()
    if got == nil && want == "" {
        return
    }

    if got == nil && want != "" {
        t.Errorf("error was expected, got: nil, want: %v", want)
        return
    }

    if !strings.Contains(got.Error(), want) {
        t.Errorf(
            "unexpected error message, got  %v, want: %v",
            want,
            got,
        )
    }
}

func expectNodeEquality(t *testing.T, got Node, want Node) {
    t.Helper()
    if !reflect.DeepEqual(got, want) {
        t.Errorf("Node.UnmarshalJSON() = %v, want %v", got, want)
    }
}

func casesNodeUnmarshalJSONUpField() []nodeUnmarshalTestCase {
    return []nodeUnmarshalTestCase{
        {
            name:      "empty json",
            marshaled: "{}",
            want: Node{
                up: false,
            },
        },
        {
            name:      "a stopped node",
            marshaled: "{\"up\": false}",
            want: Node{
                up: false,
            },
        },
        {
            name:      "a running node",
            marshaled: "{\"up\": true}",
            want: Node{
                up: true,
            },
        },
        {
            name:      "invalid JSON value on valid key",
            marshaled: "{\"up\": foo}",
            wantErr:   "invalid character",
        },
        {
            name:      "invalid JSON key and value",
            marshaled: "{foo: bar}",
            wantErr:   "invalid character",
        },
        {
            name:      "bool value expected but got something else (string)",
            marshaled: "{\"up\": \"true\"}",
            wantErr:   "cannot unmarshal string into Go struct",
        },
    }
}

func casesNodeUnmarshalJSONConfigField() []nodeUnmarshalTestCase {
    // Don't do a big fuss around testing, as adapters.NodeConfig should
    // handle it's own serialization. Just do a sanity check.
    return []nodeUnmarshalTestCase{
        {
            name:      "Config field is omitted",
            marshaled: "{}",
            want: Node{
                Config: nil,
            },
        },
        {
            name:      "Config field is nil",
            marshaled: "{\"config\": nil}",
            want: Node{
                Config: nil,
            },
        },
        {
            name:      "a non default Config field",
            marshaled: "{\"config\":{\"name\":\"node_ecdd0\",\"port\":44665}}",
            want: Node{
                Config: &adapters.NodeConfig{
                    Name: "node_ecdd0",
                    Port: 44665,
                },
            },
        },
    }
}