diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-09 15:32:08 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-01-09 15:32:08 +0800 |
commit | 25018527ec18ec2830801983d19e63a0ebf7b263 (patch) | |
tree | 6c6f1bc251b24da1fd9d6df7375f931fbae35a00 | |
parent | c62ce07468cea07035ddcad3c89b0a5c0b25746a (diff) | |
download | dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.gz dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.bz2 dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.lz dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.xz dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.tar.zst dexon-consensus-25018527ec18ec2830801983d19e63a0ebf7b263.zip |
simulation: fix k8s dmoment issue (#416)
* Handshake with server dmoment
* Start simulation from dMoment
* Update k8s config
-rw-r--r-- | core/test/fake-transport.go | 29 | ||||
-rw-r--r-- | core/test/interface.go | 8 | ||||
-rw-r--r-- | core/test/network.go | 6 | ||||
-rw-r--r-- | core/test/tcp-transport.go | 43 | ||||
-rw-r--r-- | simulation/kubernetes/config.toml.in | 4 | ||||
-rw-r--r-- | simulation/node.go | 4 | ||||
-rw-r--r-- | simulation/peer-server.go | 3 | ||||
-rw-r--r-- | simulation/simulation.go | 5 |
8 files changed, 81 insertions, 21 deletions
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index 056c3d5..fe19fdf 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -30,6 +30,11 @@ type fakePeerRecord struct { pubKey crypto.PublicKey } +type fakeHandshake struct { + dMoment time.Time + peers map[types.NodeID]fakePeerRecord +} + // FakeTransport implement TransportServer and TransportClient interface // by using golang channel. type FakeTransport struct { @@ -39,6 +44,7 @@ type FakeTransport struct { recvChannel chan *TransportEnvelope serverChannel chan<- *TransportEnvelope peers map[types.NodeID]fakePeerRecord + dMoment time.Time } // NewFakeTransportServer constructs FakeTransport instance for peer server. @@ -137,9 +143,10 @@ func (t *FakeTransport) Join( envelopes = append(envelopes, envelope) continue } - if t.peers, ok = - envelope.Msg.(map[types.NodeID]fakePeerRecord); !ok { - + if handShake, ok := envelope.Msg.(fakeHandshake); ok { + t.dMoment = handShake.dMoment + t.peers = handShake.peers + } else { envelopes = append(envelopes, envelope) continue } @@ -151,11 +158,21 @@ func (t *FakeTransport) Join( return t.recvChannel, nil } +// DMoment implments TrnansportClient.DMoment method. +func (t *FakeTransport) DMoment() time.Time { + return t.dMoment +} + // Host implements TransportServer.Host method. func (t *FakeTransport) Host() (chan *TransportEnvelope, error) { return t.recvChannel, nil } +// SetDMoment implements TransportServer.SetDMoment method. +func (t *FakeTransport) SetDMoment(dMoment time.Time) { + t.dMoment = dMoment +} + // WaitForPeers implements TransportServer.WaitForPeers method. func (t *FakeTransport) WaitForPeers(numPeers uint32) (err error) { t.peers = make(map[types.NodeID]fakePeerRecord) @@ -177,7 +194,11 @@ func (t *FakeTransport) WaitForPeers(numPeers uint32) (err error) { for ID := range t.peers { peers[ID] = struct{}{} } - if err = t.Broadcast(peers, &FixedLatencyModel{}, t.peers); err != nil { + handShake := fakeHandshake{ + dMoment: t.dMoment, + peers: t.peers, + } + if err = t.Broadcast(peers, &FixedLatencyModel{}, handShake); err != nil { return } return diff --git a/core/test/interface.go b/core/test/interface.go index d9578de..000b835 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -18,6 +18,8 @@ package test import ( + "time" + "github.com/dexon-foundation/dexon-consensus/core/crypto" "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" @@ -77,6 +79,9 @@ type TransportServer interface { Host() (chan *TransportEnvelope, error) // WaitForPeers waits for all peers to join the network. WaitForPeers(numPeers uint32) error + + // SetDMoment + SetDMoment(time.Time) } // TransportClient defines those peers in the network. @@ -86,6 +91,9 @@ type TransportClient interface { Report(msg interface{}) error // Join the network, should block until joined. Join(serverEndpoint interface{}) (<-chan *TransportEnvelope, error) + + // DMoment returns the DMoment of the network. + DMoment() time.Time } // Transport defines the interface for basic transportation capabilities. diff --git a/core/test/network.go b/core/test/network.go index 49342f0..c922b7b 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -140,6 +140,7 @@ type Network struct { ctx context.Context ctxCancel context.CancelFunc trans TransportClient + dMoment time.Time fromTransport <-chan *TransportEnvelope toConsensus chan interface{} toNode chan interface{} @@ -481,6 +482,11 @@ func (n *Network) Peers() []crypto.PublicKey { return n.trans.Peers() } +// DMoment exports 'DMoment' method of Transport. +func (n *Network) DMoment() time.Time { + return n.trans.DMoment() +} + // ReceiveChanForNode returns a channel for messages not handled by // core.Consensus. func (n *Network) ReceiveChanForNode() <-chan interface{} { diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index 020b488..d32935b 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -45,6 +45,11 @@ const ( tcpThroughputReportNum = 10 ) +type tcpHandshake struct { + DMoment time.Time + Peers map[types.NodeID]string +} + type tcpPeerRecord struct { conn string sendChannel chan<- []byte @@ -112,6 +117,7 @@ type TCPTransport struct { marshaller Marshaller throughputRecords []ThroughputRecord throughputLock sync.Mutex + dMoment time.Time } // NewTCPTransport constructs an TCPTransport instance. @@ -297,8 +303,8 @@ func (t *TCPTransport) marshalMessage( Payload: msg, } switch msg.(type) { - case map[types.NodeID]string: - msgCarrier.Type = "peerlist" + case *tcpHandshake: + msgCarrier.Type = "tcp-handshake" case *tcpMessage: msgCarrier.Type = "trans-msg" case []ThroughputRecord: @@ -344,12 +350,12 @@ func (t *TCPTransport) unmarshalMessage( peerType = msgCarrier.PeerType from = msgCarrier.From switch msgCarrier.Type { - case "peerlist": - var peers map[types.NodeID]string - if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil { + case "tcp-handshake": + handshake := &tcpHandshake{} + if err = json.Unmarshal(msgCarrier.Payload, &handshake); err != nil { return } - msg = peers + msg = handshake case "trans-msg": m := &tcpMessage{} if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { @@ -653,6 +659,7 @@ func (t *TCPTransportClient) Join( t.localPort = 1024 + rand.Int()%1024 } + fmt.Println("Connecting to server", "endpoint", serverEndpoint) serverConn, err := net.Dial("tcp", serverEndpoint.(string)) if err != nil { return @@ -681,12 +688,13 @@ func (t *TCPTransportClient) Join( } // Wait for peers list sent by server. e := <-t.recvChannel - peersInfo, ok := e.Msg.(map[types.NodeID]string) + handshake, ok := e.Msg.(*tcpHandshake) if !ok { - panic(fmt.Errorf("expect peer list, not %v", e)) + panic(fmt.Errorf("expect handshake, not %v", e)) } + t.dMoment = handshake.DMoment // Setup peers information. - for nID, info := range peersInfo { + for nID, info := range handshake.Peers { pubKey, conn := parsePeerInfo(info) t.peers[nID] = &tcpPeerRecord{ conn: conn, @@ -744,6 +752,11 @@ func (t *TCPTransportClient) Send( return } +// DMoment implments TransportClient. +func (t *TCPTransportClient) DMoment() time.Time { + return t.dMoment +} + // TCPTransportServer implements TransportServer via TCP connections. type TCPTransportServer struct { TCPTransport @@ -780,6 +793,11 @@ func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) { return t.recvChannel, nil } +// SetDMoment implements TransportServer.SetDMoment method. +func (t *TCPTransportServer) SetDMoment(dMoment time.Time) { + t.dMoment = dMoment +} + // WaitForPeers implements TransportServer.WaitForPeers method. func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { // Collect peers info. Packets other than peer info is @@ -796,6 +814,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { panic(fmt.Errorf("expect connection report, not %v", e)) } pubKey, conn := parsePeerInfo(msg.Info) + fmt.Println("Peer connected", "peer", conn) t.peers[msg.NodeID] = &tcpPeerRecord{ conn: conn, pubKey: pubKey, @@ -814,7 +833,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { for ID := range t.peers { peers[ID] = struct{}{} } - if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil { + handshake := &tcpHandshake{ + DMoment: t.dMoment, + Peers: peersInfo, + } + if err = t.Broadcast(peers, &FixedLatencyModel{}, handshake); err != nil { return } // Wait for peers to send 'ready' report. diff --git a/simulation/kubernetes/config.toml.in b/simulation/kubernetes/config.toml.in index af76097..4828903 100644 --- a/simulation/kubernetes/config.toml.in +++ b/simulation/kubernetes/config.toml.in @@ -30,8 +30,8 @@ mean = 3e+02 sigma = 5e+01 [networking.gossip] -mean = 3e+02 -sigma = 2.5e+01 +mean = 6e+02 +sigma = 1e+02 [scheduler] worker_num = 2 diff --git a/simulation/node.go b/simulation/node.go index 026db66..517253d 100644 --- a/simulation/node.go +++ b/simulation/node.go @@ -115,13 +115,15 @@ func (n *node) GetID() types.NodeID { // run starts the node. func (n *node) run( - serverEndpoint interface{}, dMoment time.Time) { + serverEndpoint interface{}) { // Run network. if err := n.netModule.Setup(serverEndpoint); err != nil { panic(err) } msgChannel := n.netModule.ReceiveChanForNode() peers := n.netModule.Peers() + dMoment := n.netModule.DMoment() + n.logger.Info("Simulation DMoment", "dMoment", dMoment) go n.netModule.Run() // Run consensus. hashes := make(common.Hashes, 0, len(peers)) diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 69ed029..11785f4 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -175,15 +175,18 @@ func (p *PeerServer) mainLoop() { // Setup prepares simualtion. func (p *PeerServer) Setup( cfg *config.Config) (serverEndpoint interface{}, err error) { + dMoment := time.Now().UTC() // Setup transport layer. switch cfg.Networking.Type { case "tcp", "tcp-local": p.trans = test.NewTCPTransportServer(&jsonMarshaller{}, peerPort) + dMoment = dMoment.Add(5 * time.Second) case "fake": p.trans = test.NewFakeTransportServer() default: panic(fmt.Errorf("unknown network type: %v", cfg.Networking.Type)) } + p.trans.SetDMoment(dMoment) p.msgChannel, err = p.trans.Host() if err != nil { return diff --git a/simulation/simulation.go b/simulation/simulation.go index 938b302..c74bb91 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -22,7 +22,6 @@ import ( "io" "os" "sync" - "time" "github.com/dexon-foundation/dexon/log" @@ -49,8 +48,6 @@ func Run(cfg *config.Config, logPrefix string) { panic(fmt.Errorf("DKGSetSze should not be larger the node num")) } - dMoment := time.Now().UTC() - newLogger := func(logPrefix string) common.Logger { mw := io.Writer(os.Stderr) if logPrefix != "" { @@ -75,7 +72,7 @@ func Run(cfg *config.Config, logPrefix string) { wg.Add(1) go func() { defer wg.Done() - v.run(serverEndpoint, dMoment) + v.run(serverEndpoint) }() } |