diff options
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r-- | core/test/tcp-transport.go | 128 |
1 files changed, 105 insertions, 23 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index a3b9aba..b16bbcb 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -34,9 +34,15 @@ import ( "syscall" "time" + "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/types/dkg" +) + +const ( + tcpThroughputReportNum = 10 ) type tcpPeerRecord struct { @@ -52,6 +58,12 @@ type tcpMessage struct { Info string `json:"conn"` } +// BlockEventMessage is for monitoring block events' time. +type BlockEventMessage struct { + BlockHash common.Hash `json:"hash"` + Timestamps []time.Time `json:"timestamps"` +} + // buildPeerInfo is a tricky way to combine connection string and // base64 encoded byte slice for public key into a single string, // separated by ';'. @@ -88,17 +100,19 @@ var ( // TCPTransport implements Transport interface via TCP connection. type TCPTransport struct { - peerType TransportPeerType - nID types.NodeID - pubKey crypto.PublicKey - localPort int - peers map[types.NodeID]*tcpPeerRecord - peersLock sync.RWMutex - recvChannel chan *TransportEnvelope - ctx context.Context - cancel context.CancelFunc - latency LatencyModel - marshaller Marshaller + peerType TransportPeerType + nID types.NodeID + pubKey crypto.PublicKey + localPort int + peers map[types.NodeID]*tcpPeerRecord + peersLock sync.RWMutex + recvChannel chan *TransportEnvelope + ctx context.Context + cancel context.CancelFunc + latency LatencyModel + marshaller Marshaller + throughputRecords []ThroughputRecord + throughputLock sync.Mutex } // NewTCPTransport constructs an TCPTransport instance. @@ -111,16 +125,17 @@ func NewTCPTransport( ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ - peerType: peerType, - nID: types.NewNodeID(pubKey), - pubKey: pubKey, - peers: make(map[types.NodeID]*tcpPeerRecord), - recvChannel: make(chan *TransportEnvelope, 1000), - ctx: ctx, - cancel: cancel, - localPort: localPort, - latency: latency, - marshaller: marshaller, + peerType: peerType, + nID: types.NewNodeID(pubKey), + pubKey: pubKey, + peers: make(map[types.NodeID]*tcpPeerRecord), + recvChannel: make(chan *TransportEnvelope, 1000), + ctx: ctx, + cancel: cancel, + localPort: localPort, + latency: latency, + marshaller: marshaller, + throughputRecords: []ThroughputRecord{}, } } @@ -199,10 +214,9 @@ func (t *TCPTransport) Send( if t.latency != nil { time.Sleep(t.latency.Delay()) } - t.peersLock.RLock() defer t.peersLock.RUnlock() - + t.handleThroughputData(msg, payload) t.peers[endpoint].sendChannel <- payload }() return @@ -225,6 +239,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) { if t.latency != nil { time.Sleep(t.latency.Delay()) } + t.handleThroughputData(msg, payload) ch <- payload }(rec.sendChannel) } @@ -298,6 +313,10 @@ func (t *TCPTransport) marshalMessage( msgCarrier.Type = "peerlist" case *tcpMessage: msgCarrier.Type = "trans-msg" + case []ThroughputRecord: + msgCarrier.Type = "throughput-record" + case *BlockEventMessage: + msgCarrier.Type = "block-event" default: if t.marshaller == nil { err = fmt.Errorf("unknown msg type: %v", msg) @@ -349,6 +368,18 @@ func (t *TCPTransport) unmarshalMessage( return } msg = m + case "throughput-record": + m := &[]ThroughputRecord{} + if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { + return + } + msg = m + case "block-event": + m := &BlockEventMessage{} + if err = json.Unmarshal(msgCarrier.Payload, m); err != nil { + return + } + msg = m default: if t.marshaller == nil { err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type) @@ -537,6 +568,13 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) { return } +// ThroughputRecord records the network throughput data. +type ThroughputRecord struct { + Type string `json:"type"` + Size int `json:"size"` + Time time.Time `json:"time"` +} + // TCPTransportClient implement TransportClient base on TCP connection. type TCPTransportClient struct { TCPTransport @@ -702,6 +740,24 @@ func (t *TCPTransportClient) Join( return } +// Send calls TCPTransport's Send, and send the throughput data to peer server. +func (t *TCPTransportClient) Send( + endpoint types.NodeID, msg interface{}) (err error) { + + if err := t.TCPTransport.Send(endpoint, msg); err != nil { + return err + } + if len(t.throughputRecords) > tcpThroughputReportNum { + t.throughputLock.Lock() + defer t.throughputLock.Unlock() + if err := t.Report(t.throughputRecords); err != nil { + panic(err) + } + t.throughputRecords = t.throughputRecords[:0] + } + return +} + // TCPTransportServer implements TransportServer via TCP connections. type TCPTransportServer struct { TCPTransport @@ -800,3 +856,29 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { } return } + +func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) { + sentTime := time.Now() + t.throughputLock.Lock() + defer t.throughputLock.Unlock() + recordType := "" + switch msg.(type) { + case *types.Vote: + recordType = "vote" + case *types.Block: + recordType = "block" + case *types.AgreementResult: + recordType = "agreement_result" + case *dkg.PartialSignature: + recordType = "partial_sig" + case *types.BlockRandomnessResult: + recordType = "block_random" + } + if len(recordType) > 0 { + t.throughputRecords = append(t.throughputRecords, ThroughputRecord{ + Type: recordType, + Time: sentTime, + Size: len(payload), + }) + } +} |