aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocols/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/protocols/protocol.go')
-rw-r--r--p2p/protocols/protocol.go443
1 files changed, 0 insertions, 443 deletions
diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go
deleted file mode 100644
index a9a00984d..000000000
--- a/p2p/protocols/protocol.go
+++ /dev/null
@@ -1,443 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-/*
-Package protocols is an extension to p2p. It offers a user friendly simple way to define
-devp2p subprotocols by abstracting away code standardly shared by protocols.
-
-* automate assigments of code indexes to messages
-* automate RLP decoding/encoding based on reflecting
-* provide the forever loop to read incoming messages
-* standardise error handling related to communication
-* standardised handshake negotiation
-* TODO: automatic generation of wire protocol specification for peers
-
-*/
-package protocols
-
-import (
- "bufio"
- "bytes"
- "context"
- "fmt"
- "io"
- "reflect"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
- "github.com/ethereum/go-ethereum/swarm/tracing"
- opentracing "github.com/opentracing/opentracing-go"
-)
-
-// error codes used by this protocol scheme
-const (
- ErrMsgTooLong = iota
- ErrDecode
- ErrWrite
- ErrInvalidMsgCode
- ErrInvalidMsgType
- ErrHandshake
- ErrNoHandler
- ErrHandler
-)
-
-// error description strings associated with the codes
-var errorToString = map[int]string{
- ErrMsgTooLong: "Message too long",
- ErrDecode: "Invalid message (RLP error)",
- ErrWrite: "Error sending message",
- ErrInvalidMsgCode: "Invalid message code",
- ErrInvalidMsgType: "Invalid message type",
- ErrHandshake: "Handshake error",
- ErrNoHandler: "No handler registered error",
- ErrHandler: "Message handler error",
-}
-
-/*
-Error implements the standard go error interface.
-Use:
-
- errorf(code, format, params ...interface{})
-
-Prints as:
-
- <description>: <details>
-
-where description is given by code in errorToString
-and details is fmt.Sprintf(format, params...)
-
-exported field Code can be checked
-*/
-type Error struct {
- Code int
- message string
- format string
- params []interface{}
-}
-
-func (e Error) Error() (message string) {
- if len(e.message) == 0 {
- name, ok := errorToString[e.Code]
- if !ok {
- panic("invalid message code")
- }
- e.message = name
- if e.format != "" {
- e.message += ": " + fmt.Sprintf(e.format, e.params...)
- }
- }
- return e.message
-}
-
-func errorf(code int, format string, params ...interface{}) *Error {
- return &Error{
- Code: code,
- format: format,
- params: params,
- }
-}
-
-// WrappedMsg is used to propagate marshalled context alongside message payloads
-type WrappedMsg struct {
- Context []byte
- Size uint32
- Payload []byte
-}
-
-//For accounting, the design is to allow the Spec to describe which and how its messages are priced
-//To access this functionality, we provide a Hook interface which will call accounting methods
-//NOTE: there could be more such (horizontal) hooks in the future
-type Hook interface {
- //A hook for sending messages
- Send(peer *Peer, size uint32, msg interface{}) error
- //A hook for receiving messages
- Receive(peer *Peer, size uint32, msg interface{}) error
-}
-
-// Spec is a protocol specification including its name and version as well as
-// the types of messages which are exchanged
-type Spec struct {
- // Name is the name of the protocol, often a three-letter word
- Name string
-
- // Version is the version number of the protocol
- Version uint
-
- // MaxMsgSize is the maximum accepted length of the message payload
- MaxMsgSize uint32
-
- // Messages is a list of message data types which this protocol uses, with
- // each message type being sent with its array index as the code (so
- // [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes
- // 0, 1 and 2 respectively)
- // each message must have a single unique data type
- Messages []interface{}
-
- //hook for accounting (could be extended to multiple hooks in the future)
- Hook Hook
-
- initOnce sync.Once
- codes map[reflect.Type]uint64
- types map[uint64]reflect.Type
-}
-
-func (s *Spec) init() {
- s.initOnce.Do(func() {
- s.codes = make(map[reflect.Type]uint64, len(s.Messages))
- s.types = make(map[uint64]reflect.Type, len(s.Messages))
- for i, msg := range s.Messages {
- code := uint64(i)
- typ := reflect.TypeOf(msg)
- if typ.Kind() == reflect.Ptr {
- typ = typ.Elem()
- }
- s.codes[typ] = code
- s.types[code] = typ
- }
- })
-}
-
-// Length returns the number of message types in the protocol
-func (s *Spec) Length() uint64 {
- return uint64(len(s.Messages))
-}
-
-// GetCode returns the message code of a type, and boolean second argument is
-// false if the message type is not found
-func (s *Spec) GetCode(msg interface{}) (uint64, bool) {
- s.init()
- typ := reflect.TypeOf(msg)
- if typ.Kind() == reflect.Ptr {
- typ = typ.Elem()
- }
- code, ok := s.codes[typ]
- return code, ok
-}
-
-// NewMsg construct a new message type given the code
-func (s *Spec) NewMsg(code uint64) (interface{}, bool) {
- s.init()
- typ, ok := s.types[code]
- if !ok {
- return nil, false
- }
- return reflect.New(typ).Interface(), true
-}
-
-// Peer represents a remote peer or protocol instance that is running on a peer connection with
-// a remote peer
-type Peer struct {
- *p2p.Peer // the p2p.Peer object representing the remote
- rw p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from
- spec *Spec
-}
-
-// NewPeer constructs a new peer
-// this constructor is called by the p2p.Protocol#Run function
-// the first two arguments are the arguments passed to p2p.Protocol.Run function
-// the third argument is the Spec describing the protocol
-func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
- return &Peer{
- Peer: p,
- rw: rw,
- spec: spec,
- }
-}
-
-// Run starts the forever loop that handles incoming messages
-// called within the p2p.Protocol#Run function
-// the handler argument is a function which is called for each message received
-// from the remote peer, a returned error causes the loop to exit
-// resulting in disconnection
-func (p *Peer) Run(handler func(ctx context.Context, msg interface{}) error) error {
- for {
- if err := p.handleIncoming(handler); err != nil {
- if err != io.EOF {
- metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1)
- log.Error("peer.handleIncoming", "err", err)
- }
-
- return err
- }
- }
-}
-
-// Drop disconnects a peer.
-// TODO: may need to implement protocol drop only? don't want to kick off the peer
-// if they are useful for other protocols
-func (p *Peer) Drop() {
- p.Disconnect(p2p.DiscSubprotocolError)
-}
-
-// Send takes a message, encodes it in RLP, finds the right message code and sends the
-// message off to the peer
-// this low level call will be wrapped by libraries providing routed or broadcast sends
-// but often just used to forward and push messages to directly connected peers
-func (p *Peer) Send(ctx context.Context, msg interface{}) error {
- defer metrics.GetOrRegisterResettingTimer("peer.send_t", nil).UpdateSince(time.Now())
- metrics.GetOrRegisterCounter("peer.send", nil).Inc(1)
- metrics.GetOrRegisterCounter(fmt.Sprintf("peer.send.%T", msg), nil).Inc(1)
-
- var b bytes.Buffer
- if tracing.Enabled {
- writer := bufio.NewWriter(&b)
-
- tracer := opentracing.GlobalTracer()
-
- sctx := spancontext.FromContext(ctx)
-
- if sctx != nil {
- err := tracer.Inject(
- sctx,
- opentracing.Binary,
- writer)
- if err != nil {
- return err
- }
- }
-
- writer.Flush()
- }
-
- r, err := rlp.EncodeToBytes(msg)
- if err != nil {
- return err
- }
-
- wmsg := WrappedMsg{
- Context: b.Bytes(),
- Size: uint32(len(r)),
- Payload: r,
- }
-
- //if the accounting hook is set, call it
- if p.spec.Hook != nil {
- err := p.spec.Hook.Send(p, wmsg.Size, msg)
- if err != nil {
- p.Drop()
- return err
- }
- }
-
- code, found := p.spec.GetCode(msg)
- if !found {
- return errorf(ErrInvalidMsgType, "%v", code)
- }
- return p2p.Send(p.rw, code, wmsg)
-}
-
-// handleIncoming(code)
-// is called each cycle of the main forever loop that dispatches incoming messages
-// if this returns an error the loop returns and the peer is disconnected with the error
-// this generic handler
-// * checks message size,
-// * checks for out-of-range message codes,
-// * handles decoding with reflection,
-// * call handlers as callbacks
-func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error {
- msg, err := p.rw.ReadMsg()
- if err != nil {
- return err
- }
- // make sure that the payload has been fully consumed
- defer msg.Discard()
-
- if msg.Size > p.spec.MaxMsgSize {
- return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
- }
-
- // unmarshal wrapped msg, which might contain context
- var wmsg WrappedMsg
- err = msg.Decode(&wmsg)
- if err != nil {
- log.Error(err.Error())
- return err
- }
-
- ctx := context.Background()
-
- // if tracing is enabled and the context coming within the request is
- // not empty, try to unmarshal it
- if tracing.Enabled && len(wmsg.Context) > 0 {
- var sctx opentracing.SpanContext
-
- tracer := opentracing.GlobalTracer()
- sctx, err = tracer.Extract(
- opentracing.Binary,
- bytes.NewReader(wmsg.Context))
- if err != nil {
- log.Error(err.Error())
- return err
- }
-
- ctx = spancontext.WithContext(ctx, sctx)
- }
-
- val, ok := p.spec.NewMsg(msg.Code)
- if !ok {
- return errorf(ErrInvalidMsgCode, "%v", msg.Code)
- }
- if err := rlp.DecodeBytes(wmsg.Payload, val); err != nil {
- return errorf(ErrDecode, "<= %v: %v", msg, err)
- }
-
- //if the accounting hook is set, call it
- if p.spec.Hook != nil {
- err := p.spec.Hook.Receive(p, wmsg.Size, val)
- if err != nil {
- return err
- }
- }
-
- // call the registered handler callbacks
- // a registered callback take the decoded message as argument as an interface
- // which the handler is supposed to cast to the appropriate type
- // it is entirely safe not to check the cast in the handler since the handler is
- // chosen based on the proper type in the first place
- if err := handle(ctx, val); err != nil {
- return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
- }
- return nil
-}
-
-// Handshake negotiates a handshake on the peer connection
-// * arguments
-// * context
-// * the local handshake to be sent to the remote peer
-// * function to be called on the remote handshake (can be nil)
-// * expects a remote handshake back of the same type
-// * the dialing peer needs to send the handshake first and then waits for remote
-// * the listening peer waits for the remote handshake and then sends it
-// returns the remote handshake and an error
-func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (interface{}, error) {
- if _, ok := p.spec.GetCode(hs); !ok {
- return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs)
- }
-
- var rhs interface{}
- errc := make(chan error, 2)
- handle := func(ctx context.Context, msg interface{}) error {
- rhs = msg
- if verify != nil {
- return verify(rhs)
- }
- return nil
- }
- send := func() { errc <- p.Send(ctx, hs) }
- receive := func() { errc <- p.handleIncoming(handle) }
-
- go func() {
- if p.Inbound() {
- receive()
- send()
- } else {
- send()
- receive()
- }
- }()
-
- for i := 0; i < 2; i++ {
- var err error
- select {
- case err = <-errc:
- case <-ctx.Done():
- err = ctx.Err()
- }
- if err != nil {
- return nil, errorf(ErrHandshake, err.Error())
- }
- }
- return rhs, nil
-}
-
-// HasCap returns true if Peer has a capability
-// with provided name.
-func (p *Peer) HasCap(capName string) (yes bool) {
- if p == nil || p.Peer == nil {
- return false
- }
- for _, c := range p.Caps() {
- if c.Name == capName {
- return true
- }
- }
- return false
-}