aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-15 18:01:22 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-15 18:01:22 +0800
commit6ceb253f743ec0d2bdd9a676c7f365de2201470c (patch)
treed856a141df93719d98100c2cdd4bf2a08bcd7b2c
parent46ea193a49f60bb54cd5fc083adcc6fdf58dbdaf (diff)
downloaddexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.tar
dexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.tar.gz
dexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.tar.bz2
dexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.tar.lz
dexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.tar.xz
dexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.tar.zst
dexon-6ceb253f743ec0d2bdd9a676c7f365de2201470c.zip
whisper: use async handshakes to handle blocking peers
-rw-r--r--whisper/common_test.go40
-rw-r--r--whisper/peer.go17
-rw-r--r--whisper/whisper_test.go2
3 files changed, 12 insertions, 47 deletions
diff --git a/whisper/common_test.go b/whisper/common_test.go
deleted file mode 100644
index a5df762e1..000000000
--- a/whisper/common_test.go
+++ /dev/null
@@ -1,40 +0,0 @@
-// Contains some common utility functions for testing.
-
-package whisper
-
-import (
- "bytes"
- "io/ioutil"
-
- "github.com/ethereum/go-ethereum/p2p"
-)
-
-// bufMsgPipe creates a buffered message pipe between two endpoints.
-func bufMsgPipe() (*p2p.MsgPipeRW, *p2p.MsgPipeRW) {
- A, midA := p2p.MsgPipe()
- midB, B := p2p.MsgPipe()
-
- go copyMsgPipe(midA, midB)
- go copyMsgPipe(midB, midA)
-
- return A, B
-}
-
-// copyMsgPipe copies messages from the src pipe to the dest.
-func copyMsgPipe(dst, src *p2p.MsgPipeRW) {
- defer dst.Close()
- for {
- msg, err := src.ReadMsg()
- if err != nil {
- return
- }
- data, err := ioutil.ReadAll(msg.Payload)
- if err != nil {
- return
- }
- msg.Payload = bytes.NewReader(data)
- if err := dst.WriteMsg(msg); err != nil {
- return
- }
- }
-}
diff --git a/whisper/peer.go b/whisper/peer.go
index f077dbe70..8bf848855 100644
--- a/whisper/peer.go
+++ b/whisper/peer.go
@@ -53,10 +53,12 @@ func (self *peer) stop() {
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
func (self *peer) handshake() error {
- // Send own status message, fetch remote one
- if err := p2p.SendItems(self.ws, statusCode, protocolVersion); err != nil {
- return err
- }
+ // Send the handshake status message asynchronously
+ errc := make(chan error, 1)
+ go func() {
+ errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
+ }()
+ // Fetch the remote status packet and verify protocol match
packet, err := self.ws.ReadMsg()
if err != nil {
return err
@@ -64,7 +66,6 @@ func (self *peer) handshake() error {
if packet.Code != statusCode {
return fmt.Errorf("peer sent %x before status packet", packet.Code)
}
- // Decode the rest of the status packet and verify protocol match
s := rlp.NewStream(packet.Payload)
if _, err := s.List(); err != nil {
return fmt.Errorf("bad status message: %v", err)
@@ -76,7 +77,11 @@ func (self *peer) handshake() error {
if peerVersion != protocolVersion {
return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
}
- return packet.Discard() // ignore anything after protocol version
+ // Wait until out own status is consumed too
+ if err := <-errc; err != nil {
+ return fmt.Errorf("failed to send status packet: %v", err)
+ }
+ return nil
}
// update executes periodic operations on the peer, including message transmission
diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go
index 35e2f0524..554a12cb1 100644
--- a/whisper/whisper_test.go
+++ b/whisper/whisper_test.go
@@ -21,7 +21,7 @@ func startTestCluster(n int) []*Whisper {
}
// Wire all the peers to the root one
for i := 1; i < n; i++ {
- src, dst := bufMsgPipe()
+ src, dst := p2p.MsgPipe()
go whispers[0].handlePeer(nodes[i], src)
go whispers[i].handlePeer(nodes[0], dst)