diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/discover/table_test.go | 20 | ||||
-rw-r--r-- | p2p/discv5/net_test.go | 20 | ||||
-rw-r--r-- | p2p/discv5/node.go | 8 | ||||
-rw-r--r-- | p2p/discv5/ticket.go | 4 | ||||
-rw-r--r-- | p2p/discv5/topic.go | 16 | ||||
-rw-r--r-- | p2p/message.go | 24 | ||||
-rw-r--r-- | p2p/peer_error.go | 4 | ||||
-rw-r--r-- | p2p/simulations/adapters/inproc.go | 98 | ||||
-rw-r--r-- | p2p/simulations/adapters/state.go | 8 | ||||
-rw-r--r-- | p2p/simulations/network.go | 309 | ||||
-rw-r--r-- | p2p/testing/peerpool.go | 34 | ||||
-rw-r--r-- | p2p/testing/protocolsession.go | 26 | ||||
-rw-r--r-- | p2p/testing/protocoltester.go | 12 |
13 files changed, 292 insertions, 291 deletions
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 3ce48d299..f2d3f9a2a 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -582,26 +582,26 @@ func (*preminedTestnet) ping(toid NodeID, toaddr *net.UDPAddr) error { return ni // mine generates a testnet struct literal with nodes at // various distances to the given target. -func (n *preminedTestnet) mine(target NodeID) { - n.target = target - n.targetSha = crypto.Keccak256Hash(n.target[:]) +func (tn *preminedTestnet) mine(target NodeID) { + tn.target = target + tn.targetSha = crypto.Keccak256Hash(tn.target[:]) found := 0 for found < bucketSize*10 { k := newkey() id := PubkeyID(&k.PublicKey) sha := crypto.Keccak256Hash(id[:]) - ld := logdist(n.targetSha, sha) - if len(n.dists[ld]) < bucketSize { - n.dists[ld] = append(n.dists[ld], id) + ld := logdist(tn.targetSha, sha) + if len(tn.dists[ld]) < bucketSize { + tn.dists[ld] = append(tn.dists[ld], id) fmt.Println("found ID with ld", ld) found++ } } fmt.Println("&preminedTestnet{") - fmt.Printf(" target: %#v,\n", n.target) - fmt.Printf(" targetSha: %#v,\n", n.targetSha) - fmt.Printf(" dists: [%d][]NodeID{\n", len(n.dists)) - for ld, ns := range n.dists { + fmt.Printf(" target: %#v,\n", tn.target) + fmt.Printf(" targetSha: %#v,\n", tn.targetSha) + fmt.Printf(" dists: [%d][]NodeID{\n", len(tn.dists)) + for ld, ns := range tn.dists { if len(ns) == 0 { continue } diff --git a/p2p/discv5/net_test.go b/p2p/discv5/net_test.go index 369282ca9..001d193cc 100644 --- a/p2p/discv5/net_test.go +++ b/p2p/discv5/net_test.go @@ -336,26 +336,26 @@ func (*preminedTestnet) localAddr() *net.UDPAddr { // mine generates a testnet struct literal with nodes at // various distances to the given target. -func (n *preminedTestnet) mine(target NodeID) { - n.target = target - n.targetSha = crypto.Keccak256Hash(n.target[:]) +func (tn *preminedTestnet) mine(target NodeID) { + tn.target = target + tn.targetSha = crypto.Keccak256Hash(tn.target[:]) found := 0 for found < bucketSize*10 { k := newkey() id := PubkeyID(&k.PublicKey) sha := crypto.Keccak256Hash(id[:]) - ld := logdist(n.targetSha, sha) - if len(n.dists[ld]) < bucketSize { - n.dists[ld] = append(n.dists[ld], id) + ld := logdist(tn.targetSha, sha) + if len(tn.dists[ld]) < bucketSize { + tn.dists[ld] = append(tn.dists[ld], id) fmt.Println("found ID with ld", ld) found++ } } fmt.Println("&preminedTestnet{") - fmt.Printf(" target: %#v,\n", n.target) - fmt.Printf(" targetSha: %#v,\n", n.targetSha) - fmt.Printf(" dists: [%d][]NodeID{\n", len(n.dists)) - for ld, ns := range n.dists { + fmt.Printf(" target: %#v,\n", tn.target) + fmt.Printf(" targetSha: %#v,\n", tn.targetSha) + fmt.Printf(" dists: [%d][]NodeID{\n", len(tn.dists)) + for ld, ns := range tn.dists { if len(ns) == 0 { continue } diff --git a/p2p/discv5/node.go b/p2p/discv5/node.go index fd88a55b1..3d4748512 100644 --- a/p2p/discv5/node.go +++ b/p2p/discv5/node.go @@ -315,11 +315,11 @@ func PubkeyID(pub *ecdsa.PublicKey) NodeID { // Pubkey returns the public key represented by the node ID. // It returns an error if the ID is not a point on the curve. -func (id NodeID) Pubkey() (*ecdsa.PublicKey, error) { +func (n NodeID) Pubkey() (*ecdsa.PublicKey, error) { p := &ecdsa.PublicKey{Curve: crypto.S256(), X: new(big.Int), Y: new(big.Int)} - half := len(id) / 2 - p.X.SetBytes(id[:half]) - p.Y.SetBytes(id[half:]) + half := len(n) / 2 + p.X.SetBytes(n[:half]) + p.Y.SetBytes(n[half:]) if !p.Curve.IsOnCurve(p.X, p.Y) { return nil, errors.New("id is invalid secp256k1 curve point") } diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index b3d1ac4ba..ae4b18e7c 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -304,8 +304,8 @@ func (s ticketRefByWaitTime) Len() int { return len(s) } -func (r ticketRef) waitTime() mclock.AbsTime { - return r.t.regTime[r.idx] - r.t.issueTime +func (ref ticketRef) waitTime() mclock.AbsTime { + return ref.t.regTime[ref.idx] - ref.t.issueTime } // Less reports whether the element with diff --git a/p2p/discv5/topic.go b/p2p/discv5/topic.go index e7a7f8e02..609a41297 100644 --- a/p2p/discv5/topic.go +++ b/p2p/discv5/topic.go @@ -271,15 +271,15 @@ func (t *topicTable) useTicket(node *Node, serialNo uint32, topics []Topic, idx return false } -func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket { - topictab.collectGarbage() +func (t *topicTable) getTicket(node *Node, topics []Topic) *ticket { + t.collectGarbage() now := mclock.Now() - n := topictab.getOrNewNode(node) + n := t.getOrNewNode(node) n.lastIssuedTicket++ - topictab.storeTicketCounters(node) + t.storeTicketCounters(node) - t := &ticket{ + tic := &ticket{ issueTime: now, topics: topics, serial: n.lastIssuedTicket, @@ -287,15 +287,15 @@ func (topictab *topicTable) getTicket(node *Node, topics []Topic) *ticket { } for i, topic := range topics { var waitPeriod time.Duration - if topic := topictab.topics[topic]; topic != nil { + if topic := t.topics[topic]; topic != nil { waitPeriod = topic.wcl.waitPeriod } else { waitPeriod = minWaitPeriod } - t.regTime[i] = now + mclock.AbsTime(waitPeriod) + tic.regTime[i] = now + mclock.AbsTime(waitPeriod) } - return t + return tic } const gcInterval = time.Minute diff --git a/p2p/message.go b/p2p/message.go index ffa2ef86a..a4eac54d3 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -270,15 +270,15 @@ func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, p // ReadMsg reads a message from the underlying MsgReadWriter and emits a // "message received" event -func (self *msgEventer) ReadMsg() (Msg, error) { - msg, err := self.MsgReadWriter.ReadMsg() +func (ev *msgEventer) ReadMsg() (Msg, error) { + msg, err := ev.MsgReadWriter.ReadMsg() if err != nil { return msg, err } - self.feed.Send(&PeerEvent{ + ev.feed.Send(&PeerEvent{ Type: PeerEventTypeMsgRecv, - Peer: self.peerID, - Protocol: self.Protocol, + Peer: ev.peerID, + Protocol: ev.Protocol, MsgCode: &msg.Code, MsgSize: &msg.Size, }) @@ -287,15 +287,15 @@ func (self *msgEventer) ReadMsg() (Msg, error) { // WriteMsg writes a message to the underlying MsgReadWriter and emits a // "message sent" event -func (self *msgEventer) WriteMsg(msg Msg) error { - err := self.MsgReadWriter.WriteMsg(msg) +func (ev *msgEventer) WriteMsg(msg Msg) error { + err := ev.MsgReadWriter.WriteMsg(msg) if err != nil { return err } - self.feed.Send(&PeerEvent{ + ev.feed.Send(&PeerEvent{ Type: PeerEventTypeMsgSend, - Peer: self.peerID, - Protocol: self.Protocol, + Peer: ev.peerID, + Protocol: ev.Protocol, MsgCode: &msg.Code, MsgSize: &msg.Size, }) @@ -304,8 +304,8 @@ func (self *msgEventer) WriteMsg(msg Msg) error { // Close closes the underlying MsgReadWriter if it implements the io.Closer // interface -func (self *msgEventer) Close() error { - if v, ok := self.MsgReadWriter.(io.Closer); ok { +func (ev *msgEventer) Close() error { + if v, ok := ev.MsgReadWriter.(io.Closer); ok { return v.Close() } return nil diff --git a/p2p/peer_error.go b/p2p/peer_error.go index a1cddb707..ab61bfef0 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -48,8 +48,8 @@ func newPeerError(code int, format string, v ...interface{}) *peerError { return err } -func (self *peerError) Error() string { - return self.message +func (pe *peerError) Error() string { + return pe.message } var errProtocolReturned = errors.New("protocol returned") diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 48d7c1730..6d90b4a9f 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -154,30 +154,30 @@ type SimNode struct { } // Addr returns the node's discovery address -func (self *SimNode) Addr() []byte { - return []byte(self.Node().String()) +func (sn *SimNode) Addr() []byte { + return []byte(sn.Node().String()) } // Node returns a discover.Node representing the SimNode -func (self *SimNode) Node() *discover.Node { - return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303) +func (sn *SimNode) Node() *discover.Node { + return discover.NewNode(sn.ID, net.IP{127, 0, 0, 1}, 30303, 30303) } // Client returns an rpc.Client which can be used to communicate with the // underlying services (it is set once the node has started) -func (self *SimNode) Client() (*rpc.Client, error) { - self.lock.RLock() - defer self.lock.RUnlock() - if self.client == nil { +func (sn *SimNode) Client() (*rpc.Client, error) { + sn.lock.RLock() + defer sn.lock.RUnlock() + if sn.client == nil { return nil, errors.New("node not started") } - return self.client, nil + return sn.client, nil } // ServeRPC serves RPC requests over the given connection by creating an // in-memory client to the node's RPC server -func (self *SimNode) ServeRPC(conn net.Conn) error { - handler, err := self.node.RPCHandler() +func (sn *SimNode) ServeRPC(conn net.Conn) error { + handler, err := sn.node.RPCHandler() if err != nil { return err } @@ -187,13 +187,13 @@ func (self *SimNode) ServeRPC(conn net.Conn) error { // Snapshots creates snapshots of the services by calling the // simulation_snapshot RPC method -func (self *SimNode) Snapshots() (map[string][]byte, error) { - self.lock.RLock() - services := make(map[string]node.Service, len(self.running)) - for name, service := range self.running { +func (sn *SimNode) Snapshots() (map[string][]byte, error) { + sn.lock.RLock() + services := make(map[string]node.Service, len(sn.running)) + for name, service := range sn.running { services[name] = service } - self.lock.RUnlock() + sn.lock.RUnlock() if len(services) == 0 { return nil, errors.New("no running services") } @@ -213,23 +213,23 @@ func (self *SimNode) Snapshots() (map[string][]byte, error) { } // Start registers the services and starts the underlying devp2p node -func (self *SimNode) Start(snapshots map[string][]byte) error { +func (sn *SimNode) Start(snapshots map[string][]byte) error { newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) { return func(nodeCtx *node.ServiceContext) (node.Service, error) { ctx := &ServiceContext{ - RPCDialer: self.adapter, + RPCDialer: sn.adapter, NodeContext: nodeCtx, - Config: self.config, + Config: sn.config, } if snapshots != nil { ctx.Snapshot = snapshots[name] } - serviceFunc := self.adapter.services[name] + serviceFunc := sn.adapter.services[name] service, err := serviceFunc(ctx) if err != nil { return nil, err } - self.running[name] = service + sn.running[name] = service return service, nil } } @@ -237,9 +237,9 @@ func (self *SimNode) Start(snapshots map[string][]byte) error { // ensure we only register the services once in the case of the node // being stopped and then started again var regErr error - self.registerOnce.Do(func() { - for _, name := range self.config.Services { - if err := self.node.Register(newService(name)); err != nil { + sn.registerOnce.Do(func() { + for _, name := range sn.config.Services { + if err := sn.node.Register(newService(name)); err != nil { regErr = err return } @@ -249,54 +249,54 @@ func (self *SimNode) Start(snapshots map[string][]byte) error { return regErr } - if err := self.node.Start(); err != nil { + if err := sn.node.Start(); err != nil { return err } // create an in-process RPC client - handler, err := self.node.RPCHandler() + handler, err := sn.node.RPCHandler() if err != nil { return err } - self.lock.Lock() - self.client = rpc.DialInProc(handler) - self.lock.Unlock() + sn.lock.Lock() + sn.client = rpc.DialInProc(handler) + sn.lock.Unlock() return nil } // Stop closes the RPC client and stops the underlying devp2p node -func (self *SimNode) Stop() error { - self.lock.Lock() - if self.client != nil { - self.client.Close() - self.client = nil +func (sn *SimNode) Stop() error { + sn.lock.Lock() + if sn.client != nil { + sn.client.Close() + sn.client = nil } - self.lock.Unlock() - return self.node.Stop() + sn.lock.Unlock() + return sn.node.Stop() } // Services returns a copy of the underlying services -func (self *SimNode) Services() []node.Service { - self.lock.RLock() - defer self.lock.RUnlock() - services := make([]node.Service, 0, len(self.running)) - for _, service := range self.running { +func (sn *SimNode) Services() []node.Service { + sn.lock.RLock() + defer sn.lock.RUnlock() + services := make([]node.Service, 0, len(sn.running)) + for _, service := range sn.running { services = append(services, service) } return services } // Server returns the underlying p2p.Server -func (self *SimNode) Server() *p2p.Server { - return self.node.Server() +func (sn *SimNode) Server() *p2p.Server { + return sn.node.Server() } // SubscribeEvents subscribes the given channel to peer events from the // underlying p2p.Server -func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { - srv := self.Server() +func (sn *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { + srv := sn.Server() if srv == nil { panic("node not running") } @@ -304,12 +304,12 @@ func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription } // NodeInfo returns information about the node -func (self *SimNode) NodeInfo() *p2p.NodeInfo { - server := self.Server() +func (sn *SimNode) NodeInfo() *p2p.NodeInfo { + server := sn.Server() if server == nil { return &p2p.NodeInfo{ - ID: self.ID.String(), - Enode: self.Node().String(), + ID: sn.ID.String(), + Enode: sn.Node().String(), } } return server.NodeInfo() diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go index 0d4ecfb0f..78dfb11f9 100644 --- a/p2p/simulations/adapters/state.go +++ b/p2p/simulations/adapters/state.go @@ -20,12 +20,12 @@ type SimStateStore struct { m map[string][]byte } -func (self *SimStateStore) Load(s string) ([]byte, error) { - return self.m[s], nil +func (st *SimStateStore) Load(s string) ([]byte, error) { + return st.m[s], nil } -func (self *SimStateStore) Save(s string, data []byte) error { - self.m[s] = data +func (st *SimStateStore) Save(s string, data []byte) error { + st.m[s] = data return nil } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 0983e0a85..1a2c1e8ff 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -74,22 +74,22 @@ func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network } // Events returns the output event feed of the Network. -func (self *Network) Events() *event.Feed { - return &self.events +func (net *Network) Events() *event.Feed { + return &net.events } // NewNode adds a new node to the network with a random ID -func (self *Network) NewNode() (*Node, error) { +func (net *Network) NewNode() (*Node, error) { conf := adapters.RandomNodeConfig() - conf.Services = []string{self.DefaultService} - return self.NewNodeWithConfig(conf) + conf.Services = []string{net.DefaultService} + return net.NewNodeWithConfig(conf) } // NewNodeWithConfig adds a new node to the network with the given config, // returning an error if a node with the same ID or name already exists -func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { - self.lock.Lock() - defer self.lock.Unlock() +func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { + net.lock.Lock() + defer net.lock.Unlock() // create a random ID and PrivateKey if not set if conf.ID == (discover.NodeID{}) { @@ -100,31 +100,31 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) id := conf.ID if conf.Reachable == nil { conf.Reachable = func(otherID discover.NodeID) bool { - _, err := self.InitConn(conf.ID, otherID) + _, err := net.InitConn(conf.ID, otherID) return err == nil } } // assign a name to the node if not set if conf.Name == "" { - conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1) + conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1) } // check the node doesn't already exist - if node := self.getNode(id); node != nil { + if node := net.getNode(id); node != nil { return nil, fmt.Errorf("node with ID %q already exists", id) } - if node := self.getNodeByName(conf.Name); node != nil { + if node := net.getNodeByName(conf.Name); node != nil { return nil, fmt.Errorf("node with name %q already exists", conf.Name) } // if no services are configured, use the default service if len(conf.Services) == 0 { - conf.Services = []string{self.DefaultService} + conf.Services = []string{net.DefaultService} } // use the NodeAdapter to create the node - adapterNode, err := self.nodeAdapter.NewNode(conf) + adapterNode, err := net.nodeAdapter.NewNode(conf) if err != nil { return nil, err } @@ -133,27 +133,27 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) Config: conf, } log.Trace(fmt.Sprintf("node %v created", id)) - self.nodeMap[id] = len(self.Nodes) - self.Nodes = append(self.Nodes, node) + net.nodeMap[id] = len(net.Nodes) + net.Nodes = append(net.Nodes, node) // emit a "control" event - self.events.Send(ControlEvent(node)) + net.events.Send(ControlEvent(node)) return node, nil } // Config returns the network configuration -func (self *Network) Config() *NetworkConfig { - return &self.NetworkConfig +func (net *Network) Config() *NetworkConfig { + return &net.NetworkConfig } // StartAll starts all nodes in the network -func (self *Network) StartAll() error { - for _, node := range self.Nodes { +func (net *Network) StartAll() error { + for _, node := range net.Nodes { if node.Up { continue } - if err := self.Start(node.ID()); err != nil { + if err := net.Start(node.ID()); err != nil { return err } } @@ -161,12 +161,12 @@ func (self *Network) StartAll() error { } // StopAll stops all nodes in the network -func (self *Network) StopAll() error { - for _, node := range self.Nodes { +func (net *Network) StopAll() error { + for _, node := range net.Nodes { if !node.Up { continue } - if err := self.Stop(node.ID()); err != nil { + if err := net.Stop(node.ID()); err != nil { return err } } @@ -174,21 +174,21 @@ func (self *Network) StopAll() error { } // Start starts the node with the given ID -func (self *Network) Start(id discover.NodeID) error { - return self.startWithSnapshots(id, nil) +func (net *Network) Start(id discover.NodeID) error { + return net.startWithSnapshots(id, nil) } // startWithSnapshots starts the node with the given ID using the give // snapshots -func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error { - node := self.GetNode(id) +func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error { + node := net.GetNode(id) if node == nil { return fmt.Errorf("node %v does not exist", id) } if node.Up { return fmt.Errorf("node %v already up", id) } - log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, self.nodeAdapter.Name())) + log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, net.nodeAdapter.Name())) if err := node.Start(snapshots); err != nil { log.Warn(fmt.Sprintf("start up failed: %v", err)) return err @@ -196,7 +196,7 @@ func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string node.Up = true log.Info(fmt.Sprintf("started node %v: %v", id, node.Up)) - self.events.Send(NewEvent(node)) + net.events.Send(NewEvent(node)) // subscribe to peer events client, err := node.Client() @@ -208,22 +208,22 @@ func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string if err != nil { return fmt.Errorf("error getting peer events for node %v: %s", id, err) } - go self.watchPeerEvents(id, events, sub) + go net.watchPeerEvents(id, events, sub) return nil } // watchPeerEvents reads peer events from the given channel and emits // corresponding network events -func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) { +func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) { defer func() { sub.Unsubscribe() // assume the node is now down - self.lock.Lock() - node := self.getNode(id) + net.lock.Lock() + node := net.getNode(id) node.Up = false - self.lock.Unlock() - self.events.Send(NewEvent(node)) + net.lock.Unlock() + net.events.Send(NewEvent(node)) }() for { select { @@ -235,16 +235,16 @@ func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEv switch event.Type { case p2p.PeerEventTypeAdd: - self.DidConnect(id, peer) + net.DidConnect(id, peer) case p2p.PeerEventTypeDrop: - self.DidDisconnect(id, peer) + net.DidDisconnect(id, peer) case p2p.PeerEventTypeMsgSend: - self.DidSend(id, peer, event.Protocol, *event.MsgCode) + net.DidSend(id, peer, event.Protocol, *event.MsgCode) case p2p.PeerEventTypeMsgRecv: - self.DidReceive(peer, id, event.Protocol, *event.MsgCode) + net.DidReceive(peer, id, event.Protocol, *event.MsgCode) } @@ -258,8 +258,8 @@ func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEv } // Stop stops the node with the given ID -func (self *Network) Stop(id discover.NodeID) error { - node := self.GetNode(id) +func (net *Network) Stop(id discover.NodeID) error { + node := net.GetNode(id) if node == nil { return fmt.Errorf("node %v does not exist", id) } @@ -272,15 +272,15 @@ func (self *Network) Stop(id discover.NodeID) error { node.Up = false log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up)) - self.events.Send(ControlEvent(node)) + net.events.Send(ControlEvent(node)) return nil } // Connect connects two nodes together by calling the "admin_addPeer" RPC // method on the "one" node so that it connects to the "other" node -func (self *Network) Connect(oneID, otherID discover.NodeID) error { +func (net *Network) Connect(oneID, otherID discover.NodeID) error { log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) - conn, err := self.InitConn(oneID, otherID) + conn, err := net.InitConn(oneID, otherID) if err != nil { return err } @@ -288,14 +288,14 @@ func (self *Network) Connect(oneID, otherID discover.NodeID) error { if err != nil { return err } - self.events.Send(ControlEvent(conn)) + net.events.Send(ControlEvent(conn)) return client.Call(nil, "admin_addPeer", string(conn.other.Addr())) } // Disconnect disconnects two nodes by calling the "admin_removePeer" RPC // method on the "one" node so that it disconnects from the "other" node -func (self *Network) Disconnect(oneID, otherID discover.NodeID) error { - conn := self.GetConn(oneID, otherID) +func (net *Network) Disconnect(oneID, otherID discover.NodeID) error { + conn := net.GetConn(oneID, otherID) if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID) } @@ -306,13 +306,13 @@ func (self *Network) Disconnect(oneID, otherID discover.NodeID) error { if err != nil { return err } - self.events.Send(ControlEvent(conn)) + net.events.Send(ControlEvent(conn)) return client.Call(nil, "admin_removePeer", string(conn.other.Addr())) } // DidConnect tracks the fact that the "one" node connected to the "other" node -func (self *Network) DidConnect(one, other discover.NodeID) error { - conn, err := self.GetOrCreateConn(one, other) +func (net *Network) DidConnect(one, other discover.NodeID) error { + conn, err := net.GetOrCreateConn(one, other) if err != nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } @@ -320,14 +320,14 @@ func (self *Network) DidConnect(one, other discover.NodeID) error { return fmt.Errorf("%v and %v already connected", one, other) } conn.Up = true - self.events.Send(NewEvent(conn)) + net.events.Send(NewEvent(conn)) return nil } // DidDisconnect tracks the fact that the "one" node disconnected from the // "other" node -func (self *Network) DidDisconnect(one, other discover.NodeID) error { - conn := self.GetConn(one, other) +func (net *Network) DidDisconnect(one, other discover.NodeID) error { + conn := net.GetConn(one, other) if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } @@ -336,12 +336,12 @@ func (self *Network) DidDisconnect(one, other discover.NodeID) error { } conn.Up = false conn.initiated = time.Now().Add(-dialBanTimeout) - self.events.Send(NewEvent(conn)) + net.events.Send(NewEvent(conn)) return nil } // DidSend tracks the fact that "sender" sent a message to "receiver" -func (self *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error { +func (net *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error { msg := &Msg{ One: sender, Other: receiver, @@ -349,12 +349,12 @@ func (self *Network) DidSend(sender, receiver discover.NodeID, proto string, cod Code: code, Received: false, } - self.events.Send(NewEvent(msg)) + net.events.Send(NewEvent(msg)) return nil } // DidReceive tracks the fact that "receiver" received a message from "sender" -func (self *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error { +func (net *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error { msg := &Msg{ One: sender, Other: receiver, @@ -362,36 +362,36 @@ func (self *Network) DidReceive(sender, receiver discover.NodeID, proto string, Code: code, Received: true, } - self.events.Send(NewEvent(msg)) + net.events.Send(NewEvent(msg)) return nil } // GetNode gets the node with the given ID, returning nil if the node does not // exist -func (self *Network) GetNode(id discover.NodeID) *Node { - self.lock.Lock() - defer self.lock.Unlock() - return self.getNode(id) +func (net *Network) GetNode(id discover.NodeID) *Node { + net.lock.Lock() + defer net.lock.Unlock() + return net.getNode(id) } // GetNode gets the node with the given name, returning nil if the node does // not exist -func (self *Network) GetNodeByName(name string) *Node { - self.lock.Lock() - defer self.lock.Unlock() - return self.getNodeByName(name) +func (net *Network) GetNodeByName(name string) *Node { + net.lock.Lock() + defer net.lock.Unlock() + return net.getNodeByName(name) } -func (self *Network) getNode(id discover.NodeID) *Node { - i, found := self.nodeMap[id] +func (net *Network) getNode(id discover.NodeID) *Node { + i, found := net.nodeMap[id] if !found { return nil } - return self.Nodes[i] + return net.Nodes[i] } -func (self *Network) getNodeByName(name string) *Node { - for _, node := range self.Nodes { +func (net *Network) getNodeByName(name string) *Node { + for _, node := range net.Nodes { if node.Config.Name == name { return node } @@ -400,40 +400,40 @@ func (self *Network) getNodeByName(name string) *Node { } // GetNodes returns the existing nodes -func (self *Network) GetNodes() (nodes []*Node) { - self.lock.Lock() - defer self.lock.Unlock() +func (net *Network) GetNodes() (nodes []*Node) { + net.lock.Lock() + defer net.lock.Unlock() - nodes = append(nodes, self.Nodes...) + nodes = append(nodes, net.Nodes...) return nodes } // GetConn returns the connection which exists between "one" and "other" // regardless of which node initiated the connection -func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn { - self.lock.Lock() - defer self.lock.Unlock() - return self.getConn(oneID, otherID) +func (net *Network) GetConn(oneID, otherID discover.NodeID) *Conn { + net.lock.Lock() + defer net.lock.Unlock() + return net.getConn(oneID, otherID) } // GetOrCreateConn is like GetConn but creates the connection if it doesn't // already exist -func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { - self.lock.Lock() - defer self.lock.Unlock() - return self.getOrCreateConn(oneID, otherID) +func (net *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { + net.lock.Lock() + defer net.lock.Unlock() + return net.getOrCreateConn(oneID, otherID) } -func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { - if conn := self.getConn(oneID, otherID); conn != nil { +func (net *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { + if conn := net.getConn(oneID, otherID); conn != nil { return conn, nil } - one := self.getNode(oneID) + one := net.getNode(oneID) if one == nil { return nil, fmt.Errorf("node %v does not exist", oneID) } - other := self.getNode(otherID) + other := net.getNode(otherID) if other == nil { return nil, fmt.Errorf("node %v does not exist", otherID) } @@ -444,18 +444,18 @@ func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, err other: other, } label := ConnLabel(oneID, otherID) - self.connMap[label] = len(self.Conns) - self.Conns = append(self.Conns, conn) + net.connMap[label] = len(net.Conns) + net.Conns = append(net.Conns, conn) return conn, nil } -func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { +func (net *Network) getConn(oneID, otherID discover.NodeID) *Conn { label := ConnLabel(oneID, otherID) - i, found := self.connMap[label] + i, found := net.connMap[label] if !found { return nil } - return self.Conns[i] + return net.Conns[i] } // InitConn(one, other) retrieves the connectiton model for the connection between @@ -466,13 +466,13 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { // it also checks whether there has been recent attempt to connect the peers // this is cheating as the simulation is used as an oracle and know about // remote peers attempt to connect to a node which will then not initiate the connection -func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { - self.lock.Lock() - defer self.lock.Unlock() +func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { + net.lock.Lock() + defer net.lock.Unlock() if oneID == otherID { return nil, fmt.Errorf("refusing to connect to self %v", oneID) } - conn, err := self.getOrCreateConn(oneID, otherID) + conn, err := net.getOrCreateConn(oneID, otherID) if err != nil { return nil, err } @@ -491,28 +491,28 @@ func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { } // Shutdown stops all nodes in the network and closes the quit channel -func (self *Network) Shutdown() { - for _, node := range self.Nodes { +func (net *Network) Shutdown() { + for _, node := range net.Nodes { log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString())) if err := node.Stop(); err != nil { log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err) } } - close(self.quitc) + close(net.quitc) } //Reset resets all network properties: //emtpies the nodes and the connection list -func (self *Network) Reset() { - self.lock.Lock() - defer self.lock.Unlock() +func (net *Network) Reset() { + net.lock.Lock() + defer net.lock.Unlock() //re-initialize the maps - self.connMap = make(map[string]int) - self.nodeMap = make(map[discover.NodeID]int) + net.connMap = make(map[string]int) + net.nodeMap = make(map[discover.NodeID]int) - self.Nodes = nil - self.Conns = nil + net.Nodes = nil + net.Conns = nil } // Node is a wrapper around adapters.Node which is used to track the status @@ -528,37 +528,37 @@ type Node struct { } // ID returns the ID of the node -func (self *Node) ID() discover.NodeID { - return self.Config.ID +func (n *Node) ID() discover.NodeID { + return n.Config.ID } // String returns a log-friendly string -func (self *Node) String() string { - return fmt.Sprintf("Node %v", self.ID().TerminalString()) +func (n *Node) String() string { + return fmt.Sprintf("Node %v", n.ID().TerminalString()) } // NodeInfo returns information about the node -func (self *Node) NodeInfo() *p2p.NodeInfo { +func (n *Node) NodeInfo() *p2p.NodeInfo { // avoid a panic if the node is not started yet - if self.Node == nil { + if n.Node == nil { return nil } - info := self.Node.NodeInfo() - info.Name = self.Config.Name + info := n.Node.NodeInfo() + info.Name = n.Config.Name return info } // MarshalJSON implements the json.Marshaler interface so that the encoded // JSON includes the NodeInfo -func (self *Node) MarshalJSON() ([]byte, error) { +func (n *Node) MarshalJSON() ([]byte, error) { return json.Marshal(struct { Info *p2p.NodeInfo `json:"info,omitempty"` Config *adapters.NodeConfig `json:"config,omitempty"` Up bool `json:"up"` }{ - Info: self.NodeInfo(), - Config: self.Config, - Up: self.Up, + Info: n.NodeInfo(), + Config: n.Config, + Up: n.Up, }) } @@ -580,19 +580,19 @@ type Conn struct { } // nodesUp returns whether both nodes are currently up -func (self *Conn) nodesUp() error { - if !self.one.Up { - return fmt.Errorf("one %v is not up", self.One) +func (c *Conn) nodesUp() error { + if !c.one.Up { + return fmt.Errorf("one %v is not up", c.One) } - if !self.other.Up { - return fmt.Errorf("other %v is not up", self.Other) + if !c.other.Up { + return fmt.Errorf("other %v is not up", c.Other) } return nil } // String returns a log-friendly string -func (self *Conn) String() string { - return fmt.Sprintf("Conn %v->%v", self.One.TerminalString(), self.Other.TerminalString()) +func (c *Conn) String() string { + return fmt.Sprintf("Conn %v->%v", c.One.TerminalString(), c.Other.TerminalString()) } // Msg represents a p2p message sent between two nodes in the network @@ -605,8 +605,8 @@ type Msg struct { } // String returns a log-friendly string -func (self *Msg) String() string { - return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.TerminalString(), self.Other.TerminalString()) +func (m *Msg) String() string { + return fmt.Sprintf("Msg(%d) %v->%v", m.Code, m.One.TerminalString(), m.Other.TerminalString()) } // ConnLabel generates a deterministic string which represents a connection @@ -640,14 +640,14 @@ type NodeSnapshot struct { } // Snapshot creates a network snapshot -func (self *Network) Snapshot() (*Snapshot, error) { - self.lock.Lock() - defer self.lock.Unlock() +func (net *Network) Snapshot() (*Snapshot, error) { + net.lock.Lock() + defer net.lock.Unlock() snap := &Snapshot{ - Nodes: make([]NodeSnapshot, len(self.Nodes)), - Conns: make([]Conn, len(self.Conns)), + Nodes: make([]NodeSnapshot, len(net.Nodes)), + Conns: make([]Conn, len(net.Conns)), } - for i, node := range self.Nodes { + for i, node := range net.Nodes { snap.Nodes[i] = NodeSnapshot{Node: *node} if !node.Up { continue @@ -658,33 +658,33 @@ func (self *Network) Snapshot() (*Snapshot, error) { } snap.Nodes[i].Snapshots = snapshots } - for i, conn := range self.Conns { + for i, conn := range net.Conns { snap.Conns[i] = *conn } return snap, nil } // Load loads a network snapshot -func (self *Network) Load(snap *Snapshot) error { +func (net *Network) Load(snap *Snapshot) error { for _, n := range snap.Nodes { - if _, err := self.NewNodeWithConfig(n.Node.Config); err != nil { + if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil { return err } if !n.Node.Up { continue } - if err := self.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { + if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { return err } } for _, conn := range snap.Conns { - if !self.GetNode(conn.One).Up || !self.GetNode(conn.Other).Up { + if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up { //in this case, at least one of the nodes of a connection is not up, //so it would result in the snapshot `Load` to fail continue } - if err := self.Connect(conn.One, conn.Other); err != nil { + if err := net.Connect(conn.One, conn.Other); err != nil { return err } } @@ -692,7 +692,7 @@ func (self *Network) Load(snap *Snapshot) error { } // Subscribe reads control events from a channel and executes them -func (self *Network) Subscribe(events chan *Event) { +func (net *Network) Subscribe(events chan *Event) { for { select { case event, ok := <-events: @@ -700,23 +700,23 @@ func (self *Network) Subscribe(events chan *Event) { return } if event.Control { - self.executeControlEvent(event) + net.executeControlEvent(event) } - case <-self.quitc: + case <-net.quitc: return } } } -func (self *Network) executeControlEvent(event *Event) { +func (net *Network) executeControlEvent(event *Event) { log.Trace("execute control event", "type", event.Type, "event", event) switch event.Type { case EventTypeNode: - if err := self.executeNodeEvent(event); err != nil { + if err := net.executeNodeEvent(event); err != nil { log.Error("error executing node event", "event", event, "err", err) } case EventTypeConn: - if err := self.executeConnEvent(event); err != nil { + if err := net.executeConnEvent(event); err != nil { log.Error("error executing conn event", "event", event, "err", err) } case EventTypeMsg: @@ -724,20 +724,21 @@ func (self *Network) executeControlEvent(event *Event) { } } -func (self *Network) executeNodeEvent(e *Event) error { +func (net *Network) executeNodeEvent(e *Event) error { if !e.Node.Up { - return self.Stop(e.Node.ID()) + return net.Stop(e.Node.ID()) } - if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil { + if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil { return err } - return self.Start(e.Node.ID()) + return net.Start(e.Node.ID()) } -func (self *Network) executeConnEvent(e *Event) error { +func (net *Network) executeConnEvent(e *Event) error { if e.Conn.Up { - return self.Connect(e.Conn.One, e.Conn.Other) + return net.Connect(e.Conn.One, e.Conn.Other) + } else { + return net.Disconnect(e.Conn.One, e.Conn.Other) } - return self.Disconnect(e.Conn.One, e.Conn.Other) } diff --git a/p2p/testing/peerpool.go b/p2p/testing/peerpool.go index 45c6e6142..ed00396e2 100644 --- a/p2p/testing/peerpool.go +++ b/p2p/testing/peerpool.go @@ -39,29 +39,29 @@ func NewTestPeerPool() *TestPeerPool { return &TestPeerPool{peers: make(map[discover.NodeID]TestPeer)} } -func (self *TestPeerPool) Add(p TestPeer) { - self.lock.Lock() - defer self.lock.Unlock() - log.Trace(fmt.Sprintf("pp add peer %v", p.ID())) - self.peers[p.ID()] = p +func (p *TestPeerPool) Add(peer TestPeer) { + p.lock.Lock() + defer p.lock.Unlock() + log.Trace(fmt.Sprintf("pp add peer %v", peer.ID())) + p.peers[peer.ID()] = peer } -func (self *TestPeerPool) Remove(p TestPeer) { - self.lock.Lock() - defer self.lock.Unlock() - delete(self.peers, p.ID()) +func (p *TestPeerPool) Remove(peer TestPeer) { + p.lock.Lock() + defer p.lock.Unlock() + delete(p.peers, peer.ID()) } -func (self *TestPeerPool) Has(id discover.NodeID) bool { - self.lock.Lock() - defer self.lock.Unlock() - _, ok := self.peers[id] +func (p *TestPeerPool) Has(id discover.NodeID) bool { + p.lock.Lock() + defer p.lock.Unlock() + _, ok := p.peers[id] return ok } -func (self *TestPeerPool) Get(id discover.NodeID) TestPeer { - self.lock.Lock() - defer self.lock.Unlock() - return self.peers[id] +func (p *TestPeerPool) Get(id discover.NodeID) TestPeer { + p.lock.Lock() + defer p.lock.Unlock() + return p.peers[id] } diff --git a/p2p/testing/protocolsession.go b/p2p/testing/protocolsession.go index 361285f06..8f73bfa03 100644 --- a/p2p/testing/protocolsession.go +++ b/p2p/testing/protocolsession.go @@ -78,10 +78,10 @@ type Disconnect struct { } // trigger sends messages from peers -func (self *ProtocolSession) trigger(trig Trigger) error { - simNode, ok := self.adapter.GetNode(trig.Peer) +func (s *ProtocolSession) trigger(trig Trigger) error { + simNode, ok := s.adapter.GetNode(trig.Peer) if !ok { - return fmt.Errorf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.IDs)) + return fmt.Errorf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(s.IDs)) } mockNode, ok := simNode.Services()[0].(*mockNode) if !ok { @@ -107,7 +107,7 @@ func (self *ProtocolSession) trigger(trig Trigger) error { } // expect checks an expectation of a message sent out by the pivot node -func (self *ProtocolSession) expect(exps []Expect) error { +func (s *ProtocolSession) expect(exps []Expect) error { // construct a map of expectations for each node peerExpects := make(map[discover.NodeID][]Expect) for _, exp := range exps { @@ -120,9 +120,9 @@ func (self *ProtocolSession) expect(exps []Expect) error { // construct a map of mockNodes for each node mockNodes := make(map[discover.NodeID]*mockNode) for nodeID := range peerExpects { - simNode, ok := self.adapter.GetNode(nodeID) + simNode, ok := s.adapter.GetNode(nodeID) if !ok { - return fmt.Errorf("trigger: peer %v does not exist (1- %v)", nodeID, len(self.IDs)) + return fmt.Errorf("trigger: peer %v does not exist (1- %v)", nodeID, len(s.IDs)) } mockNode, ok := simNode.Services()[0].(*mockNode) if !ok { @@ -202,9 +202,9 @@ func (self *ProtocolSession) expect(exps []Expect) error { } // TestExchanges tests a series of exchanges against the session -func (self *ProtocolSession) TestExchanges(exchanges ...Exchange) error { +func (s *ProtocolSession) TestExchanges(exchanges ...Exchange) error { for i, e := range exchanges { - if err := self.testExchange(e); err != nil { + if err := s.testExchange(e); err != nil { return fmt.Errorf("exchange #%d %q: %v", i, e.Label, err) } log.Trace(fmt.Sprintf("exchange #%d %q: run successfully", i, e.Label)) @@ -214,14 +214,14 @@ func (self *ProtocolSession) TestExchanges(exchanges ...Exchange) error { // testExchange tests a single Exchange. // Default timeout value is 2 seconds. -func (self *ProtocolSession) testExchange(e Exchange) error { +func (s *ProtocolSession) testExchange(e Exchange) error { errc := make(chan error) done := make(chan struct{}) defer close(done) go func() { for _, trig := range e.Triggers { - err := self.trigger(trig) + err := s.trigger(trig) if err != nil { errc <- err return @@ -229,7 +229,7 @@ func (self *ProtocolSession) testExchange(e Exchange) error { } select { - case errc <- self.expect(e.Expects): + case errc <- s.expect(e.Expects): case <-done: } }() @@ -250,7 +250,7 @@ func (self *ProtocolSession) testExchange(e Exchange) error { // TestDisconnected tests the disconnections given as arguments // the disconnect structs describe what disconnect error is expected on which peer -func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error { +func (s *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error { expects := make(map[discover.NodeID]error) for _, disconnect := range disconnects { expects[disconnect.Peer] = disconnect.Error @@ -259,7 +259,7 @@ func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error timeout := time.After(time.Second) for len(expects) > 0 { select { - case event := <-self.events: + case event := <-s.events: if event.Type != p2p.PeerEventTypeDrop { continue } diff --git a/p2p/testing/protocoltester.go b/p2p/testing/protocoltester.go index a797412d6..636613c57 100644 --- a/p2p/testing/protocoltester.go +++ b/p2p/testing/protocoltester.go @@ -101,24 +101,24 @@ func NewProtocolTester(t *testing.T, id discover.NodeID, n int, run func(*p2p.Pe } // Stop stops the p2p server -func (self *ProtocolTester) Stop() error { - self.Server.Stop() +func (t *ProtocolTester) Stop() error { + t.Server.Stop() return nil } // Connect brings up the remote peer node and connects it using the // p2p/simulations network connection with the in memory network adapter -func (self *ProtocolTester) Connect(selfID discover.NodeID, peers ...*adapters.NodeConfig) { +func (t *ProtocolTester) Connect(selfID discover.NodeID, peers ...*adapters.NodeConfig) { for _, peer := range peers { log.Trace(fmt.Sprintf("start node %v", peer.ID)) - if _, err := self.network.NewNodeWithConfig(peer); err != nil { + if _, err := t.network.NewNodeWithConfig(peer); err != nil { panic(fmt.Sprintf("error starting peer %v: %v", peer.ID, err)) } - if err := self.network.Start(peer.ID); err != nil { + if err := t.network.Start(peer.ID); err != nil { panic(fmt.Sprintf("error starting peer %v: %v", peer.ID, err)) } log.Trace(fmt.Sprintf("connect to %v", peer.ID)) - if err := self.network.Connect(selfID, peer.ID); err != nil { + if err := t.network.Connect(selfID, peer.ID); err != nil { panic(fmt.Sprintf("error connecting to peer %v: %v", peer.ID, err)) } } |