aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/peer.go')
-rw-r--r--p2p/peer.go17
1 files changed, 12 insertions, 5 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index 893ba86d7..86c4d7ab5 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -300,7 +300,7 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error)
proto.in <- msg
} else {
wait = true
- pr := &eofSignal{msg.Payload, protoDone}
+ pr := &eofSignal{msg.Payload, int64(msg.Size), protoDone}
msg.Payload = pr
proto.in <- msg
}
@@ -438,18 +438,25 @@ func (rw *proto) ReadMsg() (Msg, error) {
return msg, nil
}
-// eofSignal wraps a reader with eof signaling.
-// the eof channel is closed when the wrapped reader
-// reaches EOF.
+// eofSignal wraps a reader with eof signaling. the eof channel is
+// closed when the wrapped reader returns an error or when count bytes
+// have been read.
+//
type eofSignal struct {
wrapped io.Reader
+ count int64
eof chan<- struct{}
}
+// note: when using eofSignal to detect whether a message payload
+// has been read, Read might not be called for zero sized messages.
+
func (r *eofSignal) Read(buf []byte) (int, error) {
n, err := r.wrapped.Read(buf)
- if err != nil {
+ r.count -= int64(n)
+ if (err != nil || r.count <= 0) && r.eof != nil {
r.eof <- struct{}{} // tell Peer that msg has been consumed
+ r.eof = nil
}
return n, err
}