diff options
Diffstat (limited to 'p2p/message.go')
-rw-r--r-- | p2p/message.go | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/p2p/message.go b/p2p/message.go index d3b8b74d4..f5418ff47 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -3,9 +3,11 @@ package p2p import ( "bytes" "encoding/binary" + "errors" "io" "io/ioutil" "math/big" + "sync/atomic" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/rlp" @@ -153,3 +155,78 @@ func (r *postrack) ReadByte() (byte, error) { } return b, err } + +// MsgPipe creates a message pipe. Reads on one end are matched +// with writes on the other. The pipe is full-duplex, both ends +// implement MsgReadWriter. +func MsgPipe() (*MsgPipeRW, *MsgPipeRW) { + var ( + c1, c2 = make(chan Msg), make(chan Msg) + closing = make(chan struct{}) + closed = new(int32) + rw1 = &MsgPipeRW{c1, c2, closing, closed} + rw2 = &MsgPipeRW{c2, c1, closing, closed} + ) + return rw1, rw2 +} + +// ErrPipeClosed is returned from pipe operations after the +// pipe has been closed. +var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe") + +// MsgPipeRW is an endpoint of a MsgReadWriter pipe. +type MsgPipeRW struct { + w chan<- Msg + r <-chan Msg + closing chan struct{} + closed *int32 +} + +// WriteMsg sends a messsage on the pipe. +// It blocks until the receiver has consumed the message payload. +func (p *MsgPipeRW) WriteMsg(msg Msg) error { + if atomic.LoadInt32(p.closed) == 0 { + consumed := make(chan struct{}, 1) + msg.Payload = &eofSignal{msg.Payload, int64(msg.Size), consumed} + select { + case p.w <- msg: + if msg.Size > 0 { + // wait for payload read or discard + <-consumed + } + return nil + case <-p.closing: + } + } + return ErrPipeClosed +} + +// EncodeMsg is a convenient shorthand for sending an RLP-encoded message. +func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error { + return p.WriteMsg(NewMsg(code, data...)) +} + +// ReadMsg returns a message sent on the other end of the pipe. +func (p *MsgPipeRW) ReadMsg() (Msg, error) { + if atomic.LoadInt32(p.closed) == 0 { + select { + case msg := <-p.r: + return msg, nil + case <-p.closing: + } + } + return Msg{}, ErrPipeClosed +} + +// Close unblocks any pending ReadMsg and WriteMsg calls on both ends +// of the pipe. They will return ErrPipeClosed. Note that Close does +// not interrupt any reads from a message payload. +func (p *MsgPipeRW) Close() error { + if atomic.AddInt32(p.closed, 1) != 1 { + // someone else is already closing + atomic.StoreInt32(p.closed, 1) // avoid overflow + return nil + } + close(p.closing) + return nil +} |