aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/protocols')
-rw-r--r--p2p/protocols/protocol.go311
-rw-r--r--p2p/protocols/protocol_test.go389
2 files changed, 700 insertions, 0 deletions
diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go
new file mode 100644
index 000000000..9914c9958
--- /dev/null
+++ b/p2p/protocols/protocol.go
@@ -0,0 +1,311 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+Package protocols is an extension to p2p. It offers a user friendly simple way to define
+devp2p subprotocols by abstracting away code standardly shared by protocols.
+
+* automate assigments of code indexes to messages
+* automate RLP decoding/encoding based on reflecting
+* provide the forever loop to read incoming messages
+* standardise error handling related to communication
+* standardised handshake negotiation
+* TODO: automatic generation of wire protocol specification for peers
+
+*/
+package protocols
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/p2p"
+)
+
+// error codes used by this protocol scheme
+const (
+ ErrMsgTooLong = iota
+ ErrDecode
+ ErrWrite
+ ErrInvalidMsgCode
+ ErrInvalidMsgType
+ ErrHandshake
+ ErrNoHandler
+ ErrHandler
+)
+
+// error description strings associated with the codes
+var errorToString = map[int]string{
+ ErrMsgTooLong: "Message too long",
+ ErrDecode: "Invalid message (RLP error)",
+ ErrWrite: "Error sending message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrInvalidMsgType: "Invalid message type",
+ ErrHandshake: "Handshake error",
+ ErrNoHandler: "No handler registered error",
+ ErrHandler: "Message handler error",
+}
+
+/*
+Error implements the standard go error interface.
+Use:
+
+ errorf(code, format, params ...interface{})
+
+Prints as:
+
+ <description>: <details>
+
+where description is given by code in errorToString
+and details is fmt.Sprintf(format, params...)
+
+exported field Code can be checked
+*/
+type Error struct {
+ Code int
+ message string
+ format string
+ params []interface{}
+}
+
+func (e Error) Error() (message string) {
+ if len(e.message) == 0 {
+ name, ok := errorToString[e.Code]
+ if !ok {
+ panic("invalid message code")
+ }
+ e.message = name
+ if e.format != "" {
+ e.message += ": " + fmt.Sprintf(e.format, e.params...)
+ }
+ }
+ return e.message
+}
+
+func errorf(code int, format string, params ...interface{}) *Error {
+ return &Error{
+ Code: code,
+ format: format,
+ params: params,
+ }
+}
+
+// Spec is a protocol specification including its name and version as well as
+// the types of messages which are exchanged
+type Spec struct {
+ // Name is the name of the protocol, often a three-letter word
+ Name string
+
+ // Version is the version number of the protocol
+ Version uint
+
+ // MaxMsgSize is the maximum accepted length of the message payload
+ MaxMsgSize uint32
+
+ // Messages is a list of message data types which this protocol uses, with
+ // each message type being sent with its array index as the code (so
+ // [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes
+ // 0, 1 and 2 respectively)
+ // each message must have a single unique data type
+ Messages []interface{}
+
+ initOnce sync.Once
+ codes map[reflect.Type]uint64
+ types map[uint64]reflect.Type
+}
+
+func (s *Spec) init() {
+ s.initOnce.Do(func() {
+ s.codes = make(map[reflect.Type]uint64, len(s.Messages))
+ s.types = make(map[uint64]reflect.Type, len(s.Messages))
+ for i, msg := range s.Messages {
+ code := uint64(i)
+ typ := reflect.TypeOf(msg)
+ if typ.Kind() == reflect.Ptr {
+ typ = typ.Elem()
+ }
+ s.codes[typ] = code
+ s.types[code] = typ
+ }
+ })
+}
+
+// Length returns the number of message types in the protocol
+func (s *Spec) Length() uint64 {
+ return uint64(len(s.Messages))
+}
+
+// GetCode returns the message code of a type, and boolean second argument is
+// false if the message type is not found
+func (s *Spec) GetCode(msg interface{}) (uint64, bool) {
+ s.init()
+ typ := reflect.TypeOf(msg)
+ if typ.Kind() == reflect.Ptr {
+ typ = typ.Elem()
+ }
+ code, ok := s.codes[typ]
+ return code, ok
+}
+
+// NewMsg construct a new message type given the code
+func (s *Spec) NewMsg(code uint64) (interface{}, bool) {
+ s.init()
+ typ, ok := s.types[code]
+ if !ok {
+ return nil, false
+ }
+ return reflect.New(typ).Interface(), true
+}
+
+// Peer represents a remote peer or protocol instance that is running on a peer connection with
+// a remote peer
+type Peer struct {
+ *p2p.Peer // the p2p.Peer object representing the remote
+ rw p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from
+ spec *Spec
+}
+
+// NewPeer constructs a new peer
+// this constructor is called by the p2p.Protocol#Run function
+// the first two arguments are the arguments passed to p2p.Protocol.Run function
+// the third argument is the Spec describing the protocol
+func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
+ return &Peer{
+ Peer: p,
+ rw: rw,
+ spec: spec,
+ }
+}
+
+// Run starts the forever loop that handles incoming messages
+// called within the p2p.Protocol#Run function
+// the handler argument is a function which is called for each message received
+// from the remote peer, a returned error causes the loop to exit
+// resulting in disconnection
+func (p *Peer) Run(handler func(msg interface{}) error) error {
+ for {
+ if err := p.handleIncoming(handler); err != nil {
+ return err
+ }
+ }
+}
+
+// Drop disconnects a peer.
+// TODO: may need to implement protocol drop only? don't want to kick off the peer
+// if they are useful for other protocols
+func (p *Peer) Drop(err error) {
+ p.Disconnect(p2p.DiscSubprotocolError)
+}
+
+// Send takes a message, encodes it in RLP, finds the right message code and sends the
+// message off to the peer
+// this low level call will be wrapped by libraries providing routed or broadcast sends
+// but often just used to forward and push messages to directly connected peers
+func (p *Peer) Send(msg interface{}) error {
+ code, found := p.spec.GetCode(msg)
+ if !found {
+ return errorf(ErrInvalidMsgType, "%v", code)
+ }
+ return p2p.Send(p.rw, code, msg)
+}
+
+// handleIncoming(code)
+// is called each cycle of the main forever loop that dispatches incoming messages
+// if this returns an error the loop returns and the peer is disconnected with the error
+// this generic handler
+// * checks message size,
+// * checks for out-of-range message codes,
+// * handles decoding with reflection,
+// * call handlers as callbacks
+func (p *Peer) handleIncoming(handle func(msg interface{}) error) error {
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ if msg.Size > p.spec.MaxMsgSize {
+ return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
+ }
+
+ val, ok := p.spec.NewMsg(msg.Code)
+ if !ok {
+ return errorf(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ if err := msg.Decode(val); err != nil {
+ return errorf(ErrDecode, "<= %v: %v", msg, err)
+ }
+
+ // call the registered handler callbacks
+ // a registered callback take the decoded message as argument as an interface
+ // which the handler is supposed to cast to the appropriate type
+ // it is entirely safe not to check the cast in the handler since the handler is
+ // chosen based on the proper type in the first place
+ if err := handle(val); err != nil {
+ return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
+ }
+ return nil
+}
+
+// Handshake negotiates a handshake on the peer connection
+// * arguments
+// * context
+// * the local handshake to be sent to the remote peer
+// * funcion to be called on the remote handshake (can be nil)
+// * expects a remote handshake back of the same type
+// * the dialing peer needs to send the handshake first and then waits for remote
+// * the listening peer waits for the remote handshake and then sends it
+// returns the remote handshake and an error
+func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (rhs interface{}, err error) {
+ if _, ok := p.spec.GetCode(hs); !ok {
+ return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs)
+ }
+ errc := make(chan error, 2)
+ handle := func(msg interface{}) error {
+ rhs = msg
+ if verify != nil {
+ return verify(rhs)
+ }
+ return nil
+ }
+ send := func() { errc <- p.Send(hs) }
+ receive := func() { errc <- p.handleIncoming(handle) }
+
+ go func() {
+ if p.Inbound() {
+ receive()
+ send()
+ } else {
+ send()
+ receive()
+ }
+ }()
+
+ for i := 0; i < 2; i++ {
+ select {
+ case err = <-errc:
+ case <-ctx.Done():
+ err = ctx.Err()
+ }
+ if err != nil {
+ return nil, errorf(ErrHandshake, err.Error())
+ }
+ }
+ return rhs, nil
+}
diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go
new file mode 100644
index 000000000..053f537a6
--- /dev/null
+++ b/p2p/protocols/protocol_test.go
@@ -0,0 +1,389 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package protocols
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
+)
+
+// handshake message type
+type hs0 struct {
+ C uint
+}
+
+// message to kill/drop the peer with nodeID
+type kill struct {
+ C discover.NodeID
+}
+
+// message to drop connection
+type drop struct {
+}
+
+/// protoHandshake represents module-independent aspects of the protocol and is
+// the first message peers send and receive as part the initial exchange
+type protoHandshake struct {
+ Version uint // local and remote peer should have identical version
+ NetworkID string // local and remote peer should have identical network id
+}
+
+// checkProtoHandshake verifies local and remote protoHandshakes match
+func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error {
+ return func(rhs interface{}) error {
+ remote := rhs.(*protoHandshake)
+ if remote.NetworkID != testNetworkID {
+ return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID)
+ }
+
+ if remote.Version != testVersion {
+ return fmt.Errorf("%d (!= %d)", remote.Version, testVersion)
+ }
+ return nil
+ }
+}
+
+// newProtocol sets up a protocol
+// the run function here demonstrates a typical protocol using peerPool, handshake
+// and messages registered to handlers
+func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error {
+ spec := &Spec{
+ Name: "test",
+ Version: 42,
+ MaxMsgSize: 10 * 1024,
+ Messages: []interface{}{
+ protoHandshake{},
+ hs0{},
+ kill{},
+ drop{},
+ },
+ }
+ return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := NewPeer(p, rw, spec)
+
+ // initiate one-off protohandshake and check validity
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ phs := &protoHandshake{42, "420"}
+ hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID)
+ _, err := peer.Handshake(ctx, phs, hsCheck)
+ if err != nil {
+ return err
+ }
+
+ lhs := &hs0{42}
+ // module handshake demonstrating a simple repeatable exchange of same-type message
+ hs, err := peer.Handshake(ctx, lhs, nil)
+ if err != nil {
+ return err
+ }
+
+ if rmhs := hs.(*hs0); rmhs.C > lhs.C {
+ return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
+ }
+
+ handle := func(msg interface{}) error {
+ switch msg := msg.(type) {
+
+ case *protoHandshake:
+ return errors.New("duplicate handshake")
+
+ case *hs0:
+ rhs := msg
+ if rhs.C > lhs.C {
+ return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
+ }
+ lhs.C += rhs.C
+ return peer.Send(lhs)
+
+ case *kill:
+ // demonstrates use of peerPool, killing another peer connection as a response to a message
+ id := msg.C
+ pp.Get(id).Drop(errors.New("killed"))
+ return nil
+
+ case *drop:
+ // for testing we can trigger self induced disconnect upon receiving drop message
+ return errors.New("dropped")
+
+ default:
+ return fmt.Errorf("unknown message type: %T", msg)
+ }
+ }
+
+ pp.Add(peer)
+ defer pp.Remove(peer)
+ return peer.Run(handle)
+ }
+}
+
+func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
+ conf := adapters.RandomNodeConfig()
+ return p2ptest.NewProtocolTester(t, conf.ID, 2, newProtocol(pp))
+}
+
+func protoHandshakeExchange(id discover.NodeID, proto *protoHandshake) []p2ptest.Exchange {
+
+ return []p2ptest.Exchange{
+ {
+ Expects: []p2ptest.Expect{
+ {
+ Code: 0,
+ Msg: &protoHandshake{42, "420"},
+ Peer: id,
+ },
+ },
+ },
+ {
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 0,
+ Msg: proto,
+ Peer: id,
+ },
+ },
+ },
+ }
+}
+
+func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
+ pp := p2ptest.NewTestPeerPool()
+ s := protocolTester(t, pp)
+ // TODO: make this more than one handshake
+ id := s.IDs[0]
+ if err := s.TestExchanges(protoHandshakeExchange(id, proto)...); err != nil {
+ t.Fatal(err)
+ }
+ var disconnects []*p2ptest.Disconnect
+ for i, err := range errs {
+ disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err})
+ }
+ if err := s.TestDisconnected(disconnects...); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestProtoHandshakeVersionMismatch(t *testing.T) {
+ runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error()))
+}
+
+func TestProtoHandshakeNetworkIDMismatch(t *testing.T) {
+ runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error()))
+}
+
+func TestProtoHandshakeSuccess(t *testing.T) {
+ runProtoHandshake(t, &protoHandshake{42, "420"})
+}
+
+func moduleHandshakeExchange(id discover.NodeID, resp uint) []p2ptest.Exchange {
+
+ return []p2ptest.Exchange{
+ {
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &hs0{42},
+ Peer: id,
+ },
+ },
+ },
+ {
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 1,
+ Msg: &hs0{resp},
+ Peer: id,
+ },
+ },
+ },
+ }
+}
+
+func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
+ pp := p2ptest.NewTestPeerPool()
+ s := protocolTester(t, pp)
+ id := s.IDs[0]
+ if err := s.TestExchanges(protoHandshakeExchange(id, &protoHandshake{42, "420"})...); err != nil {
+ t.Fatal(err)
+ }
+ if err := s.TestExchanges(moduleHandshakeExchange(id, resp)...); err != nil {
+ t.Fatal(err)
+ }
+ var disconnects []*p2ptest.Disconnect
+ for i, err := range errs {
+ disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err})
+ }
+ if err := s.TestDisconnected(disconnects...); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestModuleHandshakeError(t *testing.T) {
+ runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42"))
+}
+
+func TestModuleHandshakeSuccess(t *testing.T) {
+ runModuleHandshake(t, 42)
+}
+
+// testing complex interactions over multiple peers, relaying, dropping
+func testMultiPeerSetup(a, b discover.NodeID) []p2ptest.Exchange {
+
+ return []p2ptest.Exchange{
+ {
+ Label: "primary handshake",
+ Expects: []p2ptest.Expect{
+ {
+ Code: 0,
+ Msg: &protoHandshake{42, "420"},
+ Peer: a,
+ },
+ {
+ Code: 0,
+ Msg: &protoHandshake{42, "420"},
+ Peer: b,
+ },
+ },
+ },
+ {
+ Label: "module handshake",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 0,
+ Msg: &protoHandshake{42, "420"},
+ Peer: a,
+ },
+ {
+ Code: 0,
+ Msg: &protoHandshake{42, "420"},
+ Peer: b,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &hs0{42},
+ Peer: a,
+ },
+ {
+ Code: 1,
+ Msg: &hs0{42},
+ Peer: b,
+ },
+ },
+ },
+
+ {Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a},
+ {Code: 1, Msg: &hs0{41}, Peer: b}}},
+ {Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}},
+ {Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}}
+}
+
+func runMultiplePeers(t *testing.T, peer int, errs ...error) {
+ pp := p2ptest.NewTestPeerPool()
+ s := protocolTester(t, pp)
+
+ if err := s.TestExchanges(testMultiPeerSetup(s.IDs[0], s.IDs[1])...); err != nil {
+ t.Fatal(err)
+ }
+ // after some exchanges of messages, we can test state changes
+ // here this is simply demonstrated by the peerPool
+ // after the handshake negotiations peers must be added to the pool
+ // time.Sleep(1)
+ tick := time.NewTicker(10 * time.Millisecond)
+ timeout := time.NewTimer(1 * time.Second)
+WAIT:
+ for {
+ select {
+ case <-tick.C:
+ if pp.Has(s.IDs[0]) {
+ break WAIT
+ }
+ case <-timeout.C:
+ t.Fatal("timeout")
+ }
+ }
+ if !pp.Has(s.IDs[1]) {
+ t.Fatalf("missing peer test-1: %v (%v)", pp, s.IDs)
+ }
+
+ // peer 0 sends kill request for peer with index <peer>
+ err := s.TestExchanges(p2ptest.Exchange{
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 2,
+ Msg: &kill{s.IDs[peer]},
+ Peer: s.IDs[0],
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // the peer not killed sends a drop request
+ err = s.TestExchanges(p2ptest.Exchange{
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 3,
+ Msg: &drop{},
+ Peer: s.IDs[(peer+1)%2],
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // check the actual discconnect errors on the individual peers
+ var disconnects []*p2ptest.Disconnect
+ for i, err := range errs {
+ disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err})
+ }
+ if err := s.TestDisconnected(disconnects...); err != nil {
+ t.Fatal(err)
+ }
+ // test if disconnected peers have been removed from peerPool
+ if pp.Has(s.IDs[peer]) {
+ t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.IDs)
+ }
+
+}
+
+func TestMultiplePeersDropSelf(t *testing.T) {
+ runMultiplePeers(t, 0,
+ fmt.Errorf("subprotocol error"),
+ fmt.Errorf("Message handler error: (msg code 3): dropped"),
+ )
+}
+
+func TestMultiplePeersDropOther(t *testing.T) {
+ runMultiplePeers(t, 1,
+ fmt.Errorf("Message handler error: (msg code 3): dropped"),
+ fmt.Errorf("subprotocol error"),
+ )
+}