diff options
author | haoping-ku <haoping.ku@dexon.org> | 2018-12-22 17:47:02 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-22 17:47:02 +0800 |
commit | c1ed57c4abaf1f4758e52f082bb7114ad00c8b39 (patch) | |
tree | 58ea4bc3f3e883ba4a19405b4a6dc476ec3c70db /simulation/peer-server.go | |
parent | 6d1c1aeea0d3e75d10cbb2712c68b4c422ba8ba6 (diff) | |
download | dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.gz dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.bz2 dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.lz dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.xz dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.zst dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.zip |
core: simulation: add throughput and block event monitoring (#380)
* core: simulation: add throughput and block event monitoring
Added throughput and block event monitoring in TCP-local network. These
data is collected by nodes and reported to peer server.
* fix issues
* fix sent time of throughput issue
Diffstat (limited to 'simulation/peer-server.go')
-rw-r--r-- | simulation/peer-server.go | 124 |
1 files changed, 100 insertions, 24 deletions
diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 95ce5c1..6d1121f 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "reflect" + "sort" "sync" "time" @@ -34,27 +35,29 @@ import ( // PeerServer is the main object to collect results and monitor simulation. type PeerServer struct { - peers map[types.NodeID]struct{} - msgChannel chan *test.TransportEnvelope - trans test.TransportServer - peerTotalOrder PeerTotalOrder - peerTotalOrderMu sync.Mutex - verifiedLen uint64 - cfg *config.Config - ctx context.Context - ctxCancel context.CancelFunc - blockEvents map[types.NodeID]map[common.Hash][]time.Time + peers map[types.NodeID]struct{} + msgChannel chan *test.TransportEnvelope + trans test.TransportServer + peerTotalOrder PeerTotalOrder + peerTotalOrderMu sync.Mutex + verifiedLen uint64 + cfg *config.Config + ctx context.Context + ctxCancel context.CancelFunc + blockEvents map[types.NodeID]map[common.Hash][]time.Time + throughputRecords map[types.NodeID][]test.ThroughputRecord } // NewPeerServer returns a new PeerServer instance. func NewPeerServer() *PeerServer { ctx, cancel := context.WithCancel(context.Background()) return &PeerServer{ - peers: make(map[types.NodeID]struct{}), - peerTotalOrder: make(PeerTotalOrder), - ctx: ctx, - ctxCancel: cancel, - blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time), + peers: make(map[types.NodeID]struct{}), + peerTotalOrder: make(PeerTotalOrder), + ctx: ctx, + ctxCancel: cancel, + blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time), + throughputRecords: make(map[types.NodeID][]test.ThroughputRecord), } } @@ -118,7 +121,9 @@ func (p *PeerServer) handleMessage(id types.NodeID, m *message) { } } -func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMessage) { +func (p *PeerServer) handleBlockEventMessage( + id types.NodeID, msg *test.BlockEventMessage) { + if _, exist := p.blockEvents[id]; !exist { p.blockEvents[id] = make(map[common.Hash][]time.Time) } @@ -129,6 +134,12 @@ func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMes nodeEvents[msg.BlockHash] = msg.Timestamps } +func (p *PeerServer) handleThroughputData( + id types.NodeID, records *[]test.ThroughputRecord) { + + p.throughputRecords[id] = append(p.throughputRecords[id], *records...) +} + func (p *PeerServer) mainLoop() { for { select { @@ -149,8 +160,10 @@ func (p *PeerServer) mainLoop() { p.handleBlockList(e.From, val) case *message: p.handleMessage(e.From, val) - case *blockEventMessage: + case *test.BlockEventMessage: p.handleBlockEventMessage(e.From, val) + case *[]test.ThroughputRecord: + p.handleThroughputData(e.From, val) default: panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg))) } @@ -200,20 +213,83 @@ func (p *PeerServer) Run() { log.Printf("Error shutting down peerServer: %v\n", err) } p.logBlockEvents() + p.logThroughputRecords() +} + +func (p *PeerServer) logThroughputRecords() { + // Interval is the sample rate of calculating throughput data, the unit is + // nano second. + intervals := []int64{int64(time.Second), int64(100 * time.Millisecond)} + log.Println("======== throughput data ============") + for nid, records := range p.throughputRecords { + log.Printf("[Node %s]\n", nid) + msgTypes := []string{} + msgMap := make(map[string][]test.ThroughputRecord) + for _, record := range records { + msgMap[record.Type] = append(msgMap[record.Type], record) + } + for k := range msgMap { + msgTypes = append(msgTypes, k) + } + sort.Strings(msgTypes) + for _, interval := range intervals { + log.Printf(" %dms", interval/int64(time.Millisecond)) + for _, msgType := range msgTypes { + sum := 0 + startTime := msgMap[msgType][0].Time.UnixNano() + endTime := startTime + for _, record := range msgMap[msgType] { + sum += record.Size + t := record.Time.UnixNano() + // The receiving order might be different with sending order. + if t < startTime { + startTime = t + } + if t > endTime { + endTime = t + } + } + startIndex := startTime / interval + endIndex := endTime / interval + log.Printf(" %s (count: %d, size: %d)", + msgType, len(msgMap[msgType]), sum) + // A slot stores total throughput in the interval of that time. The + // index of slot of a specified time is calculated by deviding the + // interval and minusing the starting time. For example, start time is + // 5.5s, then time "7.123s"'s index of slot is + // 7123000000 / 100000000 - 55 = 71 - 55 = 16. + slots := make([]int, endIndex-startIndex+1) + for _, record := range msgMap[msgType] { + slots[record.Time.UnixNano()/interval-startIndex] += record.Size + } + mean, std := calculateMeanStdDeviationInts(slots) + log.Printf(" mean: %f, std: %f", mean, std) + min, med, max := getMinMedianMaxInts(slots) + log.Printf(" min: %d, med: %d, max: %d", min, med, max) + } + } + } } func (p *PeerServer) logBlockEvents() { + // diffs stores the difference between two consecutive event time. diffs := [blockEventCount - 1][]float64{} - for _, bs := range p.blockEvents { - for _, ts := range bs { + for _, blocks := range p.blockEvents { + for _, timestamps := range blocks { for i := 0; i < blockEventCount-1; i++ { - diffs[i] = append(diffs[i], float64(ts[i+1].Sub(ts[i]))/1000000000) + diffs[i] = append( + diffs[i], + float64(timestamps[i+1].Sub(timestamps[i]))/1000000000, + ) } } } - log.Printf("===== block event mean and std dev (%d blocks) =====\n", len(diffs[0])) - for i, a := range diffs { - m, d := calcMeanAndStdDeviation(a) - log.Printf(" event %d: mean = %f, std dev = %f\n", i, m, d) + log.Printf("======== block events (%d blocks) ============", len(diffs[0])) + for i, ary := range diffs { + mean, stdDeviation := calculateMeanStdDeviationFloat64s(ary) + min, med, max := getMinMedianMaxFloat64s(ary) + log.Printf(" event %d to %d", i, i+1) + log.Printf(" mean: %f, std dev = %f", mean, stdDeviation) + log.Printf(" min: %f, median: %f, max: %f", min, med, max) } } |