aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/simulations/network.go')
-rw-r--r--p2p/simulations/network.go680
1 files changed, 680 insertions, 0 deletions
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
new file mode 100644
index 000000000..06890ffcf
--- /dev/null
+++ b/p2p/simulations/network.go
@@ -0,0 +1,680 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package simulations
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+)
+
+// NetworkConfig defines configuration options for starting a Network
+type NetworkConfig struct {
+ ID string `json:"id"`
+ DefaultService string `json:"default_service,omitempty"`
+}
+
+// Network models a p2p simulation network which consists of a collection of
+// simulated nodes and the connections which exist between them.
+//
+// The Network has a single NodeAdapter which is responsible for actually
+// starting nodes and connecting them together.
+//
+// The Network emits events when nodes are started and stopped, when they are
+// connected and disconnected, and also when messages are sent between nodes.
+type Network struct {
+ NetworkConfig
+
+ Nodes []*Node `json:"nodes"`
+ nodeMap map[discover.NodeID]int
+
+ Conns []*Conn `json:"conns"`
+ connMap map[string]int
+
+ nodeAdapter adapters.NodeAdapter
+ events event.Feed
+ lock sync.RWMutex
+ quitc chan struct{}
+}
+
+// NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
+func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
+ return &Network{
+ NetworkConfig: *conf,
+ nodeAdapter: nodeAdapter,
+ nodeMap: make(map[discover.NodeID]int),
+ connMap: make(map[string]int),
+ quitc: make(chan struct{}),
+ }
+}
+
+// Events returns the output event feed of the Network.
+func (self *Network) Events() *event.Feed {
+ return &self.events
+}
+
+// NewNode adds a new node to the network with a random ID
+func (self *Network) NewNode() (*Node, error) {
+ conf := adapters.RandomNodeConfig()
+ conf.Services = []string{self.DefaultService}
+ return self.NewNodeWithConfig(conf)
+}
+
+// NewNodeWithConfig adds a new node to the network with the given config,
+// returning an error if a node with the same ID or name already exists
+func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ // create a random ID and PrivateKey if not set
+ if conf.ID == (discover.NodeID{}) {
+ c := adapters.RandomNodeConfig()
+ conf.ID = c.ID
+ conf.PrivateKey = c.PrivateKey
+ }
+ id := conf.ID
+
+ // assign a name to the node if not set
+ if conf.Name == "" {
+ conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1)
+ }
+
+ // check the node doesn't already exist
+ if node := self.getNode(id); node != nil {
+ return nil, fmt.Errorf("node with ID %q already exists", id)
+ }
+ if node := self.getNodeByName(conf.Name); node != nil {
+ return nil, fmt.Errorf("node with name %q already exists", conf.Name)
+ }
+
+ // if no services are configured, use the default service
+ if len(conf.Services) == 0 {
+ conf.Services = []string{self.DefaultService}
+ }
+
+ // use the NodeAdapter to create the node
+ adapterNode, err := self.nodeAdapter.NewNode(conf)
+ if err != nil {
+ return nil, err
+ }
+ node := &Node{
+ Node: adapterNode,
+ Config: conf,
+ }
+ log.Trace(fmt.Sprintf("node %v created", id))
+ self.nodeMap[id] = len(self.Nodes)
+ self.Nodes = append(self.Nodes, node)
+
+ // emit a "control" event
+ self.events.Send(ControlEvent(node))
+
+ return node, nil
+}
+
+// Config returns the network configuration
+func (self *Network) Config() *NetworkConfig {
+ return &self.NetworkConfig
+}
+
+// StartAll starts all nodes in the network
+func (self *Network) StartAll() error {
+ for _, node := range self.Nodes {
+ if node.Up {
+ continue
+ }
+ if err := self.Start(node.ID()); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// StopAll stops all nodes in the network
+func (self *Network) StopAll() error {
+ for _, node := range self.Nodes {
+ if !node.Up {
+ continue
+ }
+ if err := self.Stop(node.ID()); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Start starts the node with the given ID
+func (self *Network) Start(id discover.NodeID) error {
+ return self.startWithSnapshots(id, nil)
+}
+
+// startWithSnapshots starts the node with the given ID using the give
+// snapshots
+func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error {
+ node := self.GetNode(id)
+ if node == nil {
+ return fmt.Errorf("node %v does not exist", id)
+ }
+ if node.Up {
+ return fmt.Errorf("node %v already up", id)
+ }
+ log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, self.nodeAdapter.Name()))
+ if err := node.Start(snapshots); err != nil {
+ log.Warn(fmt.Sprintf("start up failed: %v", err))
+ return err
+ }
+ node.Up = true
+ log.Info(fmt.Sprintf("started node %v: %v", id, node.Up))
+
+ self.events.Send(NewEvent(node))
+
+ // subscribe to peer events
+ client, err := node.Client()
+ if err != nil {
+ return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
+ }
+ events := make(chan *p2p.PeerEvent)
+ sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
+ if err != nil {
+ return fmt.Errorf("error getting peer events for node %v: %s", id, err)
+ }
+ go self.watchPeerEvents(id, events, sub)
+ return nil
+}
+
+// watchPeerEvents reads peer events from the given channel and emits
+// corresponding network events
+func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) {
+ defer func() {
+ sub.Unsubscribe()
+
+ // assume the node is now down
+ self.lock.Lock()
+ node := self.getNode(id)
+ node.Up = false
+ self.lock.Unlock()
+ self.events.Send(NewEvent(node))
+ }()
+ for {
+ select {
+ case event, ok := <-events:
+ if !ok {
+ return
+ }
+ peer := event.Peer
+ switch event.Type {
+
+ case p2p.PeerEventTypeAdd:
+ self.DidConnect(id, peer)
+
+ case p2p.PeerEventTypeDrop:
+ self.DidDisconnect(id, peer)
+
+ case p2p.PeerEventTypeMsgSend:
+ self.DidSend(id, peer, event.Protocol, *event.MsgCode)
+
+ case p2p.PeerEventTypeMsgRecv:
+ self.DidReceive(peer, id, event.Protocol, *event.MsgCode)
+
+ }
+
+ case err := <-sub.Err():
+ if err != nil {
+ log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err)
+ }
+ return
+ }
+ }
+}
+
+// Stop stops the node with the given ID
+func (self *Network) Stop(id discover.NodeID) error {
+ node := self.GetNode(id)
+ if node == nil {
+ return fmt.Errorf("node %v does not exist", id)
+ }
+ if !node.Up {
+ return fmt.Errorf("node %v already down", id)
+ }
+ if err := node.Stop(); err != nil {
+ return err
+ }
+ node.Up = false
+ log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up))
+
+ self.events.Send(ControlEvent(node))
+ return nil
+}
+
+// Connect connects two nodes together by calling the "admin_addPeer" RPC
+// method on the "one" node so that it connects to the "other" node
+func (self *Network) Connect(oneID, otherID discover.NodeID) error {
+ log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
+ conn, err := self.GetOrCreateConn(oneID, otherID)
+ if err != nil {
+ return err
+ }
+ if conn.Up {
+ return fmt.Errorf("%v and %v already connected", oneID, otherID)
+ }
+ if err := conn.nodesUp(); err != nil {
+ return err
+ }
+ client, err := conn.one.Client()
+ if err != nil {
+ return err
+ }
+ self.events.Send(ControlEvent(conn))
+ return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
+}
+
+// Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
+// method on the "one" node so that it disconnects from the "other" node
+func (self *Network) Disconnect(oneID, otherID discover.NodeID) error {
+ conn := self.GetConn(oneID, otherID)
+ if conn == nil {
+ return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
+ }
+ if !conn.Up {
+ return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
+ }
+ client, err := conn.one.Client()
+ if err != nil {
+ return err
+ }
+ self.events.Send(ControlEvent(conn))
+ return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
+}
+
+// DidConnect tracks the fact that the "one" node connected to the "other" node
+func (self *Network) DidConnect(one, other discover.NodeID) error {
+ conn, err := self.GetOrCreateConn(one, other)
+ if err != nil {
+ return fmt.Errorf("connection between %v and %v does not exist", one, other)
+ }
+ if conn.Up {
+ return fmt.Errorf("%v and %v already connected", one, other)
+ }
+ conn.Up = true
+ self.events.Send(NewEvent(conn))
+ return nil
+}
+
+// DidDisconnect tracks the fact that the "one" node disconnected from the
+// "other" node
+func (self *Network) DidDisconnect(one, other discover.NodeID) error {
+ conn, err := self.GetOrCreateConn(one, other)
+ if err != nil {
+ return fmt.Errorf("connection between %v and %v does not exist", one, other)
+ }
+ if !conn.Up {
+ return fmt.Errorf("%v and %v already disconnected", one, other)
+ }
+ conn.Up = false
+ self.events.Send(NewEvent(conn))
+ return nil
+}
+
+// DidSend tracks the fact that "sender" sent a message to "receiver"
+func (self *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error {
+ msg := &Msg{
+ One: sender,
+ Other: receiver,
+ Protocol: proto,
+ Code: code,
+ Received: false,
+ }
+ self.events.Send(NewEvent(msg))
+ return nil
+}
+
+// DidReceive tracks the fact that "receiver" received a message from "sender"
+func (self *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error {
+ msg := &Msg{
+ One: sender,
+ Other: receiver,
+ Protocol: proto,
+ Code: code,
+ Received: true,
+ }
+ self.events.Send(NewEvent(msg))
+ return nil
+}
+
+// GetNode gets the node with the given ID, returning nil if the node does not
+// exist
+func (self *Network) GetNode(id discover.NodeID) *Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.getNode(id)
+}
+
+// GetNode gets the node with the given name, returning nil if the node does
+// not exist
+func (self *Network) GetNodeByName(name string) *Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.getNodeByName(name)
+}
+
+func (self *Network) getNode(id discover.NodeID) *Node {
+ i, found := self.nodeMap[id]
+ if !found {
+ return nil
+ }
+ return self.Nodes[i]
+}
+
+func (self *Network) getNodeByName(name string) *Node {
+ for _, node := range self.Nodes {
+ if node.Config.Name == name {
+ return node
+ }
+ }
+ return nil
+}
+
+// GetNodes returns the existing nodes
+func (self *Network) GetNodes() []*Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.Nodes
+}
+
+// GetConn returns the connection which exists between "one" and "other"
+// regardless of which node initiated the connection
+func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.getConn(oneID, otherID)
+}
+
+// GetOrCreateConn is like GetConn but creates the connection if it doesn't
+// already exist
+func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if conn := self.getConn(oneID, otherID); conn != nil {
+ return conn, nil
+ }
+
+ one := self.getNode(oneID)
+ if one == nil {
+ return nil, fmt.Errorf("node %v does not exist", oneID)
+ }
+ other := self.getNode(otherID)
+ if other == nil {
+ return nil, fmt.Errorf("node %v does not exist", otherID)
+ }
+ conn := &Conn{
+ One: oneID,
+ Other: otherID,
+ one: one,
+ other: other,
+ }
+ label := ConnLabel(oneID, otherID)
+ self.connMap[label] = len(self.Conns)
+ self.Conns = append(self.Conns, conn)
+ return conn, nil
+}
+
+func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
+ label := ConnLabel(oneID, otherID)
+ i, found := self.connMap[label]
+ if !found {
+ return nil
+ }
+ return self.Conns[i]
+}
+
+// Shutdown stops all nodes in the network and closes the quit channel
+func (self *Network) Shutdown() {
+ for _, node := range self.Nodes {
+ log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString()))
+ if err := node.Stop(); err != nil {
+ log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err)
+ }
+ }
+ close(self.quitc)
+}
+
+// Node is a wrapper around adapters.Node which is used to track the status
+// of a node in the network
+type Node struct {
+ adapters.Node `json:"-"`
+
+ // Config if the config used to created the node
+ Config *adapters.NodeConfig `json:"config"`
+
+ // Up tracks whether or not the node is running
+ Up bool `json:"up"`
+}
+
+// ID returns the ID of the node
+func (self *Node) ID() discover.NodeID {
+ return self.Config.ID
+}
+
+// String returns a log-friendly string
+func (self *Node) String() string {
+ return fmt.Sprintf("Node %v", self.ID().TerminalString())
+}
+
+// NodeInfo returns information about the node
+func (self *Node) NodeInfo() *p2p.NodeInfo {
+ // avoid a panic if the node is not started yet
+ if self.Node == nil {
+ return nil
+ }
+ info := self.Node.NodeInfo()
+ info.Name = self.Config.Name
+ return info
+}
+
+// MarshalJSON implements the json.Marshaler interface so that the encoded
+// JSON includes the NodeInfo
+func (self *Node) MarshalJSON() ([]byte, error) {
+ return json.Marshal(struct {
+ Info *p2p.NodeInfo `json:"info,omitempty"`
+ Config *adapters.NodeConfig `json:"config,omitempty"`
+ Up bool `json:"up"`
+ }{
+ Info: self.NodeInfo(),
+ Config: self.Config,
+ Up: self.Up,
+ })
+}
+
+// Conn represents a connection between two nodes in the network
+type Conn struct {
+ // One is the node which initiated the connection
+ One discover.NodeID `json:"one"`
+
+ // Other is the node which the connection was made to
+ Other discover.NodeID `json:"other"`
+
+ // Up tracks whether or not the connection is active
+ Up bool `json:"up"`
+
+ one *Node
+ other *Node
+}
+
+// nodesUp returns whether both nodes are currently up
+func (self *Conn) nodesUp() error {
+ if !self.one.Up {
+ return fmt.Errorf("one %v is not up", self.One)
+ }
+ if !self.other.Up {
+ return fmt.Errorf("other %v is not up", self.Other)
+ }
+ return nil
+}
+
+// String returns a log-friendly string
+func (self *Conn) String() string {
+ return fmt.Sprintf("Conn %v->%v", self.One.TerminalString(), self.Other.TerminalString())
+}
+
+// Msg represents a p2p message sent between two nodes in the network
+type Msg struct {
+ One discover.NodeID `json:"one"`
+ Other discover.NodeID `json:"other"`
+ Protocol string `json:"protocol"`
+ Code uint64 `json:"code"`
+ Received bool `json:"received"`
+}
+
+// String returns a log-friendly string
+func (self *Msg) String() string {
+ return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.TerminalString(), self.Other.TerminalString())
+}
+
+// ConnLabel generates a deterministic string which represents a connection
+// between two nodes, used to compare if two connections are between the same
+// nodes
+func ConnLabel(source, target discover.NodeID) string {
+ var first, second discover.NodeID
+ if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
+ first = target
+ second = source
+ } else {
+ first = source
+ second = target
+ }
+ return fmt.Sprintf("%v-%v", first, second)
+}
+
+// Snapshot represents the state of a network at a single point in time and can
+// be used to restore the state of a network
+type Snapshot struct {
+ Nodes []NodeSnapshot `json:"nodes,omitempty"`
+ Conns []Conn `json:"conns,omitempty"`
+}
+
+// NodeSnapshot represents the state of a node in the network
+type NodeSnapshot struct {
+ Node Node `json:"node,omitempty"`
+
+ // Snapshots is arbitrary data gathered from calling node.Snapshots()
+ Snapshots map[string][]byte `json:"snapshots,omitempty"`
+}
+
+// Snapshot creates a network snapshot
+func (self *Network) Snapshot() (*Snapshot, error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ snap := &Snapshot{
+ Nodes: make([]NodeSnapshot, len(self.Nodes)),
+ Conns: make([]Conn, len(self.Conns)),
+ }
+ for i, node := range self.Nodes {
+ snap.Nodes[i] = NodeSnapshot{Node: *node}
+ if !node.Up {
+ continue
+ }
+ snapshots, err := node.Snapshots()
+ if err != nil {
+ return nil, err
+ }
+ snap.Nodes[i].Snapshots = snapshots
+ }
+ for i, conn := range self.Conns {
+ snap.Conns[i] = *conn
+ }
+ return snap, nil
+}
+
+// Load loads a network snapshot
+func (self *Network) Load(snap *Snapshot) error {
+ for _, n := range snap.Nodes {
+ if _, err := self.NewNodeWithConfig(n.Node.Config); err != nil {
+ return err
+ }
+ if !n.Node.Up {
+ continue
+ }
+ if err := self.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
+ return err
+ }
+ }
+ for _, conn := range snap.Conns {
+ if err := self.Connect(conn.One, conn.Other); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Subscribe reads control events from a channel and executes them
+func (self *Network) Subscribe(events chan *Event) {
+ for {
+ select {
+ case event, ok := <-events:
+ if !ok {
+ return
+ }
+ if event.Control {
+ self.executeControlEvent(event)
+ }
+ case <-self.quitc:
+ return
+ }
+ }
+}
+
+func (self *Network) executeControlEvent(event *Event) {
+ log.Trace("execute control event", "type", event.Type, "event", event)
+ switch event.Type {
+ case EventTypeNode:
+ if err := self.executeNodeEvent(event); err != nil {
+ log.Error("error executing node event", "event", event, "err", err)
+ }
+ case EventTypeConn:
+ if err := self.executeConnEvent(event); err != nil {
+ log.Error("error executing conn event", "event", event, "err", err)
+ }
+ case EventTypeMsg:
+ log.Warn("ignoring control msg event")
+ }
+}
+
+func (self *Network) executeNodeEvent(e *Event) error {
+ if !e.Node.Up {
+ return self.Stop(e.Node.ID())
+ }
+
+ if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil {
+ return err
+ }
+ return self.Start(e.Node.ID())
+}
+
+func (self *Network) executeConnEvent(e *Event) error {
+ if e.Conn.Up {
+ return self.Connect(e.Conn.One, e.Conn.Other)
+ } else {
+ return self.Disconnect(e.Conn.One, e.Conn.Other)
+ }
+}