aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/simulation/events.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/simulation/events.go')
-rw-r--r--swarm/network/simulation/events.go114
1 files changed, 82 insertions, 32 deletions
diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go
index 594d36225..d73c3af4e 100644
--- a/swarm/network/simulation/events.go
+++ b/swarm/network/simulation/events.go
@@ -20,16 +20,18 @@ import (
"context"
"sync"
- "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
)
// PeerEvent is the type of the channel returned by Simulation.PeerEvents.
type PeerEvent struct {
// NodeID is the ID of node that the event is caught on.
NodeID enode.ID
+ // PeerID is the ID of the peer node that the event is caught on.
+ PeerID enode.ID
// Event is the event that is caught.
- Event *p2p.PeerEvent
+ Event *simulations.Event
// Error is the error that may have happened during event watching.
Error error
}
@@ -37,9 +39,13 @@ type PeerEvent struct {
// PeerEventsFilter defines a filter on PeerEvents to exclude messages with
// defined properties. Use PeerEventsFilter methods to set required options.
type PeerEventsFilter struct {
- t *p2p.PeerEventType
- protocol *string
- msgCode *uint64
+ eventType simulations.EventType
+
+ connUp *bool
+
+ msgReceive *bool
+ protocol *string
+ msgCode *uint64
}
// NewPeerEventsFilter returns a new PeerEventsFilter instance.
@@ -47,20 +53,48 @@ func NewPeerEventsFilter() *PeerEventsFilter {
return &PeerEventsFilter{}
}
-// Type sets the filter to only one peer event type.
-func (f *PeerEventsFilter) Type(t p2p.PeerEventType) *PeerEventsFilter {
- f.t = &t
+// Connect sets the filter to events when two nodes connect.
+func (f *PeerEventsFilter) Connect() *PeerEventsFilter {
+ f.eventType = simulations.EventTypeConn
+ b := true
+ f.connUp = &b
+ return f
+}
+
+// Drop sets the filter to events when two nodes disconnect.
+func (f *PeerEventsFilter) Drop() *PeerEventsFilter {
+ f.eventType = simulations.EventTypeConn
+ b := false
+ f.connUp = &b
+ return f
+}
+
+// ReceivedMessages sets the filter to only messages that are received.
+func (f *PeerEventsFilter) ReceivedMessages() *PeerEventsFilter {
+ f.eventType = simulations.EventTypeMsg
+ b := true
+ f.msgReceive = &b
+ return f
+}
+
+// SentMessages sets the filter to only messages that are sent.
+func (f *PeerEventsFilter) SentMessages() *PeerEventsFilter {
+ f.eventType = simulations.EventTypeMsg
+ b := false
+ f.msgReceive = &b
return f
}
// Protocol sets the filter to only one message protocol.
func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter {
+ f.eventType = simulations.EventTypeMsg
f.protocol = &p
return f
}
// MsgCode sets the filter to only one msg code.
func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
+ f.eventType = simulations.EventTypeMsg
f.msgCode = &c
return f
}
@@ -80,19 +114,8 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...
go func(id enode.ID) {
defer s.shutdownWG.Done()
- client, err := s.Net.GetNode(id).Client()
- if err != nil {
- subsWG.Done()
- eventC <- PeerEvent{NodeID: id, Error: err}
- return
- }
- events := make(chan *p2p.PeerEvent)
- sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
- if err != nil {
- subsWG.Done()
- eventC <- PeerEvent{NodeID: id, Error: err}
- return
- }
+ events := make(chan *simulations.Event)
+ sub := s.Net.Events().Subscribe(events)
defer sub.Unsubscribe()
subsWG.Done()
@@ -110,28 +133,55 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...
case <-s.Done():
return
case e := <-events:
+ // ignore control events
+ if e.Control {
+ continue
+ }
match := len(filters) == 0 // if there are no filters match all events
for _, f := range filters {
- if f.t != nil && *f.t != e.Type {
- continue
+ if f.eventType == simulations.EventTypeConn && e.Conn != nil {
+ if *f.connUp != e.Conn.Up {
+ continue
+ }
+ // all connection filter parameters matched, break the loop
+ match = true
+ break
+ }
+ if f.eventType == simulations.EventTypeMsg && e.Msg != nil {
+ if f.msgReceive != nil && *f.msgReceive != e.Msg.Received {
+ continue
+ }
+ if f.protocol != nil && *f.protocol != e.Msg.Protocol {
+ continue
+ }
+ if f.msgCode != nil && *f.msgCode != e.Msg.Code {
+ continue
+ }
+ // all message filter parameters matched, break the loop
+ match = true
+ break
}
- if f.protocol != nil && *f.protocol != e.Protocol {
- continue
+ }
+ var peerID enode.ID
+ switch e.Type {
+ case simulations.EventTypeConn:
+ peerID = e.Conn.One
+ if peerID == id {
+ peerID = e.Conn.Other
}
- if f.msgCode != nil && e.MsgCode != nil && *f.msgCode != *e.MsgCode {
- continue
+ case simulations.EventTypeMsg:
+ peerID = e.Msg.One
+ if peerID == id {
+ peerID = e.Msg.Other
}
- // all filter parameters matched, break the loop
- match = true
- break
}
if match {
select {
- case eventC <- PeerEvent{NodeID: id, Event: e}:
+ case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Event: e}:
case <-ctx.Done():
if err := ctx.Err(); err != nil {
select {
- case eventC <- PeerEvent{NodeID: id, Error: err}:
+ case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Error: err}:
case <-s.Done():
}
}