From c1ed57c4abaf1f4758e52f082bb7114ad00c8b39 Mon Sep 17 00:00:00 2001
From: haoping-ku
Date: Sat, 22 Dec 2018 17:47:02 +0800
Subject: core: simulation: add throughput and block event monitoring (#380)

* core: simulation: add throughput and block event monitoring

Added throughput and block event monitoring to the TCP-local network.
This data is collected by the nodes and reported to the peer server.

* fix issues

* fix sent time of throughput issue
---
 core/consensus.go          |  34 +++++++++++-
 core/test/tcp-transport.go | 128 +++++++++++++++++++++++++++++++++++++--------
 simulation/app.go          |  11 ++--
 simulation/node.go         |   2 +-
 simulation/peer-server.go  | 124 ++++++++++++++++++++++++++++++++++---------
 simulation/utils.go        |  25 ++++++++-
 6 files changed, 267 insertions(+), 57 deletions(-)

diff --git a/core/consensus.go b/core/consensus.go
index 965a633..44ce43f 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -399,6 +399,34 @@ func NewConsensus(
 	prv crypto.PrivateKey,
 	logger common.Logger) *Consensus {
 
+	return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+}
+
+// NewConsensusForSimulation creates an instance of Consensus for simulation.
+// It differs from NewConsensus only in that the app is not wrapped as non-blocking.
+func NewConsensusForSimulation(
+	dMoment time.Time,
+	app Application,
+	gov Governance,
+	db db.Database,
+	network Network,
+	prv crypto.PrivateKey,
+	logger common.Logger) *Consensus {
+
+	return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+}
+
+// newConsensus creates a Consensus instance.
+func newConsensus(
+	dMoment time.Time,
+	app Application,
+	gov Governance,
+	db db.Database,
+	network Network,
+	prv crypto.PrivateKey,
+	logger common.Logger,
+	usingNonBlocking bool) *Consensus {
+
 	// TODO(w): load latest blockHeight from DB, and use config at that height.
 	nodeSetCache := utils.NewNodeSetCache(gov)
 	// Setup signer module.
@@ -432,12 +460,16 @@ func NewConsensus(
 		db,
 		logger)
 	recv.cfgModule = cfgModule
+	appModule := app
+	if usingNonBlocking {
+		appModule = newNonBlocking(app, debugApp)
+	}
 	// Construct Consensus instance.
 	con := &Consensus{
 		ID:           ID,
 		ccModule:     newCompactionChain(gov),
 		lattice:      lattice,
-		app:          newNonBlocking(app, debugApp),
+		app:          appModule,
 		debugApp:     debugApp,
 		gov:          gov,
 		db:           db,
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index a3b9aba..b16bbcb 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -34,9 +34,15 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/dexon-foundation/dexon-consensus/common"
 	"github.com/dexon-foundation/dexon-consensus/core/crypto"
 	"github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
 	"github.com/dexon-foundation/dexon-consensus/core/types"
+	"github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+)
+
+const (
+	tcpThroughputReportNum = 10
 )
 
 type tcpPeerRecord struct {
@@ -52,6 +58,12 @@ type tcpMessage struct {
 	Info string `json:"conn"`
 }
 
+// BlockEventMessage records the timestamps of a block's events for monitoring.
+type BlockEventMessage struct {
+	BlockHash  common.Hash `json:"hash"`
+	Timestamps []time.Time `json:"timestamps"`
+}
+
 // buildPeerInfo is a tricky way to combine connection string and
 // base64 encoded byte slice for public key into a single string,
 // separated by ';'.
@@ -88,17 +100,19 @@ var (
 )
 
 // TCPTransport implements Transport interface via TCP connection.
 type TCPTransport struct {
-	peerType    TransportPeerType
-	nID         types.NodeID
-	pubKey      crypto.PublicKey
-	localPort   int
-	peers       map[types.NodeID]*tcpPeerRecord
-	peersLock   sync.RWMutex
-	recvChannel chan *TransportEnvelope
-	ctx         context.Context
-	cancel      context.CancelFunc
-	latency     LatencyModel
-	marshaller  Marshaller
+	peerType          TransportPeerType
+	nID               types.NodeID
+	pubKey            crypto.PublicKey
+	localPort         int
+	peers             map[types.NodeID]*tcpPeerRecord
+	peersLock         sync.RWMutex
+	recvChannel       chan *TransportEnvelope
+	ctx               context.Context
+	cancel            context.CancelFunc
+	latency           LatencyModel
+	marshaller        Marshaller
+	throughputRecords []ThroughputRecord
+	throughputLock    sync.Mutex
 }
 
 // NewTCPTransport constructs an TCPTransport instance.
@@ -111,16 +125,17 @@ func NewTCPTransport(
 
 	ctx, cancel := context.WithCancel(context.Background())
 	return &TCPTransport{
-		peerType:    peerType,
-		nID:         types.NewNodeID(pubKey),
-		pubKey:      pubKey,
-		peers:       make(map[types.NodeID]*tcpPeerRecord),
-		recvChannel: make(chan *TransportEnvelope, 1000),
-		ctx:         ctx,
-		cancel:      cancel,
-		localPort:   localPort,
-		latency:     latency,
-		marshaller:  marshaller,
+		peerType:          peerType,
+		nID:               types.NewNodeID(pubKey),
+		pubKey:            pubKey,
+		peers:             make(map[types.NodeID]*tcpPeerRecord),
+		recvChannel:       make(chan *TransportEnvelope, 1000),
+		ctx:               ctx,
+		cancel:            cancel,
+		localPort:         localPort,
+		latency:           latency,
+		marshaller:        marshaller,
+		throughputRecords: []ThroughputRecord{},
 	}
 }
 
@@ -199,10 +214,9 @@ func (t *TCPTransport) Send(
 		if t.latency != nil {
 			time.Sleep(t.latency.Delay())
 		}
-
 		t.peersLock.RLock()
 		defer t.peersLock.RUnlock()
-
+		t.handleThroughputData(msg, payload)
 		t.peers[endpoint].sendChannel <- payload
 	}()
 	return
@@ -225,6 +239,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
 			if t.latency != nil {
 				time.Sleep(t.latency.Delay())
 			}
+			t.handleThroughputData(msg, payload)
 			ch <- payload
 		}(rec.sendChannel)
 	}
@@ -298,6 +313,10 @@ func (t *TCPTransport) marshalMessage(
 		msgCarrier.Type = "peerlist"
 	case *tcpMessage:
 		msgCarrier.Type = "trans-msg"
+	case []ThroughputRecord:
+		msgCarrier.Type = "throughput-record"
+	case *BlockEventMessage:
+		msgCarrier.Type = "block-event"
 	default:
 		if t.marshaller == nil {
 			err = fmt.Errorf("unknown msg type: %v", msg)
@@ -349,6 +368,18 @@ func (t *TCPTransport) unmarshalMessage(
 			return
 		}
 		msg = m
+	case "throughput-record":
+		m := &[]ThroughputRecord{}
+		if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
+			return
+		}
+		msg = m
+	case "block-event":
+		m := &BlockEventMessage{}
+		if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
+			return
+		}
+		msg = m
 	default:
 		if t.marshaller == nil {
 			err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type)
@@ -537,6 +568,13 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) {
 	return
 }
 
+// ThroughputRecord records the network throughput data.
+type ThroughputRecord struct {
+	Type string    `json:"type"`
+	Size int       `json:"size"`
+	Time time.Time `json:"time"`
+}
+
 // TCPTransportClient implement TransportClient base on TCP connection.
 type TCPTransportClient struct {
 	TCPTransport
@@ -702,6 +740,24 @@ func (t *TCPTransportClient) Join(
 	return
 }
 
+// Send calls TCPTransport's Send, and sends the throughput data to the peer server.
+func (t *TCPTransportClient) Send(
+	endpoint types.NodeID, msg interface{}) (err error) {
+
+	if err := t.TCPTransport.Send(endpoint, msg); err != nil {
+		return err
+	}
+	if len(t.throughputRecords) > tcpThroughputReportNum {
+		t.throughputLock.Lock()
+		defer t.throughputLock.Unlock()
+		if err := t.Report(t.throughputRecords); err != nil {
+			panic(err)
+		}
+		t.throughputRecords = t.throughputRecords[:0]
+	}
+	return
+}
+
 // TCPTransportServer implements TransportServer via TCP connections.
 type TCPTransportServer struct {
 	TCPTransport
@@ -800,3 +856,29 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
 	}
 	return
 }
+
+func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) {
+	sentTime := time.Now()
+	t.throughputLock.Lock()
+	defer t.throughputLock.Unlock()
+	recordType := ""
+	switch msg.(type) {
+	case *types.Vote:
+		recordType = "vote"
+	case *types.Block:
+		recordType = "block"
+	case *types.AgreementResult:
+		recordType = "agreement_result"
+	case *dkg.PartialSignature:
+		recordType = "partial_sig"
+	case *types.BlockRandomnessResult:
+		recordType = "block_random"
+	}
+	if len(recordType) > 0 {
+		t.throughputRecords = append(t.throughputRecords, ThroughputRecord{
+			Type: recordType,
+			Time: sentTime,
+			Size: len(payload),
+		})
+	}
+}
diff --git a/simulation/app.go b/simulation/app.go
index 464c521..2bf0e48 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -57,11 +57,6 @@ const (
 	blockEventCount
 )
 
-type blockEventMessage struct {
-	BlockHash  common.Hash `json:"hash"`
-	Timestamps []time.Time `json:"timestamps"`
-}
-
 // simApp is an DEXON app for simulation.
 type simApp struct {
 	NodeID types.NodeID
@@ -250,11 +245,13 @@ func (a *simApp) updateBlockEvent(hash common.Hash) {
 	defer a.lock.Unlock()
 	a.blockTimestamps[hash] = append(a.blockTimestamps[hash], time.Now().UTC())
 	if len(a.blockTimestamps[hash]) == blockEventCount {
-		msg := &blockEventMessage{
+		msg := &test.BlockEventMessage{
 			BlockHash:  hash,
 			Timestamps: a.blockTimestamps[hash],
 		}
-		a.netModule.Report(msg)
+		if err := a.netModule.Report(msg); err != nil {
+			panic(err)
+		}
 		delete(a.blockTimestamps, hash)
 	}
 }
diff --git a/simulation/node.go b/simulation/node.go
index 4d1e4ff..e766da8 100644
--- a/simulation/node.go
+++ b/simulation/node.go
@@ -150,7 +150,7 @@ func (n *node) run(
 	// Setup of governance is ready, can be switched to remote mode.
 	n.gov.SwitchToRemoteMode(n.netModule)
 	// Setup Consensus.
-	n.consensus = core.NewConsensus(
+	n.consensus = core.NewConsensusForSimulation(
 		dMoment,
 		n.app,
 		n.gov,
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 95ce5c1..6d1121f 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"log"
 	"reflect"
+	"sort"
 	"sync"
 	"time"
 
@@ -34,27 +35,29 @@ import (
 
 // PeerServer is the main object to collect results and monitor simulation.
 type PeerServer struct {
-	peers            map[types.NodeID]struct{}
-	msgChannel       chan *test.TransportEnvelope
-	trans            test.TransportServer
-	peerTotalOrder   PeerTotalOrder
-	peerTotalOrderMu sync.Mutex
-	verifiedLen      uint64
-	cfg              *config.Config
-	ctx              context.Context
-	ctxCancel        context.CancelFunc
-	blockEvents      map[types.NodeID]map[common.Hash][]time.Time
+	peers             map[types.NodeID]struct{}
+	msgChannel        chan *test.TransportEnvelope
+	trans             test.TransportServer
+	peerTotalOrder    PeerTotalOrder
+	peerTotalOrderMu  sync.Mutex
+	verifiedLen       uint64
+	cfg               *config.Config
+	ctx               context.Context
+	ctxCancel         context.CancelFunc
+	blockEvents       map[types.NodeID]map[common.Hash][]time.Time
+	throughputRecords map[types.NodeID][]test.ThroughputRecord
 }
 
 // NewPeerServer returns a new PeerServer instance.
 func NewPeerServer() *PeerServer {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &PeerServer{
-		peers:          make(map[types.NodeID]struct{}),
-		peerTotalOrder: make(PeerTotalOrder),
-		ctx:            ctx,
-		ctxCancel:      cancel,
-		blockEvents:    make(map[types.NodeID]map[common.Hash][]time.Time),
+		peers:             make(map[types.NodeID]struct{}),
+		peerTotalOrder:    make(PeerTotalOrder),
+		ctx:               ctx,
+		ctxCancel:         cancel,
+		blockEvents:       make(map[types.NodeID]map[common.Hash][]time.Time),
+		throughputRecords: make(map[types.NodeID][]test.ThroughputRecord),
 	}
 }
@@ -118,7 +121,9 @@ func (p *PeerServer) handleMessage(id types.NodeID, m *message) {
 	}
 }
 
-func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMessage) {
+func (p *PeerServer) handleBlockEventMessage(
+	id types.NodeID, msg *test.BlockEventMessage) {
+
 	if _, exist := p.blockEvents[id]; !exist {
 		p.blockEvents[id] = make(map[common.Hash][]time.Time)
 	}
@@ -129,6 +134,12 @@ func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMes
 	nodeEvents[msg.BlockHash] = msg.Timestamps
 }
 
+func (p *PeerServer) handleThroughputData(
+	id types.NodeID, records *[]test.ThroughputRecord) {
+
+	p.throughputRecords[id] = append(p.throughputRecords[id], *records...)
+}
+
 func (p *PeerServer) mainLoop() {
 	for {
 		select {
@@ -149,8 +160,10 @@ func (p *PeerServer) mainLoop() {
 				p.handleBlockList(e.From, val)
 			case *message:
 				p.handleMessage(e.From, val)
-			case *blockEventMessage:
+			case *test.BlockEventMessage:
 				p.handleBlockEventMessage(e.From, val)
+			case *[]test.ThroughputRecord:
+				p.handleThroughputData(e.From, val)
 			default:
 				panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg)))
 			}
@@ -200,20 +213,83 @@ func (p *PeerServer) Run() {
 		log.Printf("Error shutting down peerServer: %v\n", err)
 	}
 	p.logBlockEvents()
+	p.logThroughputRecords()
+}
+
+func (p *PeerServer) logThroughputRecords() {
+	// intervals are the sampling window sizes used to aggregate the
+	// throughput data, in nanoseconds.
+	intervals := []int64{int64(time.Second), int64(100 * time.Millisecond)}
+	log.Println("======== throughput data ============")
+	for nid, records := range p.throughputRecords {
+		log.Printf("[Node %s]\n", nid)
+		msgTypes := []string{}
+		msgMap := make(map[string][]test.ThroughputRecord)
+		for _, record := range records {
+			msgMap[record.Type] = append(msgMap[record.Type], record)
+		}
+		for k := range msgMap {
+			msgTypes = append(msgTypes, k)
+		}
+		sort.Strings(msgTypes)
+		for _, interval := range intervals {
+			log.Printf(" %dms", interval/int64(time.Millisecond))
+			for _, msgType := range msgTypes {
+				sum := 0
+				startTime := msgMap[msgType][0].Time.UnixNano()
+				endTime := startTime
+				for _, record := range msgMap[msgType] {
+					sum += record.Size
+					t := record.Time.UnixNano()
+					// The receiving order might differ from the sending order.
+					if t < startTime {
+						startTime = t
+					}
+					if t > endTime {
+						endTime = t
+					}
+				}
+				startIndex := startTime / interval
+				endIndex := endTime / interval
+				log.Printf(" %s (count: %d, size: %d)",
+					msgType, len(msgMap[msgType]), sum)
+				// A slot stores the total throughput within one interval. The
+				// slot index of a given time is calculated by dividing the
+				// time by the interval and subtracting the starting slot
+				// index. For example, if the start time is 5.5s, then time
+				// "7.123s" maps to slot 7123000000 / 100000000 - 55 = 71 - 55 = 16.
+				slots := make([]int, endIndex-startIndex+1)
+				for _, record := range msgMap[msgType] {
+					slots[record.Time.UnixNano()/interval-startIndex] += record.Size
+				}
+				mean, std := calculateMeanStdDeviationInts(slots)
+				log.Printf(" mean: %f, std: %f", mean, std)
+				min, med, max := getMinMedianMaxInts(slots)
+				log.Printf(" min: %d, med: %d, max: %d", min, med, max)
+			}
+		}
+	}
 }
 
 func (p *PeerServer) logBlockEvents() {
+	// diffs stores the differences between consecutive event times.
 	diffs := [blockEventCount - 1][]float64{}
-	for _, bs := range p.blockEvents {
-		for _, ts := range bs {
+	for _, blocks := range p.blockEvents {
+		for _, timestamps := range blocks {
 			for i := 0; i < blockEventCount-1; i++ {
-				diffs[i] = append(diffs[i], float64(ts[i+1].Sub(ts[i]))/1000000000)
+				diffs[i] = append(
+					diffs[i],
+					float64(timestamps[i+1].Sub(timestamps[i]))/1000000000,
+				)
 			}
 		}
 	}
-	log.Printf("===== block event mean and std dev (%d blocks) =====\n", len(diffs[0]))
-	for i, a := range diffs {
-		m, d := calcMeanAndStdDeviation(a)
-		log.Printf(" event %d: mean = %f, std dev = %f\n", i, m, d)
+	log.Printf("======== block events (%d blocks) ============", len(diffs[0]))
+	for i, ary := range diffs {
+		mean, stdDeviation := calculateMeanStdDeviationFloat64s(ary)
+		min, med, max := getMinMedianMaxFloat64s(ary)
+		log.Printf(" event %d to %d", i, i+1)
+		log.Printf(" mean: %f, std dev = %f", mean, stdDeviation)
+		log.Printf(" min: %f, median: %f, max: %f", min, med, max)
 	}
 }
diff --git a/simulation/utils.go b/simulation/utils.go
index f0d864b..a18e8a4 100644
--- a/simulation/utils.go
+++ b/simulation/utils.go
@@ -19,9 +19,10 @@ package simulation
 
 import (
 	"math"
+	"sort"
 )
 
-func calcMeanAndStdDeviation(a []float64) (float64, float64) {
+func calculateMeanStdDeviationFloat64s(a []float64) (float64, float64) {
 	sum := float64(0)
 	for _, i := range a {
 		sum += i
@@ -34,3 +35,25 @@ func calcMeanAndStdDeviation(a []float64) (float64, float64) {
 	dev = math.Sqrt(dev / float64(len(a)))
 	return mean, dev
 }
+
+func calculateMeanStdDeviationInts(a []int) (float64, float64) {
+	floats := []float64{}
+	for _, i := range a {
+		floats = append(floats, float64(i))
+	}
+	return calculateMeanStdDeviationFloat64s(floats)
+}
+
+func getMinMedianMaxInts(a []int) (int, int, int) {
+	aCopied := make([]int, len(a))
+	copy(aCopied, a)
+	sort.Ints(aCopied)
+	return aCopied[0], aCopied[len(aCopied)/2], aCopied[len(aCopied)-1]
+}
+
+func getMinMedianMaxFloat64s(a []float64) (float64, float64, float64) {
+	aCopied := make([]float64, len(a))
+	copy(aCopied, a)
+	sort.Float64s(aCopied)
+	return aCopied[0], aCopied[len(aCopied)/2], aCopied[len(aCopied)-1]
+}
--
cgit v1.2.3