diff options
Diffstat (limited to 'p2p/protocols')
-rw-r--r-- | p2p/protocols/protocol.go | 311 | ||||
-rw-r--r-- | p2p/protocols/protocol_test.go | 389 |
2 files changed, 700 insertions, 0 deletions
diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go new file mode 100644 index 000000000..9914c9958 --- /dev/null +++ b/p2p/protocols/protocol.go @@ -0,0 +1,311 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +/* +Package protocols is an extension to p2p. It offers a user friendly simple way to define +devp2p subprotocols by abstracting away code standardly shared by protocols. + +* automate assigments of code indexes to messages +* automate RLP decoding/encoding based on reflecting +* provide the forever loop to read incoming messages +* standardise error handling related to communication +* standardised handshake negotiation +* TODO: automatic generation of wire protocol specification for peers + +*/ +package protocols + +import ( + "context" + "fmt" + "reflect" + "sync" + + "github.com/ethereum/go-ethereum/p2p" +) + +// error codes used by this protocol scheme +const ( + ErrMsgTooLong = iota + ErrDecode + ErrWrite + ErrInvalidMsgCode + ErrInvalidMsgType + ErrHandshake + ErrNoHandler + ErrHandler +) + +// error description strings associated with the codes +var errorToString = map[int]string{ + ErrMsgTooLong: "Message too long", + ErrDecode: "Invalid message (RLP error)", + ErrWrite: "Error sending message", + ErrInvalidMsgCode: "Invalid message code", + ErrInvalidMsgType: "Invalid message type", + ErrHandshake: "Handshake error", + ErrNoHandler: "No handler registered error", + ErrHandler: "Message handler error", +} + +/* +Error implements the standard go error interface. +Use: + + errorf(code, format, params ...interface{}) + +Prints as: + + <description>: <details> + +where description is given by code in errorToString +and details is fmt.Sprintf(format, params...) + +exported field Code can be checked +*/ +type Error struct { + Code int + message string + format string + params []interface{} +} + +func (e Error) Error() (message string) { + if len(e.message) == 0 { + name, ok := errorToString[e.Code] + if !ok { + panic("invalid message code") + } + e.message = name + if e.format != "" { + e.message += ": " + fmt.Sprintf(e.format, e.params...) + } + } + return e.message +} + +func errorf(code int, format string, params ...interface{}) *Error { + return &Error{ + Code: code, + format: format, + params: params, + } +} + +// Spec is a protocol specification including its name and version as well as +// the types of messages which are exchanged +type Spec struct { + // Name is the name of the protocol, often a three-letter word + Name string + + // Version is the version number of the protocol + Version uint + + // MaxMsgSize is the maximum accepted length of the message payload + MaxMsgSize uint32 + + // Messages is a list of message data types which this protocol uses, with + // each message type being sent with its array index as the code (so + // [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes + // 0, 1 and 2 respectively) + // each message must have a single unique data type + Messages []interface{} + + initOnce sync.Once + codes map[reflect.Type]uint64 + types map[uint64]reflect.Type +} + +func (s *Spec) init() { + s.initOnce.Do(func() { + s.codes = make(map[reflect.Type]uint64, len(s.Messages)) + s.types = make(map[uint64]reflect.Type, len(s.Messages)) + for i, msg := range s.Messages { + code := uint64(i) + typ := reflect.TypeOf(msg) + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + s.codes[typ] = code + s.types[code] = typ + } + }) +} + +// Length returns the number of message types in the protocol +func (s *Spec) Length() uint64 { + return uint64(len(s.Messages)) +} + +// GetCode returns the message code of a type, and boolean second argument is +// false if the message type is not found +func (s *Spec) GetCode(msg interface{}) (uint64, bool) { + s.init() + typ := reflect.TypeOf(msg) + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + code, ok := s.codes[typ] + return code, ok +} + +// NewMsg construct a new message type given the code +func (s *Spec) NewMsg(code uint64) (interface{}, bool) { + s.init() + typ, ok := s.types[code] + if !ok { + return nil, false + } + return reflect.New(typ).Interface(), true +} + +// Peer represents a remote peer or protocol instance that is running on a peer connection with +// a remote peer +type Peer struct { + *p2p.Peer // the p2p.Peer object representing the remote + rw p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from + spec *Spec +} + +// NewPeer constructs a new peer +// this constructor is called by the p2p.Protocol#Run function +// the first two arguments are the arguments passed to p2p.Protocol.Run function +// the third argument is the Spec describing the protocol +func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer { + return &Peer{ + Peer: p, + rw: rw, + spec: spec, + } +} + +// Run starts the forever loop that handles incoming messages +// called within the p2p.Protocol#Run function +// the handler argument is a function which is called for each message received +// from the remote peer, a returned error causes the loop to exit +// resulting in disconnection +func (p *Peer) Run(handler func(msg interface{}) error) error { + for { + if err := p.handleIncoming(handler); err != nil { + return err + } + } +} + +// Drop disconnects a peer. +// TODO: may need to implement protocol drop only? don't want to kick off the peer +// if they are useful for other protocols +func (p *Peer) Drop(err error) { + p.Disconnect(p2p.DiscSubprotocolError) +} + +// Send takes a message, encodes it in RLP, finds the right message code and sends the +// message off to the peer +// this low level call will be wrapped by libraries providing routed or broadcast sends +// but often just used to forward and push messages to directly connected peers +func (p *Peer) Send(msg interface{}) error { + code, found := p.spec.GetCode(msg) + if !found { + return errorf(ErrInvalidMsgType, "%v", code) + } + return p2p.Send(p.rw, code, msg) +} + +// handleIncoming(code) +// is called each cycle of the main forever loop that dispatches incoming messages +// if this returns an error the loop returns and the peer is disconnected with the error +// this generic handler +// * checks message size, +// * checks for out-of-range message codes, +// * handles decoding with reflection, +// * call handlers as callbacks +func (p *Peer) handleIncoming(handle func(msg interface{}) error) error { + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + // make sure that the payload has been fully consumed + defer msg.Discard() + + if msg.Size > p.spec.MaxMsgSize { + return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize) + } + + val, ok := p.spec.NewMsg(msg.Code) + if !ok { + return errorf(ErrInvalidMsgCode, "%v", msg.Code) + } + if err := msg.Decode(val); err != nil { + return errorf(ErrDecode, "<= %v: %v", msg, err) + } + + // call the registered handler callbacks + // a registered callback take the decoded message as argument as an interface + // which the handler is supposed to cast to the appropriate type + // it is entirely safe not to check the cast in the handler since the handler is + // chosen based on the proper type in the first place + if err := handle(val); err != nil { + return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err) + } + return nil +} + +// Handshake negotiates a handshake on the peer connection +// * arguments +// * context +// * the local handshake to be sent to the remote peer +// * funcion to be called on the remote handshake (can be nil) +// * expects a remote handshake back of the same type +// * the dialing peer needs to send the handshake first and then waits for remote +// * the listening peer waits for the remote handshake and then sends it +// returns the remote handshake and an error +func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (rhs interface{}, err error) { + if _, ok := p.spec.GetCode(hs); !ok { + return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs) + } + errc := make(chan error, 2) + handle := func(msg interface{}) error { + rhs = msg + if verify != nil { + return verify(rhs) + } + return nil + } + send := func() { errc <- p.Send(hs) } + receive := func() { errc <- p.handleIncoming(handle) } + + go func() { + if p.Inbound() { + receive() + send() + } else { + send() + receive() + } + }() + + for i := 0; i < 2; i++ { + select { + case err = <-errc: + case <-ctx.Done(): + err = ctx.Err() + } + if err != nil { + return nil, errorf(ErrHandshake, err.Error()) + } + } + return rhs, nil +} diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go new file mode 100644 index 000000000..053f537a6 --- /dev/null +++ b/p2p/protocols/protocol_test.go @@ -0,0 +1,389 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package protocols + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + p2ptest "github.com/ethereum/go-ethereum/p2p/testing" +) + +// handshake message type +type hs0 struct { + C uint +} + +// message to kill/drop the peer with nodeID +type kill struct { + C discover.NodeID +} + +// message to drop connection +type drop struct { +} + +/// protoHandshake represents module-independent aspects of the protocol and is +// the first message peers send and receive as part the initial exchange +type protoHandshake struct { + Version uint // local and remote peer should have identical version + NetworkID string // local and remote peer should have identical network id +} + +// checkProtoHandshake verifies local and remote protoHandshakes match +func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error { + return func(rhs interface{}) error { + remote := rhs.(*protoHandshake) + if remote.NetworkID != testNetworkID { + return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID) + } + + if remote.Version != testVersion { + return fmt.Errorf("%d (!= %d)", remote.Version, testVersion) + } + return nil + } +} + +// newProtocol sets up a protocol +// the run function here demonstrates a typical protocol using peerPool, handshake +// and messages registered to handlers +func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error { + spec := &Spec{ + Name: "test", + Version: 42, + MaxMsgSize: 10 * 1024, + Messages: []interface{}{ + protoHandshake{}, + hs0{}, + kill{}, + drop{}, + }, + } + return func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := NewPeer(p, rw, spec) + + // initiate one-off protohandshake and check validity + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + phs := &protoHandshake{42, "420"} + hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID) + _, err := peer.Handshake(ctx, phs, hsCheck) + if err != nil { + return err + } + + lhs := &hs0{42} + // module handshake demonstrating a simple repeatable exchange of same-type message + hs, err := peer.Handshake(ctx, lhs, nil) + if err != nil { + return err + } + + if rmhs := hs.(*hs0); rmhs.C > lhs.C { + return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C) + } + + handle := func(msg interface{}) error { + switch msg := msg.(type) { + + case *protoHandshake: + return errors.New("duplicate handshake") + + case *hs0: + rhs := msg + if rhs.C > lhs.C { + return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C) + } + lhs.C += rhs.C + return peer.Send(lhs) + + case *kill: + // demonstrates use of peerPool, killing another peer connection as a response to a message + id := msg.C + pp.Get(id).Drop(errors.New("killed")) + return nil + + case *drop: + // for testing we can trigger self induced disconnect upon receiving drop message + return errors.New("dropped") + + default: + return fmt.Errorf("unknown message type: %T", msg) + } + } + + pp.Add(peer) + defer pp.Remove(peer) + return peer.Run(handle) + } +} + +func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester { + conf := adapters.RandomNodeConfig() + return p2ptest.NewProtocolTester(t, conf.ID, 2, newProtocol(pp)) +} + +func protoHandshakeExchange(id discover.NodeID, proto *protoHandshake) []p2ptest.Exchange { + + return []p2ptest.Exchange{ + { + Expects: []p2ptest.Expect{ + { + Code: 0, + Msg: &protoHandshake{42, "420"}, + Peer: id, + }, + }, + }, + { + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: proto, + Peer: id, + }, + }, + }, + } +} + +func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) { + pp := p2ptest.NewTestPeerPool() + s := protocolTester(t, pp) + // TODO: make this more than one handshake + id := s.IDs[0] + if err := s.TestExchanges(protoHandshakeExchange(id, proto)...); err != nil { + t.Fatal(err) + } + var disconnects []*p2ptest.Disconnect + for i, err := range errs { + disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err}) + } + if err := s.TestDisconnected(disconnects...); err != nil { + t.Fatal(err) + } +} + +func TestProtoHandshakeVersionMismatch(t *testing.T) { + runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error())) +} + +func TestProtoHandshakeNetworkIDMismatch(t *testing.T) { + runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error())) +} + +func TestProtoHandshakeSuccess(t *testing.T) { + runProtoHandshake(t, &protoHandshake{42, "420"}) +} + +func moduleHandshakeExchange(id discover.NodeID, resp uint) []p2ptest.Exchange { + + return []p2ptest.Exchange{ + { + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &hs0{42}, + Peer: id, + }, + }, + }, + { + Triggers: []p2ptest.Trigger{ + { + Code: 1, + Msg: &hs0{resp}, + Peer: id, + }, + }, + }, + } +} + +func runModuleHandshake(t *testing.T, resp uint, errs ...error) { + pp := p2ptest.NewTestPeerPool() + s := protocolTester(t, pp) + id := s.IDs[0] + if err := s.TestExchanges(protoHandshakeExchange(id, &protoHandshake{42, "420"})...); err != nil { + t.Fatal(err) + } + if err := s.TestExchanges(moduleHandshakeExchange(id, resp)...); err != nil { + t.Fatal(err) + } + var disconnects []*p2ptest.Disconnect + for i, err := range errs { + disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err}) + } + if err := s.TestDisconnected(disconnects...); err != nil { + t.Fatal(err) + } +} + +func TestModuleHandshakeError(t *testing.T) { + runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42")) +} + +func TestModuleHandshakeSuccess(t *testing.T) { + runModuleHandshake(t, 42) +} + +// testing complex interactions over multiple peers, relaying, dropping +func testMultiPeerSetup(a, b discover.NodeID) []p2ptest.Exchange { + + return []p2ptest.Exchange{ + { + Label: "primary handshake", + Expects: []p2ptest.Expect{ + { + Code: 0, + Msg: &protoHandshake{42, "420"}, + Peer: a, + }, + { + Code: 0, + Msg: &protoHandshake{42, "420"}, + Peer: b, + }, + }, + }, + { + Label: "module handshake", + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: &protoHandshake{42, "420"}, + Peer: a, + }, + { + Code: 0, + Msg: &protoHandshake{42, "420"}, + Peer: b, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &hs0{42}, + Peer: a, + }, + { + Code: 1, + Msg: &hs0{42}, + Peer: b, + }, + }, + }, + + {Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a}, + {Code: 1, Msg: &hs0{41}, Peer: b}}}, + {Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}}, + {Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}} +} + +func runMultiplePeers(t *testing.T, peer int, errs ...error) { + pp := p2ptest.NewTestPeerPool() + s := protocolTester(t, pp) + + if err := s.TestExchanges(testMultiPeerSetup(s.IDs[0], s.IDs[1])...); err != nil { + t.Fatal(err) + } + // after some exchanges of messages, we can test state changes + // here this is simply demonstrated by the peerPool + // after the handshake negotiations peers must be added to the pool + // time.Sleep(1) + tick := time.NewTicker(10 * time.Millisecond) + timeout := time.NewTimer(1 * time.Second) +WAIT: + for { + select { + case <-tick.C: + if pp.Has(s.IDs[0]) { + break WAIT + } + case <-timeout.C: + t.Fatal("timeout") + } + } + if !pp.Has(s.IDs[1]) { + t.Fatalf("missing peer test-1: %v (%v)", pp, s.IDs) + } + + // peer 0 sends kill request for peer with index <peer> + err := s.TestExchanges(p2ptest.Exchange{ + Triggers: []p2ptest.Trigger{ + { + Code: 2, + Msg: &kill{s.IDs[peer]}, + Peer: s.IDs[0], + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + + // the peer not killed sends a drop request + err = s.TestExchanges(p2ptest.Exchange{ + Triggers: []p2ptest.Trigger{ + { + Code: 3, + Msg: &drop{}, + Peer: s.IDs[(peer+1)%2], + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + + // check the actual discconnect errors on the individual peers + var disconnects []*p2ptest.Disconnect + for i, err := range errs { + disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err}) + } + if err := s.TestDisconnected(disconnects...); err != nil { + t.Fatal(err) + } + // test if disconnected peers have been removed from peerPool + if pp.Has(s.IDs[peer]) { + t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.IDs) + } + +} + +func TestMultiplePeersDropSelf(t *testing.T) { + runMultiplePeers(t, 0, + fmt.Errorf("subprotocol error"), + fmt.Errorf("Message handler error: (msg code 3): dropped"), + ) +} + +func TestMultiplePeersDropOther(t *testing.T) { + runMultiplePeers(t, 1, + fmt.Errorf("Message handler error: (msg code 3): dropped"), + fmt.Errorf("subprotocol error"), + ) +} |