aboutsummaryrefslogtreecommitdiffstats
path: root/simulation/peer-server.go
diff options
context:
space:
mode:
Diffstat (limited to 'simulation/peer-server.go')
-rw-r--r--simulation/peer-server.go124
1 files changed, 100 insertions, 24 deletions
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 95ce5c1..6d1121f 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -23,6 +23,7 @@ import (
"fmt"
"log"
"reflect"
+ "sort"
"sync"
"time"
@@ -34,27 +35,29 @@ import (
// PeerServer is the main object to collect results and monitor simulation.
type PeerServer struct {
- peers map[types.NodeID]struct{}
- msgChannel chan *test.TransportEnvelope
- trans test.TransportServer
- peerTotalOrder PeerTotalOrder
- peerTotalOrderMu sync.Mutex
- verifiedLen uint64
- cfg *config.Config
- ctx context.Context
- ctxCancel context.CancelFunc
- blockEvents map[types.NodeID]map[common.Hash][]time.Time
+ peers map[types.NodeID]struct{}
+ msgChannel chan *test.TransportEnvelope
+ trans test.TransportServer
+ peerTotalOrder PeerTotalOrder
+ peerTotalOrderMu sync.Mutex
+ verifiedLen uint64
+ cfg *config.Config
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ blockEvents map[types.NodeID]map[common.Hash][]time.Time
+ throughputRecords map[types.NodeID][]test.ThroughputRecord
}
// NewPeerServer returns a new PeerServer instance.
func NewPeerServer() *PeerServer {
ctx, cancel := context.WithCancel(context.Background())
return &PeerServer{
- peers: make(map[types.NodeID]struct{}),
- peerTotalOrder: make(PeerTotalOrder),
- ctx: ctx,
- ctxCancel: cancel,
- blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time),
+ peers: make(map[types.NodeID]struct{}),
+ peerTotalOrder: make(PeerTotalOrder),
+ ctx: ctx,
+ ctxCancel: cancel,
+ blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time),
+ throughputRecords: make(map[types.NodeID][]test.ThroughputRecord),
}
}
@@ -118,7 +121,9 @@ func (p *PeerServer) handleMessage(id types.NodeID, m *message) {
}
}
-func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMessage) {
+func (p *PeerServer) handleBlockEventMessage(
+ id types.NodeID, msg *test.BlockEventMessage) {
+
if _, exist := p.blockEvents[id]; !exist {
p.blockEvents[id] = make(map[common.Hash][]time.Time)
}
@@ -129,6 +134,12 @@ func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMes
nodeEvents[msg.BlockHash] = msg.Timestamps
}
+func (p *PeerServer) handleThroughputData(
+ id types.NodeID, records *[]test.ThroughputRecord) {
+
+ p.throughputRecords[id] = append(p.throughputRecords[id], *records...)
+}
+
func (p *PeerServer) mainLoop() {
for {
select {
@@ -149,8 +160,10 @@ func (p *PeerServer) mainLoop() {
p.handleBlockList(e.From, val)
case *message:
p.handleMessage(e.From, val)
- case *blockEventMessage:
+ case *test.BlockEventMessage:
p.handleBlockEventMessage(e.From, val)
+ case *[]test.ThroughputRecord:
+ p.handleThroughputData(e.From, val)
default:
panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg)))
}
@@ -200,20 +213,83 @@ func (p *PeerServer) Run() {
log.Printf("Error shutting down peerServer: %v\n", err)
}
p.logBlockEvents()
+ p.logThroughputRecords()
+}
+
+func (p *PeerServer) logThroughputRecords() {
+ // Interval is the sample rate of calculating throughput data, the unit is
+ // nano second.
+ intervals := []int64{int64(time.Second), int64(100 * time.Millisecond)}
+ log.Println("======== throughput data ============")
+ for nid, records := range p.throughputRecords {
+ log.Printf("[Node %s]\n", nid)
+ msgTypes := []string{}
+ msgMap := make(map[string][]test.ThroughputRecord)
+ for _, record := range records {
+ msgMap[record.Type] = append(msgMap[record.Type], record)
+ }
+ for k := range msgMap {
+ msgTypes = append(msgTypes, k)
+ }
+ sort.Strings(msgTypes)
+ for _, interval := range intervals {
+ log.Printf(" %dms", interval/int64(time.Millisecond))
+ for _, msgType := range msgTypes {
+ sum := 0
+ startTime := msgMap[msgType][0].Time.UnixNano()
+ endTime := startTime
+ for _, record := range msgMap[msgType] {
+ sum += record.Size
+ t := record.Time.UnixNano()
+ // The receiving order might be different with sending order.
+ if t < startTime {
+ startTime = t
+ }
+ if t > endTime {
+ endTime = t
+ }
+ }
+ startIndex := startTime / interval
+ endIndex := endTime / interval
+ log.Printf(" %s (count: %d, size: %d)",
+ msgType, len(msgMap[msgType]), sum)
+ // A slot stores total throughput in the interval of that time. The
+ // index of slot of a specified time is calculated by deviding the
+ // interval and minusing the starting time. For example, start time is
+ // 5.5s, then time "7.123s"'s index of slot is
+ // 7123000000 / 100000000 - 55 = 71 - 55 = 16.
+ slots := make([]int, endIndex-startIndex+1)
+ for _, record := range msgMap[msgType] {
+ slots[record.Time.UnixNano()/interval-startIndex] += record.Size
+ }
+ mean, std := calculateMeanStdDeviationInts(slots)
+ log.Printf(" mean: %f, std: %f", mean, std)
+ min, med, max := getMinMedianMaxInts(slots)
+ log.Printf(" min: %d, med: %d, max: %d", min, med, max)
+ }
+ }
+ }
}
func (p *PeerServer) logBlockEvents() {
+ // diffs stores the difference between two consecutive event time.
diffs := [blockEventCount - 1][]float64{}
- for _, bs := range p.blockEvents {
- for _, ts := range bs {
+ for _, blocks := range p.blockEvents {
+ for _, timestamps := range blocks {
for i := 0; i < blockEventCount-1; i++ {
- diffs[i] = append(diffs[i], float64(ts[i+1].Sub(ts[i]))/1000000000)
+ diffs[i] = append(
+ diffs[i],
+ float64(timestamps[i+1].Sub(timestamps[i]))/1000000000,
+ )
}
}
}
- log.Printf("===== block event mean and std dev (%d blocks) =====\n", len(diffs[0]))
- for i, a := range diffs {
- m, d := calcMeanAndStdDeviation(a)
- log.Printf(" event %d: mean = %f, std dev = %f\n", i, m, d)
+ log.Printf("======== block events (%d blocks) ============", len(diffs[0]))
+ for i, ary := range diffs {
+ mean, stdDeviation := calculateMeanStdDeviationFloat64s(ary)
+ min, med, max := getMinMedianMaxFloat64s(ary)
+ log.Printf(" event %d to %d", i, i+1)
+ log.Printf(" mean: %f, std dev = %f", mean, stdDeviation)
+ log.Printf(" min: %f, median: %f, max: %f", min, med, max)
}
}