aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhaoping-ku <haoping.ku@dexon.org>2018-12-22 17:47:02 +0800
committerGitHub <noreply@github.com>2018-12-22 17:47:02 +0800
commitc1ed57c4abaf1f4758e52f082bb7114ad00c8b39 (patch)
tree58ea4bc3f3e883ba4a19405b4a6dc476ec3c70db
parent6d1c1aeea0d3e75d10cbb2712c68b4c422ba8ba6 (diff)
downloaddexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.gz
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.bz2
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.lz
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.xz
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.zst
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.zip
core: simulation: add throughput and block event monitoring (#380)
* core: simulation: add throughput and block event monitoring Added throughput and block event monitoring in the TCP-local network. This data is collected by nodes and reported to the peer server. * fix issues * fix sent time of throughput issue
-rw-r--r--core/consensus.go34
-rw-r--r--core/test/tcp-transport.go128
-rw-r--r--simulation/app.go11
-rw-r--r--simulation/node.go2
-rw-r--r--simulation/peer-server.go124
-rw-r--r--simulation/utils.go25
6 files changed, 267 insertions, 57 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 965a633..44ce43f 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -399,6 +399,34 @@ func NewConsensus(
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
+ return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+}
+
+// NewConsensusForSimulation creates an instance of Consensus for simulation.
+// Unlike NewConsensus, it does not wrap app with newNonBlocking.
+func NewConsensusForSimulation(
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+
+ return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+}
+
+// newConsensus creates a Consensus instance.
+func newConsensus(
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger,
+ usingNonBlocking bool) *Consensus {
+
// TODO(w): load latest blockHeight from DB, and use config at that height.
nodeSetCache := utils.NewNodeSetCache(gov)
// Setup signer module.
@@ -432,12 +460,16 @@ func NewConsensus(
db,
logger)
recv.cfgModule = cfgModule
+ appModule := app
+ if usingNonBlocking {
+ appModule = newNonBlocking(app, debugApp)
+ }
// Construct Consensus instance.
con := &Consensus{
ID: ID,
ccModule: newCompactionChain(gov),
lattice: lattice,
- app: newNonBlocking(app, debugApp),
+ app: appModule,
debugApp: debugApp,
gov: gov,
db: db,
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index a3b9aba..b16bbcb 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -34,9 +34,15 @@ import (
"syscall"
"time"
+ "github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
"github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+)
+
+const (
+ tcpThroughputReportNum = 10
)
type tcpPeerRecord struct {
@@ -52,6 +58,12 @@ type tcpMessage struct {
Info string `json:"conn"`
}
+// BlockEventMessage is for monitoring block events' time.
+type BlockEventMessage struct {
+ BlockHash common.Hash `json:"hash"`
+ Timestamps []time.Time `json:"timestamps"`
+}
+
// buildPeerInfo is a tricky way to combine connection string and
// base64 encoded byte slice for public key into a single string,
// separated by ';'.
@@ -88,17 +100,19 @@ var (
// TCPTransport implements Transport interface via TCP connection.
type TCPTransport struct {
- peerType TransportPeerType
- nID types.NodeID
- pubKey crypto.PublicKey
- localPort int
- peers map[types.NodeID]*tcpPeerRecord
- peersLock sync.RWMutex
- recvChannel chan *TransportEnvelope
- ctx context.Context
- cancel context.CancelFunc
- latency LatencyModel
- marshaller Marshaller
+ peerType TransportPeerType
+ nID types.NodeID
+ pubKey crypto.PublicKey
+ localPort int
+ peers map[types.NodeID]*tcpPeerRecord
+ peersLock sync.RWMutex
+ recvChannel chan *TransportEnvelope
+ ctx context.Context
+ cancel context.CancelFunc
+ latency LatencyModel
+ marshaller Marshaller
+ throughputRecords []ThroughputRecord
+ throughputLock sync.Mutex
}
// NewTCPTransport constructs an TCPTransport instance.
@@ -111,16 +125,17 @@ func NewTCPTransport(
ctx, cancel := context.WithCancel(context.Background())
return &TCPTransport{
- peerType: peerType,
- nID: types.NewNodeID(pubKey),
- pubKey: pubKey,
- peers: make(map[types.NodeID]*tcpPeerRecord),
- recvChannel: make(chan *TransportEnvelope, 1000),
- ctx: ctx,
- cancel: cancel,
- localPort: localPort,
- latency: latency,
- marshaller: marshaller,
+ peerType: peerType,
+ nID: types.NewNodeID(pubKey),
+ pubKey: pubKey,
+ peers: make(map[types.NodeID]*tcpPeerRecord),
+ recvChannel: make(chan *TransportEnvelope, 1000),
+ ctx: ctx,
+ cancel: cancel,
+ localPort: localPort,
+ latency: latency,
+ marshaller: marshaller,
+ throughputRecords: []ThroughputRecord{},
}
}
@@ -199,10 +214,9 @@ func (t *TCPTransport) Send(
if t.latency != nil {
time.Sleep(t.latency.Delay())
}
-
t.peersLock.RLock()
defer t.peersLock.RUnlock()
-
+ t.handleThroughputData(msg, payload)
t.peers[endpoint].sendChannel <- payload
}()
return
@@ -225,6 +239,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
if t.latency != nil {
time.Sleep(t.latency.Delay())
}
+ t.handleThroughputData(msg, payload)
ch <- payload
}(rec.sendChannel)
}
@@ -298,6 +313,10 @@ func (t *TCPTransport) marshalMessage(
msgCarrier.Type = "peerlist"
case *tcpMessage:
msgCarrier.Type = "trans-msg"
+ case []ThroughputRecord:
+ msgCarrier.Type = "throughput-record"
+ case *BlockEventMessage:
+ msgCarrier.Type = "block-event"
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msg)
@@ -349,6 +368,18 @@ func (t *TCPTransport) unmarshalMessage(
return
}
msg = m
+ case "throughput-record":
+ m := &[]ThroughputRecord{}
+ if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
+ return
+ }
+ msg = m
+ case "block-event":
+ m := &BlockEventMessage{}
+ if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
+ return
+ }
+ msg = m
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type)
@@ -537,6 +568,13 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) {
return
}
+// ThroughputRecord records the network throughput data.
+type ThroughputRecord struct {
+ Type string `json:"type"`
+ Size int `json:"size"`
+ Time time.Time `json:"time"`
+}
+
// TCPTransportClient implement TransportClient base on TCP connection.
type TCPTransportClient struct {
TCPTransport
@@ -702,6 +740,24 @@ func (t *TCPTransportClient) Join(
return
}
+// Send calls TCPTransport's Send, and sends the throughput data to the peer server.
+func (t *TCPTransportClient) Send(
+ endpoint types.NodeID, msg interface{}) (err error) {
+
+ if err := t.TCPTransport.Send(endpoint, msg); err != nil {
+ return err
+ }
+ if len(t.throughputRecords) > tcpThroughputReportNum {
+ t.throughputLock.Lock()
+ defer t.throughputLock.Unlock()
+ if err := t.Report(t.throughputRecords); err != nil {
+ panic(err)
+ }
+ t.throughputRecords = t.throughputRecords[:0]
+ }
+ return
+}
+
// TCPTransportServer implements TransportServer via TCP connections.
type TCPTransportServer struct {
TCPTransport
@@ -800,3 +856,29 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
}
return
}
+
+func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) {
+ sentTime := time.Now()
+ t.throughputLock.Lock()
+ defer t.throughputLock.Unlock()
+ recordType := ""
+ switch msg.(type) {
+ case *types.Vote:
+ recordType = "vote"
+ case *types.Block:
+ recordType = "block"
+ case *types.AgreementResult:
+ recordType = "agreement_result"
+ case *dkg.PartialSignature:
+ recordType = "partial_sig"
+ case *types.BlockRandomnessResult:
+ recordType = "block_random"
+ }
+ if len(recordType) > 0 {
+ t.throughputRecords = append(t.throughputRecords, ThroughputRecord{
+ Type: recordType,
+ Time: sentTime,
+ Size: len(payload),
+ })
+ }
+}
diff --git a/simulation/app.go b/simulation/app.go
index 464c521..2bf0e48 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -57,11 +57,6 @@ const (
blockEventCount
)
-type blockEventMessage struct {
- BlockHash common.Hash `json:"hash"`
- Timestamps []time.Time `json:"timestamps"`
-}
-
// simApp is an DEXON app for simulation.
type simApp struct {
NodeID types.NodeID
@@ -250,11 +245,13 @@ func (a *simApp) updateBlockEvent(hash common.Hash) {
defer a.lock.Unlock()
a.blockTimestamps[hash] = append(a.blockTimestamps[hash], time.Now().UTC())
if len(a.blockTimestamps[hash]) == blockEventCount {
- msg := &blockEventMessage{
+ msg := &test.BlockEventMessage{
BlockHash: hash,
Timestamps: a.blockTimestamps[hash],
}
- a.netModule.Report(msg)
+ if err := a.netModule.Report(msg); err != nil {
+ panic(err)
+ }
delete(a.blockTimestamps, hash)
}
}
diff --git a/simulation/node.go b/simulation/node.go
index 4d1e4ff..e766da8 100644
--- a/simulation/node.go
+++ b/simulation/node.go
@@ -150,7 +150,7 @@ func (n *node) run(
// Setup of governance is ready, can be switched to remote mode.
n.gov.SwitchToRemoteMode(n.netModule)
// Setup Consensus.
- n.consensus = core.NewConsensus(
+ n.consensus = core.NewConsensusForSimulation(
dMoment,
n.app,
n.gov,
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 95ce5c1..6d1121f 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -23,6 +23,7 @@ import (
"fmt"
"log"
"reflect"
+ "sort"
"sync"
"time"
@@ -34,27 +35,29 @@ import (
// PeerServer is the main object to collect results and monitor simulation.
type PeerServer struct {
- peers map[types.NodeID]struct{}
- msgChannel chan *test.TransportEnvelope
- trans test.TransportServer
- peerTotalOrder PeerTotalOrder
- peerTotalOrderMu sync.Mutex
- verifiedLen uint64
- cfg *config.Config
- ctx context.Context
- ctxCancel context.CancelFunc
- blockEvents map[types.NodeID]map[common.Hash][]time.Time
+ peers map[types.NodeID]struct{}
+ msgChannel chan *test.TransportEnvelope
+ trans test.TransportServer
+ peerTotalOrder PeerTotalOrder
+ peerTotalOrderMu sync.Mutex
+ verifiedLen uint64
+ cfg *config.Config
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ blockEvents map[types.NodeID]map[common.Hash][]time.Time
+ throughputRecords map[types.NodeID][]test.ThroughputRecord
}
// NewPeerServer returns a new PeerServer instance.
func NewPeerServer() *PeerServer {
ctx, cancel := context.WithCancel(context.Background())
return &PeerServer{
- peers: make(map[types.NodeID]struct{}),
- peerTotalOrder: make(PeerTotalOrder),
- ctx: ctx,
- ctxCancel: cancel,
- blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time),
+ peers: make(map[types.NodeID]struct{}),
+ peerTotalOrder: make(PeerTotalOrder),
+ ctx: ctx,
+ ctxCancel: cancel,
+ blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time),
+ throughputRecords: make(map[types.NodeID][]test.ThroughputRecord),
}
}
@@ -118,7 +121,9 @@ func (p *PeerServer) handleMessage(id types.NodeID, m *message) {
}
}
-func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMessage) {
+func (p *PeerServer) handleBlockEventMessage(
+ id types.NodeID, msg *test.BlockEventMessage) {
+
if _, exist := p.blockEvents[id]; !exist {
p.blockEvents[id] = make(map[common.Hash][]time.Time)
}
@@ -129,6 +134,12 @@ func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMes
nodeEvents[msg.BlockHash] = msg.Timestamps
}
+func (p *PeerServer) handleThroughputData(
+ id types.NodeID, records *[]test.ThroughputRecord) {
+
+ p.throughputRecords[id] = append(p.throughputRecords[id], *records...)
+}
+
func (p *PeerServer) mainLoop() {
for {
select {
@@ -149,8 +160,10 @@ func (p *PeerServer) mainLoop() {
p.handleBlockList(e.From, val)
case *message:
p.handleMessage(e.From, val)
- case *blockEventMessage:
+ case *test.BlockEventMessage:
p.handleBlockEventMessage(e.From, val)
+ case *[]test.ThroughputRecord:
+ p.handleThroughputData(e.From, val)
default:
panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg)))
}
@@ -200,20 +213,83 @@ func (p *PeerServer) Run() {
log.Printf("Error shutting down peerServer: %v\n", err)
}
p.logBlockEvents()
+ p.logThroughputRecords()
+}
+
+func (p *PeerServer) logThroughputRecords() {
+ // Each interval is a sampling period for calculating throughput data, in
+ // nanoseconds.
+ intervals := []int64{int64(time.Second), int64(100 * time.Millisecond)}
+ log.Println("======== throughput data ============")
+ for nid, records := range p.throughputRecords {
+ log.Printf("[Node %s]\n", nid)
+ msgTypes := []string{}
+ msgMap := make(map[string][]test.ThroughputRecord)
+ for _, record := range records {
+ msgMap[record.Type] = append(msgMap[record.Type], record)
+ }
+ for k := range msgMap {
+ msgTypes = append(msgTypes, k)
+ }
+ sort.Strings(msgTypes)
+ for _, interval := range intervals {
+ log.Printf(" %dms", interval/int64(time.Millisecond))
+ for _, msgType := range msgTypes {
+ sum := 0
+ startTime := msgMap[msgType][0].Time.UnixNano()
+ endTime := startTime
+ for _, record := range msgMap[msgType] {
+ sum += record.Size
+ t := record.Time.UnixNano()
+ // The receiving order might differ from the sending order.
+ if t < startTime {
+ startTime = t
+ }
+ if t > endTime {
+ endTime = t
+ }
+ }
+ startIndex := startTime / interval
+ endIndex := endTime / interval
+ log.Printf(" %s (count: %d, size: %d)",
+ msgType, len(msgMap[msgType]), sum)
+ // A slot stores the total throughput within one interval. The slot
+ // index for a given time is that time divided by the interval, minus
+ // the index of the start time. For example, with a 100ms interval and a
+ // start time of 5.5s, the slot index of time 7.123s is
+ // 7123000000 / 100000000 - 55 = 71 - 55 = 16.
+ slots := make([]int, endIndex-startIndex+1)
+ for _, record := range msgMap[msgType] {
+ slots[record.Time.UnixNano()/interval-startIndex] += record.Size
+ }
+ mean, std := calculateMeanStdDeviationInts(slots)
+ log.Printf(" mean: %f, std: %f", mean, std)
+ min, med, max := getMinMedianMaxInts(slots)
+ log.Printf(" min: %d, med: %d, max: %d", min, med, max)
+ }
+ }
+ }
}
func (p *PeerServer) logBlockEvents() {
+ // diffs stores the difference between two consecutive event time.
diffs := [blockEventCount - 1][]float64{}
- for _, bs := range p.blockEvents {
- for _, ts := range bs {
+ for _, blocks := range p.blockEvents {
+ for _, timestamps := range blocks {
for i := 0; i < blockEventCount-1; i++ {
- diffs[i] = append(diffs[i], float64(ts[i+1].Sub(ts[i]))/1000000000)
+ diffs[i] = append(
+ diffs[i],
+ float64(timestamps[i+1].Sub(timestamps[i]))/1000000000,
+ )
}
}
}
- log.Printf("===== block event mean and std dev (%d blocks) =====\n", len(diffs[0]))
- for i, a := range diffs {
- m, d := calcMeanAndStdDeviation(a)
- log.Printf(" event %d: mean = %f, std dev = %f\n", i, m, d)
+ log.Printf("======== block events (%d blocks) ============", len(diffs[0]))
+ for i, ary := range diffs {
+ mean, stdDeviation := calculateMeanStdDeviationFloat64s(ary)
+ min, med, max := getMinMedianMaxFloat64s(ary)
+ log.Printf(" event %d to %d", i, i+1)
+ log.Printf(" mean: %f, std dev = %f", mean, stdDeviation)
+ log.Printf(" min: %f, median: %f, max: %f", min, med, max)
}
}
diff --git a/simulation/utils.go b/simulation/utils.go
index f0d864b..a18e8a4 100644
--- a/simulation/utils.go
+++ b/simulation/utils.go
@@ -19,9 +19,10 @@ package simulation
import (
"math"
+ "sort"
)
-func calcMeanAndStdDeviation(a []float64) (float64, float64) {
+func calculateMeanStdDeviationFloat64s(a []float64) (float64, float64) {
sum := float64(0)
for _, i := range a {
sum += i
@@ -34,3 +35,25 @@ func calcMeanAndStdDeviation(a []float64) (float64, float64) {
dev = math.Sqrt(dev / float64(len(a)))
return mean, dev
}
+
+func calculateMeanStdDeviationInts(a []int) (float64, float64) {
+ floats := []float64{}
+ for _, i := range a {
+ floats = append(floats, float64(i))
+ }
+ return calculateMeanStdDeviationFloat64s(floats)
+}
+
+func getMinMedianMaxInts(a []int) (int, int, int) {
+ aCopied := make([]int, len(a))
+ copy(aCopied, a)
+ sort.Ints(aCopied)
+ return aCopied[0], aCopied[len(aCopied)/2], aCopied[len(aCopied)-1]
+}
+
+func getMinMedianMaxFloat64s(a []float64) (float64, float64, float64) {
+ aCopied := make([]float64, len(a))
+ copy(aCopied, a)
+ sort.Float64s(aCopied)
+ return aCopied[0], aCopied[len(aCopied)/2], aCopied[len(aCopied)-1]
+}