diff options
Diffstat (limited to 'p2p/message.go')
-rw-r--r-- | p2p/message.go | 77 |
1 files changed, 61 insertions, 16 deletions
diff --git a/p2p/message.go b/p2p/message.go index 14e4404c9..ef3630a90 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" ) @@ -28,13 +27,7 @@ type Msg struct { Payload io.Reader } -// NewMsg creates an RLP-encoded message with the given code. -func NewMsg(code uint64, params ...interface{}) Msg { - p := bytes.NewReader(common.Encode(params)) - return Msg{Code: code, Size: uint32(p.Len()), Payload: p} -} - -// Decode parse the RLP content of a message into +// Decode parses the RLP content of a message into // the given value, which must be a pointer. // // For the decoding rules, please see package rlp. @@ -76,13 +69,30 @@ type MsgReadWriter interface { MsgWriter } -// EncodeMsg writes an RLP-encoded message with the given code and -// data elements. -func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { - return w.WriteMsg(NewMsg(code, data...)) +// Send writes an RLP-encoded message with the given code. +// data should encode as an RLP list. +func Send(w MsgWriter, msgcode uint64, data interface{}) error { + size, r, err := rlp.EncodeToReader(data) + if err != nil { + return err + } + return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r}) +} + +// SendItems writes an RLP with the given code and data elements. +// For a call such as: +// +// SendItems(w, code, e1, e2, e3) +// +// the message payload will be an RLP list containing the items: +// +// [e1, e2, e3] +// +func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error { + return Send(w, msgcode, elems) } -// netWrapper wrapsa MsgReadWriter with locks around +// netWrapper wraps a MsgReadWriter with locks around // ReadMsg/WriteMsg and applies read/write deadlines. type netWrapper struct { rmu, wmu sync.Mutex @@ -175,7 +185,10 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { case p.w <- msg: if msg.Size > 0 { // wait for payload read or discard - <-consumed + select { + case <-consumed: + case <-p.closing: + } } return nil case <-p.closing: @@ -197,8 +210,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) { } // Close unblocks any pending ReadMsg and WriteMsg calls on both ends -// of the pipe. They will return ErrPipeClosed. Note that Close does -// not interrupt any reads from a message payload. +// of the pipe. They will return ErrPipeClosed. Close also +// interrupts any reads from a message payload. func (p *MsgPipeRW) Close() error { if atomic.AddInt32(p.closed, 1) != 1 { // someone else is already closing @@ -208,3 +221,35 @@ func (p *MsgPipeRW) Close() error { close(p.closing) return nil } + +// ExpectMsg reads a message from r and verifies that its +// code and encoded RLP content match the provided values. +// If content is nil, the payload is discarded and not verified. +func ExpectMsg(r MsgReader, code uint64, content interface{}) error { + msg, err := r.ReadMsg() + if err != nil { + return err + } + if msg.Code != code { + return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, code) + } + if content == nil { + return msg.Discard() + } else { + contentEnc, err := rlp.EncodeToBytes(content) + if err != nil { + panic("content encode error: " + err.Error()) + } + if int(msg.Size) != len(contentEnc) { + return fmt.Errorf("message size mismatch: got %d, want %d", msg.Size, len(contentEnc)) + } + actualContent, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return err + } + if !bytes.Equal(actualContent, contentEnc) { + return fmt.Errorf("message payload mismatch:\ngot: %x\nwant: %x", actualContent, contentEnc) + } + } + return nil +} |