aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/tcp-transport.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-01-09 15:32:08 +0800
committerWei-Ning Huang <w@dexon.org>2019-01-09 15:32:08 +0800
commit25018527ec18ec2830801983d19e63a0ebf7b263 (patch)
tree6c6f1bc251b24da1fd9d6df7375f931fbae35a00 /core/test/tcp-transport.go
parentc62ce07468cea07035ddcad3c89b0a5c0b25746a (diff)
downloadtangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar
tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.gz
tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.bz2
tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.lz
tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.xz
tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.zst
tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.zip
simulation: fix k8s dmoment issue (#416)
* Handshake with server dmoment * Start simulation from dMoment * Update k8s config
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r--core/test/tcp-transport.go43
1 files changed, 33 insertions, 10 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index 020b488..d32935b 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -45,6 +45,11 @@ const (
tcpThroughputReportNum = 10
)
+type tcpHandshake struct {
+ DMoment time.Time
+ Peers map[types.NodeID]string
+}
+
type tcpPeerRecord struct {
conn string
sendChannel chan<- []byte
@@ -112,6 +117,7 @@ type TCPTransport struct {
marshaller Marshaller
throughputRecords []ThroughputRecord
throughputLock sync.Mutex
+ dMoment time.Time
}
// NewTCPTransport constructs an TCPTransport instance.
@@ -297,8 +303,8 @@ func (t *TCPTransport) marshalMessage(
Payload: msg,
}
switch msg.(type) {
- case map[types.NodeID]string:
- msgCarrier.Type = "peerlist"
+ case *tcpHandshake:
+ msgCarrier.Type = "tcp-handshake"
case *tcpMessage:
msgCarrier.Type = "trans-msg"
case []ThroughputRecord:
@@ -344,12 +350,12 @@ func (t *TCPTransport) unmarshalMessage(
peerType = msgCarrier.PeerType
from = msgCarrier.From
switch msgCarrier.Type {
- case "peerlist":
- var peers map[types.NodeID]string
- if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil {
+ case "tcp-handshake":
+ handshake := &tcpHandshake{}
+ if err = json.Unmarshal(msgCarrier.Payload, &handshake); err != nil {
return
}
- msg = peers
+ msg = handshake
case "trans-msg":
m := &tcpMessage{}
if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
@@ -653,6 +659,7 @@ func (t *TCPTransportClient) Join(
t.localPort = 1024 + rand.Int()%1024
}
+ fmt.Println("Connecting to server", "endpoint", serverEndpoint)
serverConn, err := net.Dial("tcp", serverEndpoint.(string))
if err != nil {
return
@@ -681,12 +688,13 @@ func (t *TCPTransportClient) Join(
}
// Wait for peers list sent by server.
e := <-t.recvChannel
- peersInfo, ok := e.Msg.(map[types.NodeID]string)
+ handshake, ok := e.Msg.(*tcpHandshake)
if !ok {
- panic(fmt.Errorf("expect peer list, not %v", e))
+ panic(fmt.Errorf("expect handshake, not %v", e))
}
+ t.dMoment = handshake.DMoment
// Setup peers information.
- for nID, info := range peersInfo {
+ for nID, info := range handshake.Peers {
pubKey, conn := parsePeerInfo(info)
t.peers[nID] = &tcpPeerRecord{
conn: conn,
@@ -744,6 +752,11 @@ func (t *TCPTransportClient) Send(
return
}
+// DMoment implments TransportClient.
+func (t *TCPTransportClient) DMoment() time.Time {
+ return t.dMoment
+}
+
// TCPTransportServer implements TransportServer via TCP connections.
type TCPTransportServer struct {
TCPTransport
@@ -780,6 +793,11 @@ func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) {
return t.recvChannel, nil
}
+// SetDMoment implements TransportServer.SetDMoment method.
+func (t *TCPTransportServer) SetDMoment(dMoment time.Time) {
+ t.dMoment = dMoment
+}
+
// WaitForPeers implements TransportServer.WaitForPeers method.
func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
// Collect peers info. Packets other than peer info is
@@ -796,6 +814,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
panic(fmt.Errorf("expect connection report, not %v", e))
}
pubKey, conn := parsePeerInfo(msg.Info)
+ fmt.Println("Peer connected", "peer", conn)
t.peers[msg.NodeID] = &tcpPeerRecord{
conn: conn,
pubKey: pubKey,
@@ -814,7 +833,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
for ID := range t.peers {
peers[ID] = struct{}{}
}
- if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil {
+ handshake := &tcpHandshake{
+ DMoment: t.dMoment,
+ Peers: peersInfo,
+ }
+ if err = t.Broadcast(peers, &FixedLatencyModel{}, handshake); err != nil {
return
}
// Wait for peers to send 'ready' report.