aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocols
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2018-07-13 23:40:28 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-13 23:40:28 +0800
commit7c9314f231a7ddffbbbc5fec16c65519a0121eeb (patch)
treedbc4021b66ee8968ad747036741fac7e1b972a39 /p2p/protocols
parentf7d3678c28c4b92e45a458e4785bd0f1cdc20e34 (diff)
downloadgo-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar
go-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.gz
go-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.bz2
go-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.lz
go-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.xz
go-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.zst
go-tangerine-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.zip
swarm: integrate OpenTracing; propagate ctx to internal APIs (#17169)
* swarm: propagate ctx, enable opentracing * swarm/tracing: log error when tracing is misconfigured
Diffstat (limited to 'p2p/protocols')
-rw-r--r--p2p/protocols/protocol.go89
-rw-r--r--p2p/protocols/protocol_test.go4
2 files changed, 83 insertions, 10 deletions
diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go
index d5c0375ac..615f74b56 100644
--- a/p2p/protocols/protocol.go
+++ b/p2p/protocols/protocol.go
@@ -29,6 +29,8 @@ devp2p subprotocols by abstracting away code standardly shared by protocols.
package protocols
import (
+ "bufio"
+ "bytes"
"context"
"fmt"
"io"
@@ -39,6 +41,10 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/ethereum/go-ethereum/swarm/tracing"
+ opentracing "github.com/opentracing/opentracing-go"
)
// error codes used by this protocol scheme
@@ -109,6 +115,13 @@ func errorf(code int, format string, params ...interface{}) *Error {
}
}
+// WrappedMsg is used to propagate marshalled context alongside message payloads
+type WrappedMsg struct {
+ Context []byte
+ Size uint32
+ Payload []byte
+}
+
// Spec is a protocol specification including its name and version as well as
// the types of messages which are exchanged
type Spec struct {
@@ -201,7 +214,7 @@ func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
// the handler argument is a function which is called for each message received
// from the remote peer, a returned error causes the loop to exit
// resulting in disconnection
-func (p *Peer) Run(handler func(msg interface{}) error) error {
+func (p *Peer) Run(handler func(ctx context.Context, msg interface{}) error) error {
for {
if err := p.handleIncoming(handler); err != nil {
if err != io.EOF {
@@ -225,14 +238,47 @@ func (p *Peer) Drop(err error) {
// message off to the peer
// this low level call will be wrapped by libraries providing routed or broadcast sends
// but often just used to forward and push messages to directly connected peers
-func (p *Peer) Send(msg interface{}) error {
+func (p *Peer) Send(ctx context.Context, msg interface{}) error {
defer metrics.GetOrRegisterResettingTimer("peer.send_t", nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter("peer.send", nil).Inc(1)
+
+ var b bytes.Buffer
+ if tracing.Enabled {
+ writer := bufio.NewWriter(&b)
+
+ tracer := opentracing.GlobalTracer()
+
+ sctx := spancontext.FromContext(ctx)
+
+ if sctx != nil {
+ err := tracer.Inject(
+ sctx,
+ opentracing.Binary,
+ writer)
+ if err != nil {
+ return err
+ }
+ }
+
+ writer.Flush()
+ }
+
+ r, err := rlp.EncodeToBytes(msg)
+ if err != nil {
+ return err
+ }
+
+ wmsg := WrappedMsg{
+ Context: b.Bytes(),
+ Size: uint32(len(r)),
+ Payload: r,
+ }
+
code, found := p.spec.GetCode(msg)
if !found {
return errorf(ErrInvalidMsgType, "%v", code)
}
- return p2p.Send(p.rw, code, msg)
+ return p2p.Send(p.rw, code, wmsg)
}
// handleIncoming(code)
@@ -243,7 +289,7 @@ func (p *Peer) Send(msg interface{}) error {
// * checks for out-of-range message codes,
// * handles decoding with reflection,
// * call handlers as callbacks
-func (p *Peer) handleIncoming(handle func(msg interface{}) error) error {
+func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
@@ -255,11 +301,38 @@ func (p *Peer) handleIncoming(handle func(msg interface{}) error) error {
return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
}
+ // unmarshal wrapped msg, which might contain context
+ var wmsg WrappedMsg
+ err = msg.Decode(&wmsg)
+ if err != nil {
+ log.Error(err.Error())
+ return err
+ }
+
+ ctx := context.Background()
+
+ // if tracing is enabled and the context coming within the request is
+ // not empty, try to unmarshal it
+ if tracing.Enabled && len(wmsg.Context) > 0 {
+ var sctx opentracing.SpanContext
+
+ tracer := opentracing.GlobalTracer()
+ sctx, err = tracer.Extract(
+ opentracing.Binary,
+ bytes.NewReader(wmsg.Context))
+ if err != nil {
+ log.Error(err.Error())
+ return err
+ }
+
+ ctx = spancontext.WithContext(ctx, sctx)
+ }
+
val, ok := p.spec.NewMsg(msg.Code)
if !ok {
return errorf(ErrInvalidMsgCode, "%v", msg.Code)
}
- if err := msg.Decode(val); err != nil {
+ if err := rlp.DecodeBytes(wmsg.Payload, val); err != nil {
return errorf(ErrDecode, "<= %v: %v", msg, err)
}
@@ -268,7 +341,7 @@ func (p *Peer) handleIncoming(handle func(msg interface{}) error) error {
// which the handler is supposed to cast to the appropriate type
// it is entirely safe not to check the cast in the handler since the handler is
// chosen based on the proper type in the first place
- if err := handle(val); err != nil {
+ if err := handle(ctx, val); err != nil {
return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
}
return nil
@@ -288,14 +361,14 @@ func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interf
return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs)
}
errc := make(chan error, 2)
- handle := func(msg interface{}) error {
+ handle := func(ctx context.Context, msg interface{}) error {
rhs = msg
if verify != nil {
return verify(rhs)
}
return nil
}
- send := func() { errc <- p.Send(hs) }
+ send := func() { errc <- p.Send(ctx, hs) }
receive := func() { errc <- p.handleIncoming(handle) }
go func() {
diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go
index aaae7502b..11df8ff39 100644
--- a/p2p/protocols/protocol_test.go
+++ b/p2p/protocols/protocol_test.go
@@ -104,7 +104,7 @@ func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) er
return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
}
- handle := func(msg interface{}) error {
+ handle := func(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case *protoHandshake:
@@ -116,7 +116,7 @@ func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) er
return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
}
lhs.C += rhs.C
- return peer.Send(lhs)
+ return peer.Send(ctx, lhs)
case *kill:
// demonstrates use of peerPool, killing another peer connection as a response to a message