// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus library. // // The dexon-consensus library is free software: you can redistribute it // and/or modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, // or (at your option) any later version. // // The dexon-consensus library is distributed in the hope that it will be // useful, but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser // General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the dexon-consensus library. If not, see // . package test import ( "context" "encoding/base64" "encoding/binary" "encoding/json" "fmt" "io" "math" "math/rand" "net" "os" "strconv" "strings" "sync" "syscall" "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/types/dkg" ) const ( tcpThroughputReportNum = 10 ) type tcpHandshake struct { DMoment time.Time Peers map[types.NodeID]string } type tcpPeerRecord struct { conn string sendChannel chan<- []byte pubKey crypto.PublicKey } // tcpMessage is the general message between peers and server. type tcpMessage struct { NodeID types.NodeID `json:"nid"` Type string `json:"type"` Info string `json:"conn"` } // BlockEventMessage is for monitoring block events' time. type BlockEventMessage struct { BlockHash common.Hash `json:"hash"` Timestamps []time.Time `json:"timestamps"` } // buildPeerInfo is a tricky way to combine connection string and // base64 encoded byte slice for public key into a single string, // separated by ';'. func buildPeerInfo(pubKey crypto.PublicKey, conn string) string { return conn + ";" + base64.StdEncoding.EncodeToString(pubKey.Bytes()) } // parsePeerInfo parse connection string and base64 encoded public key built // via buildPeerInfo. func parsePeerInfo(info string) (key crypto.PublicKey, conn string) { tokens := strings.Split(info, ";") conn = tokens[0] data, err := base64.StdEncoding.DecodeString(tokens[1]) if err != nil { panic(err) } key, err = ecdsa.NewPublicKeyFromByteSlice(data) if err != nil { panic(err) } return } var ( // ErrTCPHandShakeFail is reported if the tcp handshake fails. ErrTCPHandShakeFail = fmt.Errorf("tcp handshake fail") // ErrConnectToUnexpectedPeer is reported if connect to unexpected peer. ErrConnectToUnexpectedPeer = fmt.Errorf("connect to unexpected peer") // ErrMessageOverflow is reported if the message is too long. ErrMessageOverflow = fmt.Errorf("message size overflow") ) // TCPTransport implements Transport interface via TCP connection. type TCPTransport struct { peerType TransportPeerType nID types.NodeID pubKey crypto.PublicKey localPort int peers map[types.NodeID]*tcpPeerRecord peersLock sync.RWMutex recvChannel chan *TransportEnvelope ctx context.Context cancel context.CancelFunc marshaller Marshaller throughputRecords []ThroughputRecord throughputLock sync.Mutex dMoment time.Time } // NewTCPTransport constructs an TCPTransport instance. func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey, marshaller Marshaller, localPort int) *TCPTransport { ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ peerType: peerType, nID: types.NewNodeID(pubKey), pubKey: pubKey, peers: make(map[types.NodeID]*tcpPeerRecord), recvChannel: make(chan *TransportEnvelope, 1000), ctx: ctx, cancel: cancel, localPort: localPort, marshaller: marshaller, throughputRecords: []ThroughputRecord{}, } } const handshakeMsg = "Welcome to DEXON network for test." func (t *TCPTransport) serverHandshake(conn net.Conn) ( nID types.NodeID, err error) { if err := conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil { panic(err) } msg := &tcpMessage{ NodeID: t.nID, Type: "handshake", Info: handshakeMsg, } var payload []byte payload, err = json.Marshal(msg) if err != nil { return } if err = t.write(conn, payload); err != nil { return } if payload, err = t.read(conn); err != nil { return } if err = json.Unmarshal(payload, &msg); err != nil { return } if msg.Type != "handshake-ack" || msg.Info != handshakeMsg { err = ErrTCPHandShakeFail return } nID = msg.NodeID return } func (t *TCPTransport) clientHandshake(conn net.Conn) ( nID types.NodeID, err error) { if err := conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil { panic(err) } var payload []byte if payload, err = t.read(conn); err != nil { return } msg := &tcpMessage{} if err = json.Unmarshal(payload, &msg); err != nil { return } if msg.Type != "handshake" || msg.Info != handshakeMsg { err = ErrTCPHandShakeFail return } nID = msg.NodeID msg = &tcpMessage{ NodeID: t.nID, Type: "handshake-ack", Info: handshakeMsg, } payload, err = json.Marshal(msg) if err != nil { return } if err = t.write(conn, payload); err != nil { return } return } // Disconnect implements Transport.Disconnect method. func (t *TCPTransport) Disconnect(endpoint types.NodeID) { delete(t.peers, endpoint) } func (t *TCPTransport) send( endpoint types.NodeID, msg interface{}, payload []byte) { t.peersLock.RLock() defer t.peersLock.RUnlock() t.handleThroughputData(msg, payload) t.peers[endpoint].sendChannel <- payload } // Send implements Transport.Send method. func (t *TCPTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { if _, exist := t.peers[endpoint]; !exist { return fmt.Errorf("the endpoint does not exists: %v", endpoint) } payload, err := t.marshalMessage(msg) if err != nil { return } go t.send(endpoint, msg, payload) return } // Broadcast implements Transport.Broadcast method. func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{}, latency LatencyModel, msg interface{}) (err error) { payload, err := t.marshalMessage(msg) if err != nil { return } for nID := range endpoints { if nID == t.nID { continue } go func(ID types.NodeID) { time.Sleep(latency.Delay()) t.send(ID, msg, payload) }(nID) } return } // Close implements Transport.Close method. func (t *TCPTransport) Close() (err error) { // Tell all routines raised by us to die. t.cancel() // Reset peers. t.peersLock.Lock() defer t.peersLock.Unlock() t.peers = make(map[types.NodeID]*tcpPeerRecord) // Tell our user that this channel is closed. close(t.recvChannel) t.recvChannel = nil return } // Peers implements Transport.Peers method. func (t *TCPTransport) Peers() (peers []crypto.PublicKey) { for _, rec := range t.peers { peers = append(peers, rec.pubKey) } return } func (t *TCPTransport) write(conn net.Conn, b []byte) (err error) { if len(b) > math.MaxUint32 { return ErrMessageOverflow } msgLength := make([]byte, 4) binary.LittleEndian.PutUint32(msgLength, uint32(len(b))) if _, err = conn.Write(msgLength); err != nil { return } if _, err = conn.Write(b); err != nil { return } return } func (t *TCPTransport) read(conn net.Conn) (b []byte, err error) { msgLength := make([]byte, 4) if _, err = io.ReadFull(conn, msgLength); err != nil { return } b = make([]byte, int(binary.LittleEndian.Uint32(msgLength))) if _, err = io.ReadFull(conn, b); err != nil { return } return } func (t *TCPTransport) marshalMessage( msg interface{}) (payload []byte, err error) { msgCarrier := struct { PeerType TransportPeerType `json:"peer_type"` From types.NodeID `json:"from"` Type string `json:"type"` Payload interface{} `json:"payload"` }{ PeerType: t.peerType, From: t.nID, Payload: msg, } switch msg.(type) { case *tcpHandshake: msgCarrier.Type = "tcp-handshake" case *tcpMessage: msgCarrier.Type = "trans-msg" case []ThroughputRecord: msgCarrier.Type = "throughput-record" case *BlockEventMessage: msgCarrier.Type = "block-event" default: if t.marshaller == nil { err = fmt.Errorf("unknown msg type: %v", msg) break } // Delegate to user defined marshaller. var buff []byte msgCarrier.Type, buff, err = t.marshaller.Marshal(msg) if err != nil { break } msgCarrier.Payload = json.RawMessage(buff) } if err != nil { return } payload, err = json.Marshal(msgCarrier) return } func (t *TCPTransport) unmarshalMessage( payload []byte) ( peerType TransportPeerType, from types.NodeID, msg interface{}, err error) { msgCarrier := struct { PeerType TransportPeerType `json:"peer_type"` From types.NodeID `json:"from"` Type string `json:"type"` Payload json.RawMessage `json:"payload"` }{} if err = json.Unmarshal(payload, &msgCarrier); err != nil { return } peerType = msgCarrier.PeerType from = msgCarrier.From switch msgCarrier.Type { case "tcp-handshake": handshake := &tcpHandshake{} if err = json.Unmarshal(msgCarrier.Payload, &handshake); err != nil { return } msg = handshake case "trans-msg": m := &tcpMessage{} if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { return } msg = m case "throughput-record": m := &[]ThroughputRecord{} if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { return } msg = m case "block-event": m := &BlockEventMessage{} if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { return } msg = m default: if t.marshaller == nil { err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type) break } msg, err = t.marshaller.Unmarshal(msgCarrier.Type, msgCarrier.Payload) } return } // connReader is a reader routine to read from a TCP connection. func (t *TCPTransport) connReader(conn net.Conn) { defer func() { if err := conn.Close(); err != nil { panic(err) } }() var ( err error payload []byte ) checkErr := func(err error) (toBreak bool) { if err == io.EOF { toBreak = true return } // Check if timeout. nErr, ok := err.(*net.OpError) if !ok { panic(err) } if !nErr.Timeout() { panic(err) } return } Loop: for { select { case <-t.ctx.Done(): break Loop default: } // Add timeout when reading to check if shutdown. if err := conn.SetReadDeadline( time.Now().Add(2 * time.Second)); err != nil { panic(err) } // Read message length. if payload, err = t.read(conn); err != nil { if checkErr(err) { break } continue } peerType, from, msg, err := t.unmarshalMessage(payload) if err != nil { panic(err) } t.recvChannel <- &TransportEnvelope{ PeerType: peerType, From: from, Msg: msg, } } } // connWriter is a writer routine to write to TCP connection. func (t *TCPTransport) connWriter(conn net.Conn) chan<- []byte { // Disable write deadline. if err := conn.SetWriteDeadline(time.Time{}); err != nil { panic(err) } ch := make(chan []byte, 1000) go func() { defer func() { close(ch) if err := conn.Close(); err != nil { panic(err) } }() for { select { case <-t.ctx.Done(): return default: } select { case <-t.ctx.Done(): return case msg := <-ch: // Send message length in uint32. if err := t.write(conn, msg); err != nil { panic(err) } } } }() return ch } // listenerRoutine is a routine to accept incoming request for TCP connection. func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) { closed := false defer func() { if closed { return } if err := listener.Close(); err != nil { panic(err) } }() for { select { case <-t.ctx.Done(): return default: } if err := listener.SetDeadline(time.Now().Add(5 * time.Second)); err != nil { panic(err) } conn, err := listener.Accept() if err != nil { // Check if the connection is closed. if strings.Contains(err.Error(), "use of closed network connection") { closed = true return } // Check if timeout error. nErr, ok := err.(*net.OpError) if !ok { panic(err) } if !nErr.Timeout() { panic(err) } continue } if _, err := t.serverHandshake(conn); err != nil { fmt.Println(err) continue } go t.connReader(conn) } } // buildConnectionToPeers constructs TCP connections to each peer. // Although TCP connection could be used for both read/write operation, // we only utilize the write part for simplicity. func (t *TCPTransport) buildConnectionsToPeers() (err error) { var wg sync.WaitGroup var errs []error var errsLock sync.Mutex addErr := func(err error) { errsLock.Lock() defer errsLock.Unlock() errs = append(errs, err) } for nID, rec := range t.peers { if nID == t.nID { continue } wg.Add(1) go func(nID types.NodeID, addr string) { defer wg.Done() conn, localErr := net.Dial("tcp", addr) if localErr != nil { addErr(localErr) return } serverID, localErr := t.clientHandshake(conn) if localErr != nil { addErr(localErr) return } if nID != serverID { addErr(ErrConnectToUnexpectedPeer) return } t.peersLock.Lock() defer t.peersLock.Unlock() t.peers[nID].sendChannel = t.connWriter(conn) }(nID, rec.conn) } wg.Wait() if len(errs) > 0 { // Propagate this error to outside, at least one error // could be returned to caller. err = errs[0] } return } // ThroughputRecord records the network throughput data. type ThroughputRecord struct { Type string `json:"type"` Size int `json:"size"` Time time.Time `json:"time"` } // TCPTransportClient implement TransportClient base on TCP connection. type TCPTransportClient struct { TCPTransport local bool serverWriteChannel chan<- []byte } // NewTCPTransportClient constructs a TCPTransportClient instance. func NewTCPTransportClient( pubKey crypto.PublicKey, marshaller Marshaller, local bool) *TCPTransportClient { return &TCPTransportClient{ TCPTransport: *NewTCPTransport(TransportPeer, pubKey, marshaller, 8080), local: local, } } // Report implements TransportClient.Report method. func (t *TCPTransportClient) Report(msg interface{}) (err error) { payload, err := t.marshalMessage(msg) if err != nil { return } go func() { t.serverWriteChannel <- payload }() return } // Join implements TransportClient.Join method. func (t *TCPTransportClient) Join( serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) { // Initiate a TCP server. // TODO(mission): config initial listening port. var ( ln net.Listener envelopes = []*TransportEnvelope{} ok bool addr string conn string ) for { addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort)) ln, err = net.Listen("tcp", addr) if err == nil { go t.listenerRoutine(ln.(*net.TCPListener)) // It is possible to listen on the same port in some platform. // Check if this one is actually listening. testConn, e := net.Dial("tcp", addr) if e != nil { err = e return } nID, e := t.clientHandshake(testConn) if e != nil { err = e return } if nID == t.nID { break } // #nosec G104 ln.Close() } if err != nil { if !t.local { return } // In local-tcp, retry with other port when the address is in use. operr, ok := err.(*net.OpError) if !ok { panic(err) } oserr, ok := operr.Err.(*os.SyscallError) if !ok { panic(operr) } errno, ok := oserr.Err.(syscall.Errno) if !ok { panic(oserr) } if errno != syscall.EADDRINUSE { panic(errno) } } // The port is used, generate another port randomly. t.localPort = 1024 + rand.Int()%1024 // #nosec G404 } fmt.Println("Connecting to server", "endpoint", serverEndpoint) serverConn, err := net.Dial("tcp", serverEndpoint.(string)) if err != nil { return } _, err = t.clientHandshake(serverConn) if err != nil { return } t.serverWriteChannel = t.connWriter(serverConn) if t.local { conn = addr } else { // Find my IP. var ip string if ip, err = FindMyIP(); err != nil { return } conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort)) } if err = t.Report(&tcpMessage{ NodeID: t.nID, Type: "conn", Info: buildPeerInfo(t.pubKey, conn), }); err != nil { return } // Wait for peers list sent by server. e := <-t.recvChannel handshake, ok := e.Msg.(*tcpHandshake) if !ok { panic(fmt.Errorf("expect handshake, not %v", e)) } t.dMoment = handshake.DMoment // Setup peers information. for nID, info := range handshake.Peers { pubKey, conn := parsePeerInfo(info) t.peers[nID] = &tcpPeerRecord{ conn: conn, pubKey: pubKey, } } // Setup connections to other peers. if err = t.buildConnectionsToPeers(); err != nil { return } // Report to server that the connections to other peers are ready. if err = t.Report(&tcpMessage{ Type: "conn-ready", NodeID: t.nID, }); err != nil { return } // Wait for server to ack us that all peers are ready. for { e := <-t.recvChannel msg, ok := e.Msg.(*tcpMessage) if !ok { envelopes = append(envelopes, e) continue } if msg.Type != "all-ready" { err = fmt.Errorf("expected ready message, but %v", msg) return } break } // Replay those messages sent before peer list and ready-ack. for _, e := range envelopes { t.recvChannel <- e } ch = t.recvChannel return } // Send calls TCPTransport's Send, and send the throughput data to peer server. func (t *TCPTransportClient) Send( endpoint types.NodeID, msg interface{}) (err error) { if err := t.TCPTransport.Send(endpoint, msg); err != nil { return err } if len(t.throughputRecords) > tcpThroughputReportNum { t.throughputLock.Lock() defer t.throughputLock.Unlock() if err := t.Report(t.throughputRecords); err != nil { panic(err) } t.throughputRecords = t.throughputRecords[:0] } return } // DMoment implments TransportClient. func (t *TCPTransportClient) DMoment() time.Time { return t.dMoment } // TCPTransportServer implements TransportServer via TCP connections. type TCPTransportServer struct { TCPTransport } // NewTCPTransportServer constructs TCPTransportServer instance. func NewTCPTransportServer( marshaller Marshaller, serverPort int) *TCPTransportServer { prvKey, err := ecdsa.NewPrivateKey() if err != nil { panic(err) } return &TCPTransportServer{ // NOTE: the assumption here is the node ID of peers // won't be zero. TCPTransport: *NewTCPTransport( TransportPeerServer, prvKey.PublicKey(), marshaller, serverPort), } } // Host implements TransportServer.Host method. func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) { // The port of peer server should be known to other peers, // if we can listen on the pre-defiend part, we don't have to // retry with other random ports. ln, err := net.Listen( "tcp", net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))) if err != nil { return nil, err } go t.listenerRoutine(ln.(*net.TCPListener)) return t.recvChannel, nil } // SetDMoment implements TransportServer.SetDMoment method. func (t *TCPTransportServer) SetDMoment(dMoment time.Time) { t.dMoment = dMoment } // WaitForPeers implements TransportServer.WaitForPeers method. func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { // Collect peers info. Packets other than peer info is // unexpected. peersInfo := make(map[types.NodeID]string) for { // Wait for connection info reported by peers. e := <-t.recvChannel msg, ok := e.Msg.(*tcpMessage) if !ok { panic(fmt.Errorf("expect tcpMessage, not %v", e)) } if msg.Type != "conn" { panic(fmt.Errorf("expect connection report, not %v", e)) } pubKey, conn := parsePeerInfo(msg.Info) fmt.Println("Peer connected", "peer", conn) t.peers[msg.NodeID] = &tcpPeerRecord{ conn: conn, pubKey: pubKey, } peersInfo[msg.NodeID] = msg.Info // Check if we already collect enought peers. if uint32(len(peersInfo)) == numPeers { break } } // Send collected peers back to them. if err = t.buildConnectionsToPeers(); err != nil { return } peers := make(map[types.NodeID]struct{}) for ID := range t.peers { peers[ID] = struct{}{} } handshake := &tcpHandshake{ DMoment: t.dMoment, Peers: peersInfo, } if err = t.Broadcast(peers, &FixedLatencyModel{}, handshake); err != nil { return } // Wait for peers to send 'ready' report. readies := make(map[types.NodeID]struct{}) for { e := <-t.recvChannel msg, ok := e.Msg.(*tcpMessage) if !ok { panic(fmt.Errorf("expect tcpMessage, not %v", e)) } if msg.Type != "conn-ready" { panic(fmt.Errorf("expect connection ready, not %v", e)) } if _, reported := readies[msg.NodeID]; reported { panic(fmt.Errorf("already report conn-ready message: %v", e)) } readies[msg.NodeID] = struct{}{} if uint32(len(readies)) == numPeers { break } } // Ack all peers ready to go. if err = t.Broadcast(peers, &FixedLatencyModel{}, &tcpMessage{Type: "all-ready"}); err != nil { return } return } func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) { sentTime := time.Now() t.throughputLock.Lock() defer t.throughputLock.Unlock() recordType := "" switch msg.(type) { case *types.Vote: recordType = "vote" case *types.Block: recordType = "block" case *types.AgreementResult: recordType = "agreement_result" case *dkg.PartialSignature: recordType = "partial_sig" } if len(recordType) > 0 { t.throughputRecords = append(t.throughputRecords, ThroughputRecord{ Type: recordType, Time: sentTime, Size: len(payload), }) } }