diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-09 15:32:08 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-01-09 15:32:08 +0800 |
commit | 25018527ec18ec2830801983d19e63a0ebf7b263 (patch) | |
tree | 6c6f1bc251b24da1fd9d6df7375f931fbae35a00 /core/test/tcp-transport.go | |
parent | c62ce07468cea07035ddcad3c89b0a5c0b25746a (diff) | |
download | tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.gz tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.bz2 tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.lz tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.xz tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.zst tangerine-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.zip |
simulation: fix k8s dmoment issue (#416)
* Handshake with server dmoment
* Start simulation from dMoment
* Update k8s config
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r-- | core/test/tcp-transport.go | 43 |
1 files changed, 33 insertions, 10 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index 020b488..d32935b 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -45,6 +45,11 @@ const ( tcpThroughputReportNum = 10 ) +type tcpHandshake struct { + DMoment time.Time + Peers map[types.NodeID]string +} + type tcpPeerRecord struct { conn string sendChannel chan<- []byte @@ -112,6 +117,7 @@ type TCPTransport struct { marshaller Marshaller throughputRecords []ThroughputRecord throughputLock sync.Mutex + dMoment time.Time } // NewTCPTransport constructs an TCPTransport instance. @@ -297,8 +303,8 @@ func (t *TCPTransport) marshalMessage( Payload: msg, } switch msg.(type) { - case map[types.NodeID]string: - msgCarrier.Type = "peerlist" + case *tcpHandshake: + msgCarrier.Type = "tcp-handshake" case *tcpMessage: msgCarrier.Type = "trans-msg" case []ThroughputRecord: @@ -344,12 +350,12 @@ func (t *TCPTransport) unmarshalMessage( peerType = msgCarrier.PeerType from = msgCarrier.From switch msgCarrier.Type { - case "peerlist": - var peers map[types.NodeID]string - if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil { + case "tcp-handshake": + handshake := &tcpHandshake{} + if err = json.Unmarshal(msgCarrier.Payload, &handshake); err != nil { return } - msg = peers + msg = handshake case "trans-msg": m := &tcpMessage{} if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { @@ -653,6 +659,7 @@ func (t *TCPTransportClient) Join( t.localPort = 1024 + rand.Int()%1024 } + fmt.Println("Connecting to server", "endpoint", serverEndpoint) serverConn, err := net.Dial("tcp", serverEndpoint.(string)) if err != nil { return @@ -681,12 +688,13 @@ func (t *TCPTransportClient) Join( } // Wait for peers list sent by server. e := <-t.recvChannel - peersInfo, ok := e.Msg.(map[types.NodeID]string) + handshake, ok := e.Msg.(*tcpHandshake) if !ok { - panic(fmt.Errorf("expect peer list, not %v", e)) + panic(fmt.Errorf("expect handshake, not %v", e)) } + t.dMoment = handshake.DMoment // Setup peers information. - for nID, info := range peersInfo { + for nID, info := range handshake.Peers { pubKey, conn := parsePeerInfo(info) t.peers[nID] = &tcpPeerRecord{ conn: conn, @@ -744,6 +752,11 @@ func (t *TCPTransportClient) Send( return } +// DMoment implments TransportClient. +func (t *TCPTransportClient) DMoment() time.Time { + return t.dMoment +} + // TCPTransportServer implements TransportServer via TCP connections. type TCPTransportServer struct { TCPTransport @@ -780,6 +793,11 @@ func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) { return t.recvChannel, nil } +// SetDMoment implements TransportServer.SetDMoment method. +func (t *TCPTransportServer) SetDMoment(dMoment time.Time) { + t.dMoment = dMoment +} + // WaitForPeers implements TransportServer.WaitForPeers method. func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { // Collect peers info. Packets other than peer info is @@ -796,6 +814,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { panic(fmt.Errorf("expect connection report, not %v", e)) } pubKey, conn := parsePeerInfo(msg.Info) + fmt.Println("Peer connected", "peer", conn) t.peers[msg.NodeID] = &tcpPeerRecord{ conn: conn, pubKey: pubKey, @@ -814,7 +833,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { for ID := range t.peers { peers[ID] = struct{}{} } - if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil { + handshake := &tcpHandshake{ + DMoment: t.dMoment, + Peers: peersInfo, + } + if err = t.Broadcast(peers, &FixedLatencyModel{}, handshake); err != nil { return } // Wait for peers to send 'ready' report. |