aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/tcp-transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r--core/test/tcp-transport.go122
1 files changed, 90 insertions, 32 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index 673a5a1..2afea14 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -35,10 +35,11 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
-// peerInfo describe connection info of one peer in TCP transport.
-type peerInfo struct {
+// tcpMessage is the general message between peers and server.
+type tcpMessage struct {
ValidatorID types.ValidatorID `json:"vid"`
- Conn string `json:"conn"`
+ Type string `json:"type"`
+ Info string `json:"conn"`
}
// TCPTransport implements Transport interface via TCP connection.
@@ -162,8 +163,8 @@ func (t *TCPTransport) marshalMessage(
switch msg.(type) {
case map[types.ValidatorID]string:
msgCarrier.Type = "peerlist"
- case *peerInfo:
- msgCarrier.Type = "peer"
+ case *tcpMessage:
+ msgCarrier.Type = "trans-msg"
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msg)
@@ -209,12 +210,12 @@ func (t *TCPTransport) unmarshalMessage(
return
}
msg = peers
- case "peer":
- peer := &peerInfo{}
- if err = json.Unmarshal(msgCarrier.Payload, peer); err != nil {
+ case "trans-msg":
+ m := &tcpMessage{}
+ if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
return
}
- msg = peer
+ msg = m
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type)
@@ -443,6 +444,7 @@ func (t *TCPTransportClient) Join(
envelopes = []*TransportEnvelope{}
ok bool
addr string
+ conn string
)
for {
addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
@@ -479,27 +481,57 @@ func (t *TCPTransportClient) Join(
return
}
t.serverWriteChannel = t.connWriter(serverConn)
- err = t.Report(&peerInfo{
+ if t.local {
+ conn = addr
+ } else {
+ // Find my IP.
+ var ip string
+ if ip, err = FindMyIP(); err != nil {
+ return
+ }
+ conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort))
+ }
+ if err = t.Report(&tcpMessage{
+ Type: "conn",
ValidatorID: t.vID,
- Conn: addr,
- })
+ Info: conn,
+ }); err != nil {
+ return
+ }
// Wait for peers list sent by server.
+ e := <-t.recvChannel
+ if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok {
+ panic(fmt.Errorf("expect peer list, not %v", e))
+ }
+ // Setup connections to other peers.
+ if err = t.buildConnectionsToPeers(); err != nil {
+ return
+ }
+ // Report to server that the connections to other peers are ready.
+ if err = t.Report(&tcpMessage{
+ Type: "conn-ready",
+ ValidatorID: t.vID,
+ }); err != nil {
+ return
+ }
+ // Wait for server to ack us that all peers are ready.
for {
e := <-t.recvChannel
- if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok {
+ msg, ok := e.Msg.(*tcpMessage)
+ if !ok {
envelopes = append(envelopes, e)
continue
}
- // Replay those messages sent before peer list.
- if len(envelopes) > cap(t.recvChannel)-len(t.recvChannel) {
- panic(fmt.Errorf("unable to replay pending messages"))
- }
- for _, e := range envelopes {
- t.recvChannel <- e
+ if msg.Type != "all-ready" {
+ err = fmt.Errorf("expected ready message, but %v", msg)
+ return
}
break
}
- t.buildConnectionsToPeers()
+ // Replay those messages sent before peer list and ready-ack.
+ for _, e := range envelopes {
+ t.recvChannel <- e
+ }
ch = t.recvChannel
return
}
@@ -545,24 +577,50 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) {
// Collect peers info. Packets other than peer info is
// unexpected.
for {
- select {
- case <-t.ctx.Done():
- err = fmt.Errorf("cancel waiting")
- return
- case e := <-t.recvChannel:
- peer, ok := e.Msg.(*peerInfo)
- if !ok {
- panic(fmt.Errorf("expect peerInfo, not %v", e))
- }
- t.peersInfo[peer.ValidatorID] = peer.Conn
+ // Wait for connection info reported by peers.
+ e := <-t.recvChannel
+ msg, ok := e.Msg.(*tcpMessage)
+ if !ok {
+ panic(fmt.Errorf("expect tcpMessage, not %v", e))
}
+ if msg.Type != "conn" {
+ panic(fmt.Errorf("expect connection report, not %v", e))
+ }
+ t.peersInfo[msg.ValidatorID] = msg.Info
// Check if we already collect enought peers.
if len(t.peersInfo) == numPeers {
break
}
}
// Send collected peers back to them.
- t.buildConnectionsToPeers()
- t.Broadcast(t.peersInfo)
+ if err = t.buildConnectionsToPeers(); err != nil {
+ return
+ }
+ if err = t.Broadcast(t.peersInfo); err != nil {
+ return
+ }
+ // Wait for peers to send 'ready' report.
+ readies := make(map[types.ValidatorID]struct{})
+ for {
+ e := <-t.recvChannel
+ msg, ok := e.Msg.(*tcpMessage)
+ if !ok {
+ panic(fmt.Errorf("expect tcpMessage, not %v", e))
+ }
+ if msg.Type != "conn-ready" {
+ panic(fmt.Errorf("expect connection ready, not %v", e))
+ }
+ if _, reported := readies[msg.ValidatorID]; reported {
+ panic(fmt.Errorf("already report conn-ready message: %v", e))
+ }
+ readies[msg.ValidatorID] = struct{}{}
+ if len(readies) == numPeers {
+ break
+ }
+ }
+ // Ack all peers ready to go.
+ if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil {
+ return
+ }
return
}