From 292ad73ec08621fa9beef5f028860131fcbf9bd9 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Tue, 11 Sep 2018 14:56:47 +0800 Subject: simulation: integrate test.Transport (#99) - Add marshaller for simulation by encoding/json - Implement peer server based on test.TranportServer - Remove network models, they are replaced with test.LatencyModel --- core/consensus.go | 1 - core/test/tcp-transport.go | 122 +++++++++++++++++++++++++++++++++------------ core/test/utils.go | 25 ++++++++++ 3 files changed, 115 insertions(+), 33 deletions(-) (limited to 'core') diff --git a/core/consensus.go b/core/consensus.go index a9ec6a5..e021024 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -346,7 +346,6 @@ func (con *Consensus) processMsg( if err := blockProcesser(val); err != nil { fmt.Println(err) } - //types.RecycleBlock(val) case *types.NotaryAck: if err := con.ProcessNotaryAck(val); err != nil { fmt.Println(err) diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index 673a5a1..2afea14 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -35,10 +35,11 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/core/types" ) -// peerInfo describe connection info of one peer in TCP transport. -type peerInfo struct { +// tcpMessage is the general message between peers and server. +type tcpMessage struct { ValidatorID types.ValidatorID `json:"vid"` - Conn string `json:"conn"` + Type string `json:"type"` + Info string `json:"conn"` } // TCPTransport implements Transport interface via TCP connection. @@ -162,8 +163,8 @@ func (t *TCPTransport) marshalMessage( switch msg.(type) { case map[types.ValidatorID]string: msgCarrier.Type = "peerlist" - case *peerInfo: - msgCarrier.Type = "peer" + case *tcpMessage: + msgCarrier.Type = "trans-msg" default: if t.marshaller == nil { err = fmt.Errorf("unknown msg type: %v", msg) @@ -209,12 +210,12 @@ func (t *TCPTransport) unmarshalMessage( return } msg = peers - case "peer": - peer := &peerInfo{} - if err = json.Unmarshal(msgCarrier.Payload, peer); err != nil { + case "trans-msg": + m := &tcpMessage{} + if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { return } - msg = peer + msg = m default: if t.marshaller == nil { err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type) @@ -443,6 +444,7 @@ func (t *TCPTransportClient) Join( envelopes = []*TransportEnvelope{} ok bool addr string + conn string ) for { addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort)) @@ -479,27 +481,57 @@ func (t *TCPTransportClient) Join( return } t.serverWriteChannel = t.connWriter(serverConn) - err = t.Report(&peerInfo{ + if t.local { + conn = addr + } else { + // Find my IP. + var ip string + if ip, err = FindMyIP(); err != nil { + return + } + conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort)) + } + if err = t.Report(&tcpMessage{ + Type: "conn", ValidatorID: t.vID, - Conn: addr, - }) + Info: conn, + }); err != nil { + return + } // Wait for peers list sent by server. + e := <-t.recvChannel + if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok { + panic(fmt.Errorf("expect peer list, not %v", e)) + } + // Setup connections to other peers. + if err = t.buildConnectionsToPeers(); err != nil { + return + } + // Report to server that the connections to other peers are ready. + if err = t.Report(&tcpMessage{ + Type: "conn-ready", + ValidatorID: t.vID, + }); err != nil { + return + } + // Wait for server to ack us that all peers are ready. for { e := <-t.recvChannel - if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok { + msg, ok := e.Msg.(*tcpMessage) + if !ok { envelopes = append(envelopes, e) continue } - // Replay those messages sent before peer list. - if len(envelopes) > cap(t.recvChannel)-len(t.recvChannel) { - panic(fmt.Errorf("unable to replay pending messages")) - } - for _, e := range envelopes { - t.recvChannel <- e + if msg.Type != "all-ready" { + err = fmt.Errorf("expected ready message, but %v", msg) + return } break } - t.buildConnectionsToPeers() + // Replay those messages sent before peer list and ready-ack. + for _, e := range envelopes { + t.recvChannel <- e + } ch = t.recvChannel return } @@ -545,24 +577,50 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { // Collect peers info. Packets other than peer info is // unexpected. for { - select { - case <-t.ctx.Done(): - err = fmt.Errorf("cancel waiting") - return - case e := <-t.recvChannel: - peer, ok := e.Msg.(*peerInfo) - if !ok { - panic(fmt.Errorf("expect peerInfo, not %v", e)) - } - t.peersInfo[peer.ValidatorID] = peer.Conn + // Wait for connection info reported by peers. + e := <-t.recvChannel + msg, ok := e.Msg.(*tcpMessage) + if !ok { + panic(fmt.Errorf("expect tcpMessage, not %v", e)) } + if msg.Type != "conn" { + panic(fmt.Errorf("expect connection report, not %v", e)) + } + t.peersInfo[msg.ValidatorID] = msg.Info // Check if we already collect enought peers. if len(t.peersInfo) == numPeers { break } } // Send collected peers back to them. - t.buildConnectionsToPeers() - t.Broadcast(t.peersInfo) + if err = t.buildConnectionsToPeers(); err != nil { + return + } + if err = t.Broadcast(t.peersInfo); err != nil { + return + } + // Wait for peers to send 'ready' report. + readies := make(map[types.ValidatorID]struct{}) + for { + e := <-t.recvChannel + msg, ok := e.Msg.(*tcpMessage) + if !ok { + panic(fmt.Errorf("expect tcpMessage, not %v", e)) + } + if msg.Type != "conn-ready" { + panic(fmt.Errorf("expect connection ready, not %v", e)) + } + if _, reported := readies[msg.ValidatorID]; reported { + panic(fmt.Errorf("already report conn-ready message: %v", e)) + } + readies[msg.ValidatorID] = struct{}{} + if len(readies) == numPeers { + break + } + } + // Ack all peers ready to go. + if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil { + return + } return } diff --git a/core/test/utils.go b/core/test/utils.go index 5f92ad9..138e8a1 100644 --- a/core/test/utils.go +++ b/core/test/utils.go @@ -18,7 +18,9 @@ package test import ( + "fmt" "math" + "net" "time" "github.com/dexon-foundation/dexon-consensus-core/common" @@ -63,3 +65,26 @@ func CalcLatencyStatistics(latencies []time.Duration) (avg, dev time.Duration) { dev = time.Duration(math.Sqrt(sumOfSquareDiff / float64(len(latencies)-1))) return } + +// FindMyIP returns local IP address. +func FindMyIP() (ip string, err error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + return + } + for _, a := range addrs { + ipnet, ok := a.(*net.IPNet) + if !ok { + continue + } + if ipnet.IP.IsLoopback() { + continue + } + if ipnet.IP.To4() != nil { + ip = ipnet.IP.String() + return + } + } + err = fmt.Errorf("unable to find IP") + return +} -- cgit v1.2.3