From e187711c6545487d4cac3701f0f506bb536234e2 Mon Sep 17 00:00:00 2001
From: ethersphere <thesw@rm.eth>
Date: Wed, 20 Jun 2018 14:06:27 +0200
Subject: swarm: network rewrite merge

---
 p2p/metrics.go                    | 10 +++---
 p2p/peer.go                       |  7 +++-
 p2p/protocols/protocol.go         |  7 ++++
 p2p/simulations/adapters/state.go | 36 ---------------------
 p2p/simulations/network.go        | 67 +++++++++++++++++++--------------------
 p2p/testing/protocolsession.go    |  2 ++
 6 files changed, 53 insertions(+), 76 deletions(-)
 delete mode 100644 p2p/simulations/adapters/state.go

(limited to 'p2p')

diff --git a/p2p/metrics.go b/p2p/metrics.go
index 4cbff90ac..2d52fd1fd 100644
--- a/p2p/metrics.go
+++ b/p2p/metrics.go
@@ -31,10 +31,10 @@ var (
 	egressTrafficMeter  = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
 )
 
-// meteredConn is a wrapper around a network TCP connection that meters both the
+// meteredConn is a wrapper around a net.Conn that meters both the
 // inbound and outbound network traffic.
 type meteredConn struct {
-	*net.TCPConn // Network connection to wrap with metering
+	net.Conn // Network connection to wrap with metering
 }
 
 // newMeteredConn creates a new metered connection, also bumping the ingress or
@@ -51,13 +51,13 @@ func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
 	} else {
 		egressConnectMeter.Mark(1)
 	}
-	return &meteredConn{conn.(*net.TCPConn)}
+	return &meteredConn{Conn: conn}
 }
 
 // Read delegates a network read to the underlying connection, bumping the ingress
 // traffic meter along the way.
 func (c *meteredConn) Read(b []byte) (n int, err error) {
-	n, err = c.TCPConn.Read(b)
+	n, err = c.Conn.Read(b)
 	ingressTrafficMeter.Mark(int64(n))
 	return
 }
@@ -65,7 +65,7 @@ func (c *meteredConn) Read(b []byte) (n int, err error) {
 // Write delegates a network write to the underlying connection, bumping the
 // egress traffic meter along the way.
 func (c *meteredConn) Write(b []byte) (n int, err error) {
-	n, err = c.TCPConn.Write(b)
+	n, err = c.Conn.Write(b)
 	egressTrafficMeter.Mark(int64(n))
 	return
 }
diff --git a/p2p/peer.go b/p2p/peer.go
index c3907349f..eb2d34441 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -17,6 +17,7 @@
 package p2p
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -31,6 +32,10 @@ import (
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
+var (
+	ErrShuttingDown = errors.New("shutting down")
+)
+
 const (
 	baseProtocolVersion    = 5
 	baseProtocolLength     = uint64(16)
@@ -393,7 +398,7 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
 		// as well but we don't want to rely on that.
 		rw.werr <- err
 	case <-rw.closed:
-		err = fmt.Errorf("shutting down")
+		err = ErrShuttingDown
 	}
 	return err
 }
diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go
index 849a7ef39..d5c0375ac 100644
--- a/p2p/protocols/protocol.go
+++ b/p2p/protocols/protocol.go
@@ -31,10 +31,12 @@ package protocols
 import (
 	"context"
 	"fmt"
+	"io"
 	"reflect"
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p"
 )
@@ -202,6 +204,11 @@ func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
 func (p *Peer) Run(handler func(msg interface{}) error) error {
 	for {
 		if err := p.handleIncoming(handler); err != nil {
+			if err != io.EOF {
+				metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1)
+				log.Error("peer.handleIncoming", "err", err)
+			}
+
 			return err
 		}
 	}
diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go
deleted file mode 100644
index 78dfb11f9..000000000
--- a/p2p/simulations/adapters/state.go
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package adapters
-
-type SimStateStore struct {
-	m map[string][]byte
-}
-
-func (st *SimStateStore) Load(s string) ([]byte, error) {
-	return st.m[s], nil
-}
-
-func (st *SimStateStore) Save(s string, data []byte) error {
-	st.m[s] = data
-	return nil
-}
-
-func NewSimStateStore() *SimStateStore {
-	return &SimStateStore{
-		make(map[string][]byte),
-	}
-}
diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index a8a46cd87..0fb7485ad 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -31,7 +31,7 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 )
 
-var dialBanTimeout = 200 * time.Millisecond
+var DialBanTimeout = 200 * time.Millisecond
 
 // NetworkConfig defines configuration options for starting a Network
 type NetworkConfig struct {
@@ -78,41 +78,25 @@ func (net *Network) Events() *event.Feed {
 	return &net.events
 }
 
-// NewNode adds a new node to the network with a random ID
-func (net *Network) NewNode() (*Node, error) {
-	conf := adapters.RandomNodeConfig()
-	conf.Services = []string{net.DefaultService}
-	return net.NewNodeWithConfig(conf)
-}
-
 // NewNodeWithConfig adds a new node to the network with the given config,
 // returning an error if a node with the same ID or name already exists
 func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
 	net.lock.Lock()
 	defer net.lock.Unlock()
 
-	// create a random ID and PrivateKey if not set
-	if conf.ID == (discover.NodeID{}) {
-		c := adapters.RandomNodeConfig()
-		conf.ID = c.ID
-		conf.PrivateKey = c.PrivateKey
-	}
-	id := conf.ID
 	if conf.Reachable == nil {
 		conf.Reachable = func(otherID discover.NodeID) bool {
 			_, err := net.InitConn(conf.ID, otherID)
-			return err == nil
+			if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
+				return false
+			}
+			return true
 		}
 	}
 
-	// assign a name to the node if not set
-	if conf.Name == "" {
-		conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1)
-	}
-
 	// check the node doesn't already exist
-	if node := net.getNode(id); node != nil {
-		return nil, fmt.Errorf("node with ID %q already exists", id)
+	if node := net.getNode(conf.ID); node != nil {
+		return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
 	}
 	if node := net.getNodeByName(conf.Name); node != nil {
 		return nil, fmt.Errorf("node with name %q already exists", conf.Name)
@@ -132,8 +116,8 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
 		Node:   adapterNode,
 		Config: conf,
 	}
-	log.Trace(fmt.Sprintf("node %v created", id))
-	net.nodeMap[id] = len(net.Nodes)
+	log.Trace(fmt.Sprintf("node %v created", conf.ID))
+	net.nodeMap[conf.ID] = len(net.Nodes)
 	net.Nodes = append(net.Nodes, node)
 
 	// emit a "control" event
@@ -181,7 +165,9 @@ func (net *Network) Start(id discover.NodeID) error {
 // startWithSnapshots starts the node with the given ID using the give
 // snapshots
 func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error {
-	node := net.GetNode(id)
+	net.lock.Lock()
+	defer net.lock.Unlock()
+	node := net.getNode(id)
 	if node == nil {
 		return fmt.Errorf("node %v does not exist", id)
 	}
@@ -220,9 +206,13 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve
 
 		// assume the node is now down
 		net.lock.Lock()
+		defer net.lock.Unlock()
 		node := net.getNode(id)
+		if node == nil {
+			log.Error("Can not find node for id", "id", id)
+			return
+		}
 		node.Up = false
-		net.lock.Unlock()
 		net.events.Send(NewEvent(node))
 	}()
 	for {
@@ -259,7 +249,9 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve
 
 // Stop stops the node with the given ID
 func (net *Network) Stop(id discover.NodeID) error {
-	node := net.GetNode(id)
+	net.lock.Lock()
+	defer net.lock.Unlock()
+	node := net.getNode(id)
 	if node == nil {
 		return fmt.Errorf("node %v does not exist", id)
 	}
@@ -312,7 +304,9 @@ func (net *Network) Disconnect(oneID, otherID discover.NodeID) error {
 
 // DidConnect tracks the fact that the "one" node connected to the "other" node
 func (net *Network) DidConnect(one, other discover.NodeID) error {
-	conn, err := net.GetOrCreateConn(one, other)
+	net.lock.Lock()
+	defer net.lock.Unlock()
+	conn, err := net.getOrCreateConn(one, other)
 	if err != nil {
 		return fmt.Errorf("connection between %v and %v does not exist", one, other)
 	}
@@ -327,7 +321,9 @@ func (net *Network) DidConnect(one, other discover.NodeID) error {
 // DidDisconnect tracks the fact that the "one" node disconnected from the
 // "other" node
 func (net *Network) DidDisconnect(one, other discover.NodeID) error {
-	conn := net.GetConn(one, other)
+	net.lock.Lock()
+	defer net.lock.Unlock()
+	conn := net.getConn(one, other)
 	if conn == nil {
 		return fmt.Errorf("connection between %v and %v does not exist", one, other)
 	}
@@ -335,7 +331,7 @@ func (net *Network) DidDisconnect(one, other discover.NodeID) error {
 		return fmt.Errorf("%v and %v already disconnected", one, other)
 	}
 	conn.Up = false
-	conn.initiated = time.Now().Add(-dialBanTimeout)
+	conn.initiated = time.Now().Add(-DialBanTimeout)
 	net.events.Send(NewEvent(conn))
 	return nil
 }
@@ -476,16 +472,19 @@ func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
 	if err != nil {
 		return nil, err
 	}
-	if time.Since(conn.initiated) < dialBanTimeout {
-		return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
-	}
 	if conn.Up {
 		return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
 	}
+	if time.Since(conn.initiated) < DialBanTimeout {
+		return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
+	}
+
 	err = conn.nodesUp()
 	if err != nil {
+		log.Trace(fmt.Sprintf("nodes not up: %v", err))
 		return nil, fmt.Errorf("nodes not up: %v", err)
 	}
+	log.Debug("InitConn - connection initiated")
 	conn.initiated = time.Now()
 	return conn, nil
 }
diff --git a/p2p/testing/protocolsession.go b/p2p/testing/protocolsession.go
index 8f73bfa03..e3ec41ad6 100644
--- a/p2p/testing/protocolsession.go
+++ b/p2p/testing/protocolsession.go
@@ -91,7 +91,9 @@ func (s *ProtocolSession) trigger(trig Trigger) error {
 	errc := make(chan error)
 
 	go func() {
+		log.Trace(fmt.Sprintf("trigger %v (%v)....", trig.Msg, trig.Code))
 		errc <- mockNode.Trigger(&trig)
+		log.Trace(fmt.Sprintf("triggered %v (%v)", trig.Msg, trig.Code))
 	}()
 
 	t := trig.Timeout
-- 
cgit v1.2.3