From 6ceb253f743ec0d2bdd9a676c7f365de2201470c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 15 Apr 2015 13:01:22 +0300 Subject: whisper: use async handshakes to handle blocking peers --- whisper/common_test.go | 40 ---------------------------------------- whisper/peer.go | 17 +++++++++++------ whisper/whisper_test.go | 2 +- 3 files changed, 12 insertions(+), 47 deletions(-) delete mode 100644 whisper/common_test.go diff --git a/whisper/common_test.go b/whisper/common_test.go deleted file mode 100644 index a5df762e1..000000000 --- a/whisper/common_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Contains some common utility functions for testing. - -package whisper - -import ( - "bytes" - "io/ioutil" - - "github.com/ethereum/go-ethereum/p2p" -) - -// bufMsgPipe creates a buffered message pipe between two endpoints. -func bufMsgPipe() (*p2p.MsgPipeRW, *p2p.MsgPipeRW) { - A, midA := p2p.MsgPipe() - midB, B := p2p.MsgPipe() - - go copyMsgPipe(midA, midB) - go copyMsgPipe(midB, midA) - - return A, B -} - -// copyMsgPipe copies messages from the src pipe to the dest. -func copyMsgPipe(dst, src *p2p.MsgPipeRW) { - defer dst.Close() - for { - msg, err := src.ReadMsg() - if err != nil { - return - } - data, err := ioutil.ReadAll(msg.Payload) - if err != nil { - return - } - msg.Payload = bytes.NewReader(data) - if err := dst.WriteMsg(msg); err != nil { - return - } - } -} diff --git a/whisper/peer.go b/whisper/peer.go index f077dbe70..8bf848855 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -53,10 +53,12 @@ func (self *peer) stop() { // handshake sends the protocol initiation status message to the remote peer and // verifies the remote status too. func (self *peer) handshake() error { - // Send own status message, fetch remote one - if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil { - return err - } + // Send the handshake status message asynchronously + errc := make(chan error, 1) + go func() { + errc <- p2p.SendItems(self.ws, statusCode, protocolVersion) + }() + // Fetch the remote status packet and verify protocol match packet, err := self.ws.ReadMsg() if err != nil { return err @@ -64,7 +66,6 @@ func (self *peer) handshake() error { if packet.Code != statusCode { return fmt.Errorf("peer sent %x before status packet", packet.Code) } - // Decode the rest of the status packet and verify protocol match s := rlp.NewStream(packet.Payload) if _, err := s.List(); err != nil { return fmt.Errorf("bad status message: %v", err) @@ -76,7 +77,11 @@ func (self *peer) handshake() error { if peerVersion != protocolVersion { return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) } - return packet.Discard() // ignore anything after protocol version + // Wait until out own status is consumed too + if err := <-errc; err != nil { + return fmt.Errorf("failed to send status packet: %v", err) + } + return nil } // update executes periodic operations on the peer, including message transmission diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 35e2f0524..554a12cb1 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -21,7 +21,7 @@ func startTestCluster(n int) []*Whisper { } // Wire all the peers to the root one for i := 1; i < n; i++ { - src, dst := bufMsgPipe() + src, dst := p2p.MsgPipe() go whispers[0].handlePeer(nodes[i], src) go whispers[i].handlePeer(nodes[0], dst) -- cgit v1.2.3