aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorhaoping-ku <haoping.ku@dexon.org>2018-12-22 17:47:02 +0800
committerGitHub <noreply@github.com>2018-12-22 17:47:02 +0800
commitc1ed57c4abaf1f4758e52f082bb7114ad00c8b39 (patch)
tree58ea4bc3f3e883ba4a19405b4a6dc476ec3c70db /core
parent6d1c1aeea0d3e75d10cbb2712c68b4c422ba8ba6 (diff)
downloaddexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.gz
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.bz2
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.lz
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.xz
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.tar.zst
dexon-consensus-c1ed57c4abaf1f4758e52f082bb7114ad00c8b39.zip
core: simulation: add throughput and block event monitoring (#380)
* core: simulation: add throughput and block event monitoring Added throughput and block event monitoring in TCP-local network. These data is collected by nodes and reported to peer server. * fix issues * fix sent time of throughput issue
Diffstat (limited to 'core')
-rw-r--r--core/consensus.go34
-rw-r--r--core/test/tcp-transport.go128
2 files changed, 138 insertions, 24 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 965a633..44ce43f 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -399,6 +399,34 @@ func NewConsensus(
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
+ return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+}
+
+// NewConsensusForSimulation creates an instance of Consensus for simulation,
+// the only difference with NewConsensus is nonblocking of app.
+func NewConsensusForSimulation(
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+
+ return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+}
+
+// newConsensus creates a Consensus instance.
+func newConsensus(
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger,
+ usingNonBlocking bool) *Consensus {
+
// TODO(w): load latest blockHeight from DB, and use config at that height.
nodeSetCache := utils.NewNodeSetCache(gov)
// Setup signer module.
@@ -432,12 +460,16 @@ func NewConsensus(
db,
logger)
recv.cfgModule = cfgModule
+ appModule := app
+ if usingNonBlocking {
+ appModule = newNonBlocking(app, debugApp)
+ }
// Construct Consensus instance.
con := &Consensus{
ID: ID,
ccModule: newCompactionChain(gov),
lattice: lattice,
- app: newNonBlocking(app, debugApp),
+ app: appModule,
debugApp: debugApp,
gov: gov,
db: db,
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index a3b9aba..b16bbcb 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -34,9 +34,15 @@ import (
"syscall"
"time"
+ "github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
"github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
+)
+
+const (
+ tcpThroughputReportNum = 10
)
type tcpPeerRecord struct {
@@ -52,6 +58,12 @@ type tcpMessage struct {
Info string `json:"conn"`
}
+// BlockEventMessage is for monitoring block events' time.
+type BlockEventMessage struct {
+ BlockHash common.Hash `json:"hash"`
+ Timestamps []time.Time `json:"timestamps"`
+}
+
// buildPeerInfo is a tricky way to combine connection string and
// base64 encoded byte slice for public key into a single string,
// separated by ';'.
@@ -88,17 +100,19 @@ var (
// TCPTransport implements Transport interface via TCP connection.
type TCPTransport struct {
- peerType TransportPeerType
- nID types.NodeID
- pubKey crypto.PublicKey
- localPort int
- peers map[types.NodeID]*tcpPeerRecord
- peersLock sync.RWMutex
- recvChannel chan *TransportEnvelope
- ctx context.Context
- cancel context.CancelFunc
- latency LatencyModel
- marshaller Marshaller
+ peerType TransportPeerType
+ nID types.NodeID
+ pubKey crypto.PublicKey
+ localPort int
+ peers map[types.NodeID]*tcpPeerRecord
+ peersLock sync.RWMutex
+ recvChannel chan *TransportEnvelope
+ ctx context.Context
+ cancel context.CancelFunc
+ latency LatencyModel
+ marshaller Marshaller
+ throughputRecords []ThroughputRecord
+ throughputLock sync.Mutex
}
// NewTCPTransport constructs an TCPTransport instance.
@@ -111,16 +125,17 @@ func NewTCPTransport(
ctx, cancel := context.WithCancel(context.Background())
return &TCPTransport{
- peerType: peerType,
- nID: types.NewNodeID(pubKey),
- pubKey: pubKey,
- peers: make(map[types.NodeID]*tcpPeerRecord),
- recvChannel: make(chan *TransportEnvelope, 1000),
- ctx: ctx,
- cancel: cancel,
- localPort: localPort,
- latency: latency,
- marshaller: marshaller,
+ peerType: peerType,
+ nID: types.NewNodeID(pubKey),
+ pubKey: pubKey,
+ peers: make(map[types.NodeID]*tcpPeerRecord),
+ recvChannel: make(chan *TransportEnvelope, 1000),
+ ctx: ctx,
+ cancel: cancel,
+ localPort: localPort,
+ latency: latency,
+ marshaller: marshaller,
+ throughputRecords: []ThroughputRecord{},
}
}
@@ -199,10 +214,9 @@ func (t *TCPTransport) Send(
if t.latency != nil {
time.Sleep(t.latency.Delay())
}
-
t.peersLock.RLock()
defer t.peersLock.RUnlock()
-
+ t.handleThroughputData(msg, payload)
t.peers[endpoint].sendChannel <- payload
}()
return
@@ -225,6 +239,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
if t.latency != nil {
time.Sleep(t.latency.Delay())
}
+ t.handleThroughputData(msg, payload)
ch <- payload
}(rec.sendChannel)
}
@@ -298,6 +313,10 @@ func (t *TCPTransport) marshalMessage(
msgCarrier.Type = "peerlist"
case *tcpMessage:
msgCarrier.Type = "trans-msg"
+ case []ThroughputRecord:
+ msgCarrier.Type = "throughput-record"
+ case *BlockEventMessage:
+ msgCarrier.Type = "block-event"
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msg)
@@ -349,6 +368,18 @@ func (t *TCPTransport) unmarshalMessage(
return
}
msg = m
+ case "throughput-record":
+ m := &[]ThroughputRecord{}
+ if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
+ return
+ }
+ msg = m
+ case "block-event":
+ m := &BlockEventMessage{}
+ if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
+ return
+ }
+ msg = m
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type)
@@ -537,6 +568,13 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) {
return
}
+// ThroughputRecord records the network throughput data.
+type ThroughputRecord struct {
+ Type string `json:"type"`
+ Size int `json:"size"`
+ Time time.Time `json:"time"`
+}
+
// TCPTransportClient implement TransportClient base on TCP connection.
type TCPTransportClient struct {
TCPTransport
@@ -702,6 +740,24 @@ func (t *TCPTransportClient) Join(
return
}
+// Send calls TCPTransport's Send, and send the throughput data to peer server.
+func (t *TCPTransportClient) Send(
+ endpoint types.NodeID, msg interface{}) (err error) {
+
+ if err := t.TCPTransport.Send(endpoint, msg); err != nil {
+ return err
+ }
+ if len(t.throughputRecords) > tcpThroughputReportNum {
+ t.throughputLock.Lock()
+ defer t.throughputLock.Unlock()
+ if err := t.Report(t.throughputRecords); err != nil {
+ panic(err)
+ }
+ t.throughputRecords = t.throughputRecords[:0]
+ }
+ return
+}
+
// TCPTransportServer implements TransportServer via TCP connections.
type TCPTransportServer struct {
TCPTransport
@@ -800,3 +856,29 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
}
return
}
+
+func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) {
+ sentTime := time.Now()
+ t.throughputLock.Lock()
+ defer t.throughputLock.Unlock()
+ recordType := ""
+ switch msg.(type) {
+ case *types.Vote:
+ recordType = "vote"
+ case *types.Block:
+ recordType = "block"
+ case *types.AgreementResult:
+ recordType = "agreement_result"
+ case *dkg.PartialSignature:
+ recordType = "partial_sig"
+ case *types.BlockRandomnessResult:
+ recordType = "block_random"
+ }
+ if len(recordType) > 0 {
+ t.throughputRecords = append(t.throughputRecords, ThroughputRecord{
+ Type: recordType,
+ Time: sentTime,
+ Size: len(payload),
+ })
+ }
+}