aboutsummaryrefslogblamecommitdiffstats
path: root/core/test/tcp-transport.go
blob: bbc4d56d4200e1dd9890a6e4d31e323eec4a4fd1 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
  
                                                                        
                                                                               


                                                                              
                                                                         


                                                                          

                                                                           
                                                      





                                  
                         








                         
                 



                 
                                                            


                                                                       




                                                                    

 




                                       





                                    

                                                              


                                         

 





                                                          















                                                                             



                                                        


              










                                                                                

                                                                  








                                                         


                                            
                                   


                                                       

                                                                         

                                                               







                                                                         

                                                        


         



                                                         


                                                                                 




























                                                                    


                                                                                 



























                                                                







                                                                 

                                         
                                                             




                                             
                                         



                                                   

                                                                     



                                             
                                    
                                 

                                



                                                   










                                                 
                                                       






                                                     


                                                           



              


























                                                                    




                                                             
                                                        



                                                           
                                


                              

                                                 

                                             



                                                     






















                                                                      
                          




                                                             
                                                        








                                                                   


                                                                                     

                              
                               


                                                                            

                              
                       











                                                                            


















                                                                                      

                              






























                                                                      
                                                            




                                          
                                                                       












                                                                




                                                                  


















                                                                 
                                                                          









                                                                              
                       
                      


                              










                                                        


                                                                                             

                                              




                                                                                              









                                                      



                                                                  








                                                                       






                                        
                                       
                                 


                                
                                                        
                                       

                                                               
                                                

                                      


                                                                     


                                            
                                                                  

                                      

                                                  

                                                                     

                 




                                                                      


              






                                                        








                                                                       
                                



                                         

                                                                                        

















                                                                               






                                                        
                                

             
                                                                             

                                                 















                                                                                      
                                      
                                  
                 



















                                                                                          

                                                                    
                                                                   
         
 
                                                                       



                                                                   



                                              
                                                       










                                                                      
                              

                                                      


                        
                                              
                            
                                              
                
                                                                
         
                                     
                                   
                                                





                                                   





                                                                          

                                     



                                                              

                                    

                                              


                                                        


                                                                               


                     



                                                                     



                          

















                                                                               




                                                  









                                                                     



                                            
                                   
                                                                    

                                               
                                                                                         








                                                                      
                                                                              






                                                   




                                                            
                                                               
                                                                        

                                                              
                                                  
             




                                                                         
                 


                                                                                
                                                       
                                                           




                                                     
                                                             
                                                       



                                             


                                                          



                                                




                                                                                  


                                                 
                                                  








                                                                               
                                                                 

                                                                                     
                                                
                                                     



                                     

                                                             

                      

              














                                                                              








                                                                                   
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package test

import (
    "context"
    "encoding/base64"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io"
    "math"
    "math/rand"
    "net"
    "os"
    "strconv"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core/crypto"
    "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)

const (
    // tcpThroughputReportNum is the number of buffered throughput records
    // that has to be exceeded before TCPTransportClient.Send reports them.
    tcpThroughputReportNum = 10
)

// tcpHandshake is the message a client waits for in Join. It carries the
// DMoment and one info string per peer, keyed by node ID.
type tcpHandshake struct {
    // DMoment is copied into the receiving transport by Join.
    DMoment time.Time
    // Peers maps a node ID to a string that Join decodes with parsePeerInfo.
    Peers   map[types.NodeID]string
}

// tcpPeerRecord is what a transport keeps for each known peer.
type tcpPeerRecord struct {
    // conn is the address string passed to net.Dial when connecting.
    conn        string
    // sendChannel feeds the writer routine of the connection to this peer.
    // It stays nil until buildConnectionsToPeers has connected.
    sendChannel chan<- []byte
    // pubKey is the peer's public key, as returned by Peers.
    pubKey      crypto.PublicKey
}

// tcpMessage is the general message between peers and server. It is used for
// the connection handshake ("handshake" / "handshake-ack") and for the
// control messages exchanged in Join ("conn", "conn-ready", "all-ready").
type tcpMessage struct {
    // NodeID identifies the sender.
    NodeID types.NodeID `json:"nid"`
    // Type selects how the receiver interprets the message.
    Type   string       `json:"type"`
    // Info is a free-form payload; note that its JSON key is "conn".
    Info   string       `json:"conn"`
}

// BlockEventMessage is for monitoring block events' time. It is sent over the
// transport with the envelope type "block-event".
type BlockEventMessage struct {
    // BlockHash identifies the block the timestamps belong to.
    BlockHash  common.Hash `json:"hash"`
    // Timestamps holds the recorded times.
    // NOTE(review): the meaning of each entry is defined by the producer,
    // which is not visible here — confirm before relying on the order.
    Timestamps []time.Time `json:"timestamps"`
}

// buildPeerInfo packs a connection string and a public key into one string
// of the form "<conn>;<base64 of the key bytes>". parsePeerInfo reverses it.
func buildPeerInfo(pubKey crypto.PublicKey, conn string) string {
    encodedKey := base64.StdEncoding.EncodeToString(pubKey.Bytes())
    return strings.Join([]string{conn, encodedKey}, ";")
}

// parsePeerInfo parses the connection string and the base64 encoded public
// key out of a string built via buildPeerInfo.
//
// The input is expected to look like "<conn>;<base64 key>". Malformed input
// makes the function panic: a missing ';' separator, invalid base64, or bytes
// that are not a valid public key all abort with a descriptive error.
func parsePeerInfo(info string) (key crypto.PublicKey, conn string) {
    tokens := strings.Split(info, ";")
    // Without this check a string lacking the separator would die with an
    // opaque index-out-of-range instead of naming the bad input.
    if len(tokens) < 2 {
        panic(fmt.Errorf("malformed peer info: %q", info))
    }
    conn = tokens[0]
    data, err := base64.StdEncoding.DecodeString(tokens[1])
    if err != nil {
        panic(err)
    }
    key, err = ecdsa.NewPublicKeyFromByteSlice(data)
    if err != nil {
        panic(err)
    }
    return
}

var (
    // ErrTCPHandShakeFail is reported if the tcp handshake fails, i.e. the
    // message received during the handshake has an unexpected type or
    // greeting text.
    ErrTCPHandShakeFail = fmt.Errorf("tcp handshake fail")

    // ErrConnectToUnexpectedPeer is reported if connect to unexpected peer:
    // the node ID announced in the handshake differs from the one expected
    // at the dialed address.
    ErrConnectToUnexpectedPeer = fmt.Errorf("connect to unexpected peer")

    // ErrMessageOverflow is reported if the message is too long to be
    // described by the 32-bit length prefix used on the wire.
    ErrMessageOverflow = fmt.Errorf("message size overflow")
)

// TCPTransport implements Transport interface via TCP connection.
type TCPTransport struct {
    // peerType is stamped on every outgoing message envelope.
    peerType          TransportPeerType
    // nID is the node ID derived from pubKey.
    nID               types.NodeID
    // pubKey is the public key of this transport.
    pubKey            crypto.PublicKey
    // localPort is the TCP port this transport listens on.
    localPort         int
    // peers holds the known peers keyed by node ID.
    peers             map[types.NodeID]*tcpPeerRecord
    // peersLock is taken by send, Close and buildConnectionsToPeers when
    // they touch peers.
    peersLock         sync.RWMutex
    // recvChannel delivers received messages to the user; Close closes it.
    recvChannel       chan *TransportEnvelope
    // ctx and cancel are used to stop the routines started by the transport.
    ctx               context.Context
    cancel            context.CancelFunc
    // marshaller handles message types this file does not know; may be nil.
    marshaller        Marshaller
    // throughputRecords buffers records until TCPTransportClient.Send
    // reports them; throughputLock is held while they are reported.
    throughputRecords []ThroughputRecord
    throughputLock    sync.Mutex
    // dMoment is set from the server handshake (client) or SetDMoment
    // (server).
    dMoment           time.Time
}

// NewTCPTransport constructs a TCPTransport instance.
//
// The node ID is derived from pubKey, the peer table starts empty and the
// receive channel is buffered for 1000 envelopes. The transport owns a fresh
// cancellable context which Close uses to stop its routines.
func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey,
    marshaller Marshaller, localPort int) *TCPTransport {
    transport := &TCPTransport{
        peerType:          peerType,
        nID:               types.NewNodeID(pubKey),
        pubKey:            pubKey,
        localPort:         localPort,
        peers:             make(map[types.NodeID]*tcpPeerRecord),
        recvChannel:       make(chan *TransportEnvelope, 1000),
        marshaller:        marshaller,
        throughputRecords: []ThroughputRecord{},
    }
    transport.ctx, transport.cancel = context.WithCancel(context.Background())
    return transport
}

// handshakeMsg is the greeting text both sides of a connection put into the
// Info field of their handshake messages and verify on receipt.
const handshakeMsg = "Welcome to DEXON network for test."

// serverHandshake performs the accepting side of the connection handshake.
//
// It sends a "handshake" message carrying our node ID, then waits for a
// "handshake-ack" that echoes the greeting text. The whole exchange has to
// finish within three seconds. On success the node ID announced by the remote
// side is returned; ErrTCPHandShakeFail is returned when the reply has an
// unexpected type or greeting.
func (t *TCPTransport) serverHandshake(conn net.Conn) (
    nID types.NodeID, err error) {
    // A connection we cannot put a deadline on is reported as a failed
    // handshake instead of crashing the process.
    if err = conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
        return
    }
    var payload []byte
    payload, err = json.Marshal(&tcpMessage{
        NodeID: t.nID,
        Type:   "handshake",
        Info:   handshakeMsg,
    })
    if err != nil {
        return
    }
    if err = t.write(conn, payload); err != nil {
        return
    }
    if payload, err = t.read(conn); err != nil {
        return
    }
    // Decode into a fresh value: reusing the message we just sent would let
    // fields missing from the reply keep our own values, and a JSON "null"
    // reply would leave a nil pointer behind.
    var ack tcpMessage
    if err = json.Unmarshal(payload, &ack); err != nil {
        return
    }
    if ack.Type != "handshake-ack" || ack.Info != handshakeMsg {
        err = ErrTCPHandShakeFail
        return
    }
    nID = ack.NodeID
    return
}

// clientHandshake performs the dialing side of the connection handshake.
//
// It waits for the "handshake" message of the accepting side, verifies the
// greeting text and answers with a "handshake-ack" carrying our node ID. The
// whole exchange has to finish within three seconds. On success the node ID
// announced by the accepting side is returned; ErrTCPHandShakeFail is
// returned when the greeting has an unexpected type or text.
func (t *TCPTransport) clientHandshake(conn net.Conn) (
    nID types.NodeID, err error) {
    // A connection we cannot put a deadline on is reported as a failed
    // handshake instead of crashing the process.
    if err = conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
        return
    }
    var payload []byte
    if payload, err = t.read(conn); err != nil {
        return
    }
    // Decode into a value rather than through a pointer to a pointer, so a
    // JSON "null" greeting fails validation instead of leaving a nil pointer
    // that would be dereferenced below.
    var greeting tcpMessage
    if err = json.Unmarshal(payload, &greeting); err != nil {
        return
    }
    if greeting.Type != "handshake" || greeting.Info != handshakeMsg {
        err = ErrTCPHandShakeFail
        return
    }
    nID = greeting.NodeID
    payload, err = json.Marshal(&tcpMessage{
        NodeID: t.nID,
        Type:   "handshake-ack",
        Info:   handshakeMsg,
    })
    if err != nil {
        return
    }
    err = t.write(conn, payload)
    return
}

// send queues an already marshalled payload for delivery to endpoint and
// records throughput data for it.
//
// The payload is dropped when endpoint is unknown or has no connection yet
// (for example after Close has reset the peer table). A send that is blocked
// on a full queue gives up once the transport is cancelled.
func (t *TCPTransport) send(
    endpoint types.NodeID, msg interface{}, payload []byte) {
    t.peersLock.RLock()
    rec, exists := t.peers[endpoint]
    if !exists || rec.sendChannel == nil {
        // Indexing a missing record would dereference nil, and sending on a
        // nil channel would block forever while holding the read lock.
        t.peersLock.RUnlock()
        return
    }
    queue := rec.sendChannel
    t.handleThroughputData(msg, payload)
    // Release the lock before the potentially blocking send so that Close
    // and buildConnectionsToPeers are never stuck behind a full queue.
    t.peersLock.RUnlock()
    select {
    case queue <- payload:
    case <-t.ctx.Done():
    }
}

// Send implements Transport.Send method.
//
// The message is marshalled synchronously, so marshalling errors are
// returned to the caller. Delivery itself happens on a separate goroutine.
func (t *TCPTransport) Send(
    endpoint types.NodeID, msg interface{}) (err error) {

    var payload []byte
    if payload, err = t.marshalMessage(msg); err == nil {
        go t.send(endpoint, msg, payload)
    }
    return
}

// Broadcast implements Transport.Broadcast method.
//
// The message is marshalled once. Every endpoint other than this node gets
// its own goroutine, which sleeps for a delay drawn from latency and then
// hands the payload to send.
func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{},
    latency LatencyModel, msg interface{}) (err error) {
    var payload []byte
    if payload, err = t.marshalMessage(msg); err != nil {
        return
    }
    deliver := func(target types.NodeID) {
        time.Sleep(latency.Delay())
        t.send(target, msg, payload)
    }
    for target := range endpoints {
        if target != t.nID {
            go deliver(target)
        }
    }
    return
}

// Close implements Transport.Close method.
//
// It cancels the context shared by the transport's routines, forgets all
// peers and closes the receive channel so the user sees the end of the
// stream. Calling Close more than once is safe; later calls are no-ops with
// respect to the receive channel.
func (t *TCPTransport) Close() (err error) {
    // Tell all routines raised by us to die.
    t.cancel()
    // Reset peers.
    t.peersLock.Lock()
    defer t.peersLock.Unlock()
    t.peers = make(map[types.NodeID]*tcpPeerRecord)
    // Tell our user that this channel is closed. The nil check keeps a
    // repeated Close from closing a nil channel, which would panic.
    if t.recvChannel != nil {
        close(t.recvChannel)
        t.recvChannel = nil
    }
    return
}

// Peers implements Transport.Peers method.
//
// It returns the public keys of all peers currently in the peer table, in
// unspecified order. The result is nil when no peer is known.
func (t *TCPTransport) Peers() (peers []crypto.PublicKey) {
    // Close replaces the peer table under peersLock, so it must not be
    // iterated without holding the lock.
    t.peersLock.RLock()
    defer t.peersLock.RUnlock()
    for _, rec := range t.peers {
        peers = append(peers, rec.pubKey)
    }
    return
}

// write sends b over conn as one frame: a 4-byte little-endian length
// followed by the bytes themselves.
//
// ErrMessageOverflow is returned, and nothing is written, when b is too long
// for the 32-bit length prefix. Any error of the underlying connection is
// returned as is.
func (t *TCPTransport) write(conn net.Conn, b []byte) (err error) {
    // Compare as uint64: converting math.MaxUint32 to int does not compile
    // on platforms where int is 32 bits wide.
    if uint64(len(b)) > math.MaxUint32 {
        return ErrMessageOverflow
    }
    msgLength := make([]byte, 4)
    binary.LittleEndian.PutUint32(msgLength, uint32(len(b)))
    if _, err = conn.Write(msgLength); err != nil {
        return
    }
    _, err = conn.Write(b)
    return
}

// read receives one frame from conn: a 4-byte little-endian length followed
// by that many bytes, which are returned. Errors of the underlying reads are
// passed on unchanged.
func (t *TCPTransport) read(conn net.Conn) (b []byte, err error) {
    var header [4]byte
    if _, err = io.ReadFull(conn, header[:]); err != nil {
        return
    }
    size := binary.LittleEndian.Uint32(header[:])
    b = make([]byte, int(size))
    _, err = io.ReadFull(conn, b)
    return
}

// marshalMessage wraps msg into a JSON envelope carrying our peer type, our
// node ID, a type tag and the message itself.
//
// Message types defined by this transport get a fixed tag. Every other type
// is delegated to the user supplied marshaller, whose output is embedded
// verbatim; without a marshaller such a message is rejected with an error.
func (t *TCPTransport) marshalMessage(
    msg interface{}) (payload []byte, err error) {

    type carrier struct {
        PeerType TransportPeerType `json:"peer_type"`
        From     types.NodeID      `json:"from"`
        Type     string            `json:"type"`
        Payload  interface{}       `json:"payload"`
    }
    envelope := carrier{
        PeerType: t.peerType,
        From:     t.nID,
        Payload:  msg,
    }
    switch msg.(type) {
    case *tcpHandshake:
        envelope.Type = "tcp-handshake"
    case *tcpMessage:
        envelope.Type = "trans-msg"
    case []ThroughputRecord:
        envelope.Type = "throughput-record"
    case *BlockEventMessage:
        envelope.Type = "block-event"
    default:
        if t.marshaller == nil {
            return nil, fmt.Errorf("unknown msg type: %v", msg)
        }
        // Let the user defined marshaller pick the tag and the encoding.
        var raw []byte
        if envelope.Type, raw, err = t.marshaller.Marshal(msg); err != nil {
            return nil, err
        }
        envelope.Payload = json.RawMessage(raw)
    }
    return json.Marshal(envelope)
}

// unmarshalMessage is the inverse of marshalMessage: it opens the JSON
// envelope and decodes the payload according to its type tag.
//
// peerType and from are taken from the envelope and are returned even when
// decoding the payload fails. Tags unknown to this transport are handed to
// the user supplied marshaller; without one they are rejected with an error.
// Throughput records are returned as a pointer to a slice.
func (t *TCPTransport) unmarshalMessage(
    payload []byte) (
    peerType TransportPeerType,
    from types.NodeID,
    msg interface{},
    err error) {

    var envelope struct {
        PeerType TransportPeerType `json:"peer_type"`
        From     types.NodeID      `json:"from"`
        Type     string            `json:"type"`
        Payload  json.RawMessage   `json:"payload"`
    }
    if err = json.Unmarshal(payload, &envelope); err != nil {
        return
    }
    peerType, from = envelope.PeerType, envelope.From

    var decoded interface{}
    switch envelope.Type {
    case "tcp-handshake":
        hs := &tcpHandshake{}
        err = json.Unmarshal(envelope.Payload, &hs)
        decoded = hs
    case "trans-msg":
        tm := &tcpMessage{}
        err = json.Unmarshal(envelope.Payload, tm)
        decoded = tm
    case "throughput-record":
        records := &[]ThroughputRecord{}
        err = json.Unmarshal(envelope.Payload, records)
        decoded = records
    case "block-event":
        event := &BlockEventMessage{}
        err = json.Unmarshal(envelope.Payload, event)
        decoded = event
    default:
        if t.marshaller == nil {
            err = fmt.Errorf("unknown msg type: %v", envelope.Type)
            return
        }
        msg, err = t.marshaller.Unmarshal(envelope.Type, envelope.Payload)
        return
    }
    // Only hand out the decoded value when decoding succeeded.
    if err == nil {
        msg = decoded
    }
    return
}

// connReader is a reader routine to read from a TCP connection.
//
// It loops until the transport is cancelled or the remote side closes the
// connection, decoding every received frame and delivering it to the receive
// channel. The connection is closed when the routine ends.
//
// The routine panics on read errors other than timeouts and end-of-stream,
// on frames that cannot be decoded, and when closing the connection fails.
//
// NOTE: each read runs under a two second deadline. If the deadline fires
// after part of a frame has been consumed, those bytes are lost and the next
// read starts in the middle of the frame.
func (t *TCPTransport) connReader(conn net.Conn) {
    defer func() {
        if err := conn.Close(); err != nil {
            panic(err)
        }
    }()

    // peerGone reports whether err means the remote side closed the
    // connection. Timeouts yield false; any other error is fatal.
    peerGone := func(err error) bool {
        // ErrUnexpectedEOF is what io.ReadFull reports when the stream ends
        // in the middle of a frame; it is a closed connection as well.
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            return true
        }
        nErr, ok := err.(*net.OpError)
        if !ok || !nErr.Timeout() {
            panic(err)
        }
        return false
    }

    for {
        select {
        case <-t.ctx.Done():
            return
        default:
        }
        // Add timeout when reading to check if shutdown.
        if err := conn.SetReadDeadline(
            time.Now().Add(2 * time.Second)); err != nil {

            panic(err)
        }
        payload, err := t.read(conn)
        if err != nil {
            if peerGone(err) {
                return
            }
            continue
        }
        peerType, from, msg, err := t.unmarshalMessage(payload)
        if err != nil {
            panic(err)
        }
        // Do not stay blocked on a full receive channel once the transport
        // has been cancelled.
        select {
        case t.recvChannel <- &TransportEnvelope{
            PeerType: peerType,
            From:     from,
            Msg:      msg,
        }:
        case <-t.ctx.Done():
            return
        }
    }
}

// connWriter starts a writer routine for conn and returns the channel that
// feeds it. Every byte slice sent to the channel is written to the
// connection as one frame.
//
// The routine runs until the transport is cancelled; it then closes the
// channel and the connection. It panics when a write fails, when closing the
// connection fails, or when the write deadline cannot be cleared.
func (t *TCPTransport) connWriter(conn net.Conn) chan<- []byte {
    // Writes must never time out, so clear any deadline left on conn.
    if err := conn.SetWriteDeadline(time.Time{}); err != nil {
        panic(err)
    }

    outbox := make(chan []byte, 1000)
    shutdown := func() {
        close(outbox)
        if err := conn.Close(); err != nil {
            panic(err)
        }
    }
    pump := func() {
        defer shutdown()
        for {
            // Check for cancellation first so that it wins over any
            // messages still waiting in the outbox.
            select {
            case <-t.ctx.Done():
                return
            default:
            }
            select {
            case payload := <-outbox:
                // The frame is prefixed with its length in uint32.
                if err := t.write(conn, payload); err != nil {
                    panic(err)
                }
            case <-t.ctx.Done():
                return
            }
        }
    }
    go pump()
    return outbox
}

// listenerRoutine is a routine to accept incoming request for TCP connection.
//
// It runs until the transport is cancelled or the listener is closed from
// outside. Every accepted connection has to pass the server side handshake;
// on success a reader routine is started for it, on failure the error is
// printed and the connection is closed. The listener is closed when the
// routine ends, unless it was found to be closed already.
//
// The routine panics on accept errors other than timeouts.
func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) {
    closed := false
    defer func() {
        if closed {
            return
        }
        if err := listener.Close(); err != nil {
            panic(err)
        }
    }()
    for {
        select {
        case <-t.ctx.Done():
            return
        default:
        }

        // Accept with a deadline so that cancellation is noticed regularly.
        if err := listener.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
            panic(err)
        }
        conn, err := listener.Accept()
        if err != nil {
            // Check if the connection is closed.
            if strings.Contains(err.Error(), "use of closed network connection") {
                closed = true
                return
            }
            // Check if timeout error.
            nErr, ok := err.(*net.OpError)
            if !ok {
                panic(err)
            }
            if !nErr.Timeout() {
                panic(err)
            }
            continue
        }
        if _, err := t.serverHandshake(conn); err != nil {
            fmt.Println(err)
            // Nobody will use this connection any more; release it now
            // instead of leaving it open. A failure to close a connection
            // that is being discarded anyway is deliberately ignored.
            // #nosec G104
            conn.Close()
            continue
        }
        go t.connReader(conn)
    }
}

// buildConnectionsToPeers constructs TCP connections to each peer.
// Although TCP connection could be used for both read/write operation,
// we only utilize the write part for simplicity.
//
// All peers except this node are dialed concurrently. A connection is only
// kept when the client side handshake succeeds and the remote side announces
// the expected node ID; otherwise it is closed again. The function waits for
// all attempts to finish and returns one of the errors encountered, or nil
// when every connection was established.
func (t *TCPTransport) buildConnectionsToPeers() (err error) {
    var wg sync.WaitGroup
    var errs []error
    var errsLock sync.Mutex
    addErr := func(err error) {
        errsLock.Lock()
        defer errsLock.Unlock()
        errs = append(errs, err)
    }
    // reject records why a dialed connection is unusable and releases it.
    // A failure to close a connection that is being discarded anyway is
    // deliberately ignored.
    reject := func(conn net.Conn, reason error) {
        addErr(reason)
        // #nosec G104
        conn.Close()
    }
    for nID, rec := range t.peers {
        if nID == t.nID {
            continue
        }
        wg.Add(1)
        go func(nID types.NodeID, addr string) {
            defer wg.Done()
            conn, localErr := net.Dial("tcp", addr)
            if localErr != nil {
                addErr(localErr)
                return
            }
            serverID, localErr := t.clientHandshake(conn)
            if localErr != nil {
                reject(conn, localErr)
                return
            }
            if nID != serverID {
                reject(conn, ErrConnectToUnexpectedPeer)
                return
            }
            t.peersLock.Lock()
            defer t.peersLock.Unlock()
            t.peers[nID].sendChannel = t.connWriter(conn)
        }(nID, rec.conn)
    }
    wg.Wait()
    if len(errs) > 0 {
        // Propagate this error to outside, at least one error
        // could be returned to caller.
        err = errs[0]
    }
    return
}

// ThroughputRecord records the network throughput data. Batches of records
// are sent with the envelope type "throughput-record".
//
// NOTE(review): the records are produced by handleThroughputData, which is
// not visible here — the field descriptions below follow the field names and
// should be confirmed against it.
type ThroughputRecord struct {
    // Type presumably names the kind of message that was sent.
    Type string    `json:"type"`
    // Size presumably is the payload size in bytes.
    Size int       `json:"size"`
    // Time presumably is the moment the message was sent.
    Time time.Time `json:"time"`
}

// TCPTransportClient implement TransportClient base on TCP connection.
type TCPTransportClient struct {
    TCPTransport
    // local selects local mode: Join retries with a random port when the
    // address is in use and announces the listening address itself instead
    // of looking up this machine's IP.
    local              bool
    // serverWriteChannel feeds the writer routine of the connection to the
    // server; it is set up by Join and used by Report.
    serverWriteChannel chan<- []byte
}

// NewTCPTransportClient constructs a TCPTransportClient instance.
//
// The embedded transport is created as a regular peer whose initial
// listening port is 8080. local is stored on the client and later consulted
// by Join.
func NewTCPTransportClient(
    pubKey crypto.PublicKey,
    marshaller Marshaller,
    local bool) *TCPTransportClient {

    base := NewTCPTransport(TransportPeer, pubKey, marshaller, 8080)
    client := &TCPTransportClient{
        TCPTransport: *base,
        local:        local,
    }
    return client
}

// Report implements TransportClient.Report method.
//
// The message is marshalled synchronously, so marshalling errors are
// returned to the caller. The resulting payload is handed to the server
// connection's writer on a separate goroutine.
func (t *TCPTransportClient) Report(msg interface{}) (err error) {
    var payload []byte
    if payload, err = t.marshalMessage(msg); err != nil {
        return
    }
    go func(data []byte) {
        t.serverWriteChannel <- data
    }(payload)
    return
}

// Join implements TransportClient.Join method.
//
// The steps are, in order:
//  1. Start listening on the local port and verify, by connecting to
//     ourselves, that the listener really is ours. In local mode a port
//     that is in use is replaced by a random one and the step is retried.
//  2. Connect to the server at serverEndpoint (which must be a string
//     address) and report our own address and public key.
//  3. Wait for the server's handshake with the DMoment and the peer list,
//     then connect to every peer.
//  4. Report readiness and wait for the server's "all-ready" message.
//
// Messages of other types that arrive while waiting in step 4 are put back
// into the receive channel before it is returned.
//
// The function panics when serverEndpoint is not a string, when the first
// message from the server is not the handshake, and in local mode when
// listening fails for a reason other than the address being in use.
func (t *TCPTransportClient) Join(
    serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) {
    // Initiate a TCP server.
    // TODO(mission): config initial listening port.
    var (
        ln        net.Listener
        envelopes = []*TransportEnvelope{}
        addr      string
        conn      string
    )
    for {
        addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
        ln, err = net.Listen("tcp", addr)
        if err == nil {
            go t.listenerRoutine(ln.(*net.TCPListener))
            // It is possible to listen on the same port in some platform.
            // Check if this one is actually listening.
            testConn, e := net.Dial("tcp", addr)
            if e != nil {
                err = e
                return
            }
            nID, e := t.clientHandshake(testConn)
            // The probe connection is only needed for the handshake;
            // close it so that neither side keeps it open. A failure to
            // close it is deliberately ignored.
            // #nosec G104
            testConn.Close()
            if e != nil {
                err = e
                return
            }
            if nID == t.nID {
                break
            }
            // Somebody else answered on this port; give it up.
            // #nosec G104
            ln.Close()
        }
        if err != nil {
            if !t.local {
                return
            }
            // In local-tcp, retry with other port when the address is in use.
            operr, ok := err.(*net.OpError)
            if !ok {
                panic(err)
            }
            oserr, ok := operr.Err.(*os.SyscallError)
            if !ok {
                panic(operr)
            }
            errno, ok := oserr.Err.(syscall.Errno)
            if !ok {
                panic(oserr)
            }
            if errno != syscall.EADDRINUSE {
                panic(errno)
            }
        }
        // The port is used, generate another port randomly.
        t.localPort = 1024 + rand.Int()%1024 // #nosec G404
    }

    fmt.Println("Connecting to server", "endpoint", serverEndpoint)
    serverConn, err := net.Dial("tcp", serverEndpoint.(string))
    if err != nil {
        return
    }
    _, err = t.clientHandshake(serverConn)
    if err != nil {
        // The connection is useless without a completed handshake.
        // #nosec G104
        serverConn.Close()
        return
    }
    t.serverWriteChannel = t.connWriter(serverConn)
    if t.local {
        conn = addr
    } else {
        // Find my IP.
        var ip string
        if ip, err = FindMyIP(); err != nil {
            return
        }
        conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort))
    }
    if err = t.Report(&tcpMessage{
        NodeID: t.nID,
        Type:   "conn",
        Info:   buildPeerInfo(t.pubKey, conn),
    }); err != nil {
        return
    }
    // Wait for peers list sent by server.
    e := <-t.recvChannel
    handshake, ok := e.Msg.(*tcpHandshake)
    if !ok {
        panic(fmt.Errorf("expect handshake, not %v", e))
    }
    t.dMoment = handshake.DMoment
    // Setup peers information.
    for nID, info := range handshake.Peers {
        pubKey, conn := parsePeerInfo(info)
        t.peers[nID] = &tcpPeerRecord{
            conn:   conn,
            pubKey: pubKey,
        }
    }
    // Setup connections to other peers.
    if err = t.buildConnectionsToPeers(); err != nil {
        return
    }
    // Report to server that the connections to other peers are ready.
    if err = t.Report(&tcpMessage{
        Type:   "conn-ready",
        NodeID: t.nID,
    }); err != nil {
        return
    }
    // Wait for server to ack us that all peers are ready.
    for {
        e := <-t.recvChannel
        msg, ok := e.Msg.(*tcpMessage)
        if !ok {
            // Not a control message: keep it for the user.
            envelopes = append(envelopes, e)
            continue
        }
        if msg.Type != "all-ready" {
            err = fmt.Errorf("expected ready message, but %v", msg)
            return
        }
        break
    }
    // Replay those messages sent before peer list and ready-ack.
    for _, e := range envelopes {
        t.recvChannel <- e
    }
    ch = t.recvChannel
    return
}

// Send forwards msg to endpoint through t.TCPTransport.Send and then, once
// more than tcpThroughputReportNum throughput records have accumulated,
// reports those records to the peer server via Report and empties the buffer.
//
// An error from the underlying Send is returned unchanged. A failure to
// report the throughput records panics.
func (t *TCPTransportClient) Send(
    endpoint types.NodeID, msg interface{}) (err error) {

    if err = t.TCPTransport.Send(endpoint, msg); err != nil {
        return
    }
    // throughputRecords is appended to under throughputLock by
    // handleThroughputData, so the length check has to hold the lock too;
    // reading it unlocked would be a data race.
    t.throughputLock.Lock()
    defer t.throughputLock.Unlock()
    if len(t.throughputRecords) <= tcpThroughputReportNum {
        return
    }
    if reportErr := t.Report(t.throughputRecords); reportErr != nil {
        panic(reportErr)
    }
    // Reset the length only, so the backing array is reused.
    t.throughputRecords = t.throughputRecords[:0]
    return
}

// DMoment implements TransportClient. It returns the dMoment currently
// stored on the transport.
func (t *TCPTransportClient) DMoment() time.Time {
    return t.dMoment
}

// TCPTransportServer implements TransportServer via TCP connections.
// It embeds TCPTransport and adds the server-side Host, SetDMoment and
// WaitForPeers methods on top of it.
type TCPTransportServer struct {
    // TCPTransport is embedded by value; its fields and methods are promoted
    // to TCPTransportServer.
    TCPTransport
}

// NewTCPTransportServer constructs TCPTransportServer instance.
//
// A fresh ECDSA private key is generated for the server and its public key
// is handed to NewTCPTransport together with marshaller and serverPort.
// The function panics if the key cannot be generated.
func NewTCPTransportServer(
    marshaller Marshaller,
    serverPort int) *TCPTransportServer {

    key, keyErr := ecdsa.NewPrivateKey()
    if keyErr != nil {
        panic(keyErr)
    }
    // NOTE: the assumption here is the node ID of peers
    //       won't be zero.
    base := NewTCPTransport(
        TransportPeerServer, key.PublicKey(), marshaller, serverPort)
    server := &TCPTransportServer{TCPTransport: *base}
    return server
}

// Host implements TransportServer.Host method.
//
// It listens for TCP connections on 0.0.0.0 at t.localPort, starts
// listenerRoutine on that listener in its own goroutine, and returns the
// transport's receive channel. A listen failure is returned as the error.
func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) {
    // The port of the peer server has to be known to the other peers, so
    // only the pre-defined port is tried; there is no retry with other
    // random ports.
    addr := net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        return nil, err
    }
    tcpListener := listener.(*net.TCPListener)
    go t.listenerRoutine(tcpListener)
    return t.recvChannel, nil
}

// SetDMoment implements TransportServer.SetDMoment method. It stores dMoment
// on the transport; WaitForPeers sends the stored value to the peers inside
// the handshake message.
func (t *TCPTransportServer) SetDMoment(dMoment time.Time) {
    t.dMoment = dMoment
}

// WaitForPeers implements TransportServer.WaitForPeers method.
//
// It drives the server side of the start-up handshake:
//
//  1. collect "conn" reports until numPeers distinct node IDs are known,
//  2. build connections to those peers and broadcast a tcpHandshake holding
//     the DMoment and every collected peer info,
//  3. collect "conn-ready" reports from numPeers distinct node IDs,
//  4. broadcast an "all-ready" tcpMessage.
//
// Receiving anything other than the expected report panics, as does a second
// "conn-ready" report from the same node ID. Errors from
// buildConnectionsToPeers and Broadcast are returned. When numPeers is zero
// no report is awaited, so the call does not block on the receive channel.
func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
    // Collect peers info. Packets other than peer info are unexpected.
    // The count is checked before each receive so that numPeers == 0 does
    // not wait for a report that will never satisfy the condition.
    peersInfo := make(map[types.NodeID]string)
    for uint32(len(peersInfo)) < numPeers {
        // Wait for connection info reported by peers.
        e := <-t.recvChannel
        msg, ok := e.Msg.(*tcpMessage)
        if !ok {
            panic(fmt.Errorf("expect tcpMessage, not %v", e))
        }
        if msg.Type != "conn" {
            panic(fmt.Errorf("expect connection report, not %v", e))
        }
        pubKey, conn := parsePeerInfo(msg.Info)
        fmt.Println("Peer connected", "peer", conn)
        t.peers[msg.NodeID] = &tcpPeerRecord{
            conn:   conn,
            pubKey: pubKey,
        }
        peersInfo[msg.NodeID] = msg.Info
    }
    // Connect to every collected peer before sending anything to them.
    if err = t.buildConnectionsToPeers(); err != nil {
        return
    }
    // Send collected peers back to them.
    peers := make(map[types.NodeID]struct{})
    for ID := range t.peers {
        peers[ID] = struct{}{}
    }
    handshake := &tcpHandshake{
        DMoment: t.dMoment,
        Peers:   peersInfo,
    }
    if err = t.Broadcast(peers, &FixedLatencyModel{}, handshake); err != nil {
        return
    }
    // Wait for peers to send 'ready' report.
    readies := make(map[types.NodeID]struct{})
    for uint32(len(readies)) < numPeers {
        e := <-t.recvChannel
        msg, ok := e.Msg.(*tcpMessage)
        if !ok {
            panic(fmt.Errorf("expect tcpMessage, not %v", e))
        }
        if msg.Type != "conn-ready" {
            panic(fmt.Errorf("expect connection ready, not %v", e))
        }
        if _, reported := readies[msg.NodeID]; reported {
            panic(fmt.Errorf("already report conn-ready message: %v", e))
        }
        readies[msg.NodeID] = struct{}{}
    }
    // Ack all peers ready to go.
    err = t.Broadcast(peers, &FixedLatencyModel{},
        &tcpMessage{Type: "all-ready"})
    return
}

// handleThroughputData appends a ThroughputRecord for msg to
// t.throughputRecords when msg is one of the tracked message types (vote,
// block, agreement result, DKG partial signature). Messages of any other
// type leave the records untouched. The record stores the time taken on
// entry to this function and the length of payload in bytes.
func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) {
    // The timestamp is taken before acquiring the lock, so time spent
    // waiting on throughputLock is not included in it.
    now := time.Now()
    t.throughputLock.Lock()
    defer t.throughputLock.Unlock()

    var kind string
    switch msg.(type) {
    case *types.Vote:
        kind = "vote"
    case *types.Block:
        kind = "block"
    case *types.AgreementResult:
        kind = "agreement_result"
    case *dkg.PartialSignature:
        kind = "partial_sig"
    default:
        // Untracked message type: nothing to record.
        return
    }
    record := ThroughputRecord{
        Type: kind,
        Time: now,
        Size: len(payload),
    }
    t.throughputRecords = append(t.throughputRecords, record)
}