From 736e632215d49dd7bc61126f78dda4bad12768ea Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 27 Feb 2015 03:06:55 +0000 Subject: p2p: use RLPx frames for messaging --- p2p/message.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'p2p/message.go') diff --git a/p2p/message.go b/p2p/message.go index 7adad4b09..d61faad13 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -119,6 +119,25 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { return w.WriteMsg(NewMsg(code, data...)) } +// lockedRW wraps a MsgReadWriter with locks around +// ReadMsg and WriteMsg. +type lockedRW struct { + rmu, wmu sync.Mutex + wrapped MsgReadWriter +} + +func (rw *lockedRW) ReadMsg() (Msg, error) { + rw.rmu.Lock() + defer rw.rmu.Unlock() + return rw.wrapped.ReadMsg() +} + +func (rw *lockedRW) WriteMsg(msg Msg) error { + rw.wmu.Lock() + defer rw.wmu.Unlock() + return rw.wrapped.WriteMsg(msg) +} + // frameRW is a MsgReadWriter that reads and writes devp2p message frames. // As required by the interface, ReadMsg and WriteMsg can be called from // multiple goroutines. -- cgit v1.2.3 From d084aed5e9df5d06812332ed03d3ea55e3ddf819 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 27 Feb 2015 03:09:25 +0000 Subject: p2p: delete frameRW --- p2p/message.go | 138 --------------------------------------------------------- 1 file changed, 138 deletions(-) (limited to 'p2p/message.go') diff --git a/p2p/message.go b/p2p/message.go index d61faad13..2ef84f99d 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -1,15 +1,11 @@ package p2p import ( - "bufio" "bytes" - "encoding/binary" "errors" "fmt" "io" "io/ioutil" - "math/big" - "net" "sync" "sync/atomic" "time" @@ -138,140 +134,6 @@ func (rw *lockedRW) WriteMsg(msg Msg) error { return rw.wrapped.WriteMsg(msg) } -// frameRW is a MsgReadWriter that reads and writes devp2p message frames. -// As required by the interface, ReadMsg and WriteMsg can be called from -// multiple goroutines. -type frameRW struct { - net.Conn // make Conn methods available. be careful. - bufconn *bufio.ReadWriter - - // this channel is used to 'lend' bufconn to a caller of ReadMsg - // until the message payload has been consumed. the channel - // receives a value when EOF is reached on the payload, unblocking - // a pending call to ReadMsg. - rsync chan struct{} - - // this mutex guards writes to bufconn. - writeMu sync.Mutex -} - -func newFrameRW(conn net.Conn, timeout time.Duration) *frameRW { - rsync := make(chan struct{}, 1) - rsync <- struct{}{} - return &frameRW{ - Conn: conn, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - rsync: rsync, - } -} - -var magicToken = []byte{34, 64, 8, 145} - -func (rw *frameRW) WriteMsg(msg Msg) error { - rw.writeMu.Lock() - defer rw.writeMu.Unlock() - rw.SetWriteDeadline(time.Now().Add(msgWriteTimeout)) - if err := writeMsg(rw.bufconn, msg); err != nil { - return err - } - return rw.bufconn.Flush() -} - -func writeMsg(w io.Writer, msg Msg) error { - // TODO: handle case when Size + len(code) + len(listhdr) overflows uint32 - code := ethutil.Encode(uint32(msg.Code)) - listhdr := makeListHeader(msg.Size + uint32(len(code))) - payloadLen := uint32(len(listhdr)) + uint32(len(code)) + msg.Size - - start := make([]byte, 8) - copy(start, magicToken) - binary.BigEndian.PutUint32(start[4:], payloadLen) - - for _, b := range [][]byte{start, listhdr, code} { - if _, err := w.Write(b); err != nil { - return err - } - } - _, err := io.CopyN(w, msg.Payload, int64(msg.Size)) - return err -} - -func makeListHeader(length uint32) []byte { - if length < 56 { - return []byte{byte(length + 0xc0)} - } - enc := big.NewInt(int64(length)).Bytes() - lenb := byte(len(enc)) + 0xf7 - return append([]byte{lenb}, enc...) -} - -func (rw *frameRW) ReadMsg() (msg Msg, err error) { - <-rw.rsync // wait until bufconn is ours - - rw.SetReadDeadline(time.Now().Add(frameReadTimeout)) - - // read magic and payload size - start := make([]byte, 8) - if _, err = io.ReadFull(rw.bufconn, start); err != nil { - return msg, err - } - if !bytes.HasPrefix(start, magicToken) { - return msg, fmt.Errorf("bad magic token %x", start[:4]) - } - size := binary.BigEndian.Uint32(start[4:]) - - // decode start of RLP message to get the message code - posr := &postrack{rw.bufconn, 0} - s := rlp.NewStream(posr) - if _, err := s.List(); err != nil { - return msg, err - } - msg.Code, err = s.Uint() - if err != nil { - return msg, err - } - msg.Size = size - posr.p - - rw.SetReadDeadline(time.Now().Add(payloadReadTimeout)) - - if msg.Size <= wholePayloadSize { - // msg is small, read all of it and move on to the next message. - pbuf := make([]byte, msg.Size) - if _, err := io.ReadFull(rw.bufconn, pbuf); err != nil { - return msg, err - } - rw.rsync <- struct{}{} // bufconn is available again - msg.Payload = bytes.NewReader(pbuf) - } else { - // lend bufconn to the caller until it has - // consumed the payload. eofSignal will send a value - // on rw.rsync when EOF is reached. - pr := &eofSignal{rw.bufconn, msg.Size, rw.rsync} - msg.Payload = pr - } - return msg, nil -} - -// postrack wraps an rlp.ByteReader with a position counter. -type postrack struct { - r rlp.ByteReader - p uint32 -} - -func (r *postrack) Read(buf []byte) (int, error) { - n, err := r.r.Read(buf) - r.p += uint32(n) - return n, err -} - -func (r *postrack) ReadByte() (byte, error) { - b, err := r.r.ReadByte() - if err == nil { - r.p++ - } - return b, err -} - // eofSignal wraps a reader with eof signaling. the eof channel is // closed when the wrapped reader returns an error or when count bytes // have been read. -- cgit v1.2.3 From 7964f30dcbdde00b2960ef6e98320e0a0f9300e2 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 4 Mar 2015 12:03:43 +0100 Subject: p2p: msg.Payload contains list data With RLPx frames, the message code is contained in the frame and is no longer part of the encoded data. EncodeMsg, Msg.Decode have been updated to match. Code that decodes RLP directly from Msg.Payload will need to change. --- p2p/message.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) (limited to 'p2p/message.go') diff --git a/p2p/message.go b/p2p/message.go index 2ef84f99d..04b9e71f3 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -51,19 +51,8 @@ type Msg struct { // NewMsg creates an RLP-encoded message with the given code. func NewMsg(code uint64, params ...interface{}) Msg { - buf := new(bytes.Buffer) - for _, p := range params { - buf.Write(ethutil.Encode(p)) - } - return Msg{Code: code, Size: uint32(buf.Len()), Payload: buf} -} - -func encodePayload(params ...interface{}) []byte { - buf := new(bytes.Buffer) - for _, p := range params { - buf.Write(ethutil.Encode(p)) - } - return buf.Bytes() + p := bytes.NewReader(ethutil.Encode(params)) + return Msg{Code: code, Size: uint32(p.Len()), Payload: p} } // Decode parse the RLP content of a message into @@ -71,8 +60,7 @@ func encodePayload(params ...interface{}) []byte { // // For the decoding rules, please see package rlp. func (msg Msg) Decode(val interface{}) error { - s := rlp.NewListStream(msg.Payload, uint64(msg.Size)) - if err := s.Decode(val); err != nil { + if err := rlp.Decode(msg.Payload, val); err != nil { return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err) } return nil -- cgit v1.2.3 From 22659a7feaf4e939a33762c3f83b43d8bec757db Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 4 Mar 2015 16:27:37 +0100 Subject: p2p: restore read/write timeouts They got lost in the transition to rlpxFrameRW. --- p2p/message.go | 40 ++++++++++++---------------------------- 1 file changed, 12 insertions(+), 28 deletions(-) (limited to 'p2p/message.go') diff --git a/p2p/message.go b/p2p/message.go index 04b9e71f3..f88c31d1d 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "sync" "sync/atomic" "time" @@ -14,28 +15,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -// parameters for frameRW -const ( - // maximum time allowed for reading a message header. - // this is effectively the amount of time a connection can be idle. - frameReadTimeout = 1 * time.Minute - - // maximum time allowed for reading the payload data of a message. - // this is shorter than (and distinct from) frameReadTimeout because - // the connection is not considered idle while a message is transferred. - // this also limits the payload size of messages to how much the connection - // can transfer within the timeout. - payloadReadTimeout = 5 * time.Second - - // maximum amount of time allowed for writing a complete message. - msgWriteTimeout = 5 * time.Second - - // messages smaller than this many bytes will be read at - // once before passing them to a protocol. this increases - // concurrency in the processing. - wholePayloadSize = 64 * 1024 -) - // Msg defines the structure of a p2p message. // // Note that a Msg can only be sent once since the Payload reader is @@ -103,22 +82,27 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { return w.WriteMsg(NewMsg(code, data...)) } -// lockedRW wraps a MsgReadWriter with locks around -// ReadMsg and WriteMsg. -type lockedRW struct { +// netWrapper wrapsa MsgReadWriter with locks around +// ReadMsg/WriteMsg and applies read/write deadlines. +type netWrapper struct { rmu, wmu sync.Mutex - wrapped MsgReadWriter + + rtimeout, wtimeout time.Duration + conn net.Conn + wrapped MsgReadWriter } -func (rw *lockedRW) ReadMsg() (Msg, error) { +func (rw *netWrapper) ReadMsg() (Msg, error) { rw.rmu.Lock() defer rw.rmu.Unlock() + rw.conn.SetReadDeadline(time.Now().Add(rw.rtimeout)) return rw.wrapped.ReadMsg() } -func (rw *lockedRW) WriteMsg(msg Msg) error { +func (rw *netWrapper) WriteMsg(msg Msg) error { rw.wmu.Lock() defer rw.wmu.Unlock() + rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout)) return rw.wrapped.WriteMsg(msg) } -- cgit v1.2.3