diff options
author | Felix Lange <fjl@twurst.com> | 2015-02-27 11:09:25 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-03-04 19:27:24 +0800 |
commit | d084aed5e9df5d06812332ed03d3ea55e3ddf819 (patch) | |
tree | 41f192df56d4fefd8e5338713db8ff28134f3230 /p2p/message.go | |
parent | 736e632215d49dd7bc61126f78dda4bad12768ea (diff) | |
download | go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.tar go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.tar.gz go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.tar.bz2 go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.tar.lz go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.tar.xz go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.tar.zst go-tangerine-d084aed5e9df5d06812332ed03d3ea55e3ddf819.zip |
p2p: delete frameRW
Diffstat (limited to 'p2p/message.go')
-rw-r--r-- | p2p/message.go | 138 |
1 files changed, 0 insertions, 138 deletions
diff --git a/p2p/message.go b/p2p/message.go index d61faad13..2ef84f99d 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -1,15 +1,11 @@ package p2p import ( - "bufio" "bytes" - "encoding/binary" "errors" "fmt" "io" "io/ioutil" - "math/big" - "net" "sync" "sync/atomic" "time" @@ -138,140 +134,6 @@ func (rw *lockedRW) WriteMsg(msg Msg) error { return rw.wrapped.WriteMsg(msg) } -// frameRW is a MsgReadWriter that reads and writes devp2p message frames. -// As required by the interface, ReadMsg and WriteMsg can be called from -// multiple goroutines. -type frameRW struct { - net.Conn // make Conn methods available. be careful. - bufconn *bufio.ReadWriter - - // this channel is used to 'lend' bufconn to a caller of ReadMsg - // until the message payload has been consumed. the channel - // receives a value when EOF is reached on the payload, unblocking - // a pending call to ReadMsg. - rsync chan struct{} - - // this mutex guards writes to bufconn. - writeMu sync.Mutex -} - -func newFrameRW(conn net.Conn, timeout time.Duration) *frameRW { - rsync := make(chan struct{}, 1) - rsync <- struct{}{} - return &frameRW{ - Conn: conn, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - rsync: rsync, - } -} - -var magicToken = []byte{34, 64, 8, 145} - -func (rw *frameRW) WriteMsg(msg Msg) error { - rw.writeMu.Lock() - defer rw.writeMu.Unlock() - rw.SetWriteDeadline(time.Now().Add(msgWriteTimeout)) - if err := writeMsg(rw.bufconn, msg); err != nil { - return err - } - return rw.bufconn.Flush() -} - -func writeMsg(w io.Writer, msg Msg) error { - // TODO: handle case when Size + len(code) + len(listhdr) overflows uint32 - code := ethutil.Encode(uint32(msg.Code)) - listhdr := makeListHeader(msg.Size + uint32(len(code))) - payloadLen := uint32(len(listhdr)) + uint32(len(code)) + msg.Size - - start := make([]byte, 8) - copy(start, magicToken) - binary.BigEndian.PutUint32(start[4:], payloadLen) - - for _, b := range [][]byte{start, listhdr, code} { - if _, err := w.Write(b); err != nil { - return err - } - } - _, err := io.CopyN(w, msg.Payload, int64(msg.Size)) - return err -} - -func makeListHeader(length uint32) []byte { - if length < 56 { - return []byte{byte(length + 0xc0)} - } - enc := big.NewInt(int64(length)).Bytes() - lenb := byte(len(enc)) + 0xf7 - return append([]byte{lenb}, enc...) -} - -func (rw *frameRW) ReadMsg() (msg Msg, err error) { - <-rw.rsync // wait until bufconn is ours - - rw.SetReadDeadline(time.Now().Add(frameReadTimeout)) - - // read magic and payload size - start := make([]byte, 8) - if _, err = io.ReadFull(rw.bufconn, start); err != nil { - return msg, err - } - if !bytes.HasPrefix(start, magicToken) { - return msg, fmt.Errorf("bad magic token %x", start[:4]) - } - size := binary.BigEndian.Uint32(start[4:]) - - // decode start of RLP message to get the message code - posr := &postrack{rw.bufconn, 0} - s := rlp.NewStream(posr) - if _, err := s.List(); err != nil { - return msg, err - } - msg.Code, err = s.Uint() - if err != nil { - return msg, err - } - msg.Size = size - posr.p - - rw.SetReadDeadline(time.Now().Add(payloadReadTimeout)) - - if msg.Size <= wholePayloadSize { - // msg is small, read all of it and move on to the next message. - pbuf := make([]byte, msg.Size) - if _, err := io.ReadFull(rw.bufconn, pbuf); err != nil { - return msg, err - } - rw.rsync <- struct{}{} // bufconn is available again - msg.Payload = bytes.NewReader(pbuf) - } else { - // lend bufconn to the caller until it has - // consumed the payload. eofSignal will send a value - // on rw.rsync when EOF is reached. - pr := &eofSignal{rw.bufconn, msg.Size, rw.rsync} - msg.Payload = pr - } - return msg, nil -} - -// postrack wraps an rlp.ByteReader with a position counter. -type postrack struct { - r rlp.ByteReader - p uint32 -} - -func (r *postrack) Read(buf []byte) (int, error) { - n, err := r.r.Read(buf) - r.p += uint32(n) - return n, err -} - -func (r *postrack) ReadByte() (byte, error) { - b, err := r.r.ReadByte() - if err == nil { - r.p++ - } - return b, err -} - // eofSignal wraps a reader with eof signaling. the eof channel is // closed when the wrapped reader returns an error or when count bytes // have been read. |