aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/message.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/message.go')
-rw-r--r--p2p/message.go66
1 files changed, 66 insertions, 0 deletions
diff --git a/p2p/message.go b/p2p/message.go
index 1292d2121..5690494bf 100644
--- a/p2p/message.go
+++ b/p2p/message.go
@@ -27,6 +27,8 @@ import (
"sync/atomic"
"time"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -271,3 +273,67 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
}
return nil
}
+
+// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent
+// or received
+type msgEventer struct {
+ MsgReadWriter
+
+ feed *event.Feed
+ peerID discover.NodeID
+ Protocol string
+}
+
+// newMsgEventer returns a msgEventer which sends message events to the given
+// feed
+func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, proto string) *msgEventer {
+ return &msgEventer{
+ MsgReadWriter: rw,
+ feed: feed,
+ peerID: peerID,
+ Protocol: proto,
+ }
+}
+
+// ReadMsg reads a message from the underlying MsgReadWriter and emits a
+// "message received" event
+func (self *msgEventer) ReadMsg() (Msg, error) {
+ msg, err := self.MsgReadWriter.ReadMsg()
+ if err != nil {
+ return msg, err
+ }
+ self.feed.Send(&PeerEvent{
+ Type: PeerEventTypeMsgRecv,
+ Peer: self.peerID,
+ Protocol: self.Protocol,
+ MsgCode: &msg.Code,
+ MsgSize: &msg.Size,
+ })
+ return msg, nil
+}
+
+// WriteMsg writes a message to the underlying MsgReadWriter and emits a
+// "message sent" event
+func (self *msgEventer) WriteMsg(msg Msg) error {
+ err := self.MsgReadWriter.WriteMsg(msg)
+ if err != nil {
+ return err
+ }
+ self.feed.Send(&PeerEvent{
+ Type: PeerEventTypeMsgSend,
+ Peer: self.peerID,
+ Protocol: self.Protocol,
+ MsgCode: &msg.Code,
+ MsgSize: &msg.Size,
+ })
+ return nil
+}
+
+// Close closes the underlying MsgReadWriter if it implements the io.Closer
+// interface
+func (self *msgEventer) Close() error {
+ if v, ok := self.MsgReadWriter.(io.Closer); ok {
+ return v.Close()
+ }
+ return nil
+}