aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-08-07 14:33:13 +0800
committerGitHub <noreply@github.com>2018-08-07 14:33:13 +0800
commit3a929b656b6bd5846849fd98dc29ff761db97ed3 (patch)
tree25438de64f434866293aa408915c735352949e6f
parent5a818558fc0e8f038ba92b5b6dfa3b55a04b9589 (diff)
downloaddexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.tar
dexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.tar.gz
dexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.tar.bz2
dexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.tar.lz
dexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.tar.xz
dexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.tar.zst
dexon-consensus-3a929b656b6bd5846849fd98dc29ff761db97ed3.zip
simulation: Show internal and external timestamp latency. (#30)
-rw-r--r--core/blocklattice.go11
-rw-r--r--simulation/app.go97
-rw-r--r--simulation/network.go21
-rw-r--r--simulation/peer-server.go29
-rw-r--r--simulation/verification.go63
5 files changed, 204 insertions, 17 deletions
diff --git a/core/blocklattice.go b/core/blocklattice.go
index 1f12474..f94cc9a 100644
--- a/core/blocklattice.go
+++ b/core/blocklattice.go
@@ -66,6 +66,9 @@ type BlockLattice struct {
candidateSet map[common.Hash]*types.Block
ABS map[common.Hash]map[types.ValidatorID]uint64
AHV map[common.Hash]map[types.ValidatorID]uint64
+
+ // Timestamp.
+ timestampEngine consensusTimestamp
}
// NewBlockLattice returns a new empty BlockLattice instance.
@@ -86,6 +89,7 @@ func NewBlockLattice(
candidateSet: make(map[common.Hash]*types.Block),
ABS: make(map[common.Hash]map[types.ValidatorID]uint64),
AHV: make(map[common.Hash]map[types.ValidatorID]uint64),
+ timestampEngine: *newConsensusTimestamp(),
}
}
@@ -533,6 +537,13 @@ func (l *BlockLattice) totalOrdering(b *types.Block) {
if len(output) > 0 {
l.app.TotalOrderingDeliver(output, earlyDelivery)
+ blocksReady, _, err := l.timestampEngine.processBlocks(output)
+ if err != nil && err != ErrEmptyTimestamps {
+ panic(err)
+ }
+ for _, block := range blocksReady {
+ l.app.DeliverBlock(block.Hash, block.ConsensusTime)
+ }
}
// Rescan pending blocks to add into candidate set.
diff --git a/simulation/app.go b/simulation/app.go
index aea7acf..5533abb 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -18,6 +18,7 @@
package simulation
import (
+ "encoding/json"
"fmt"
"time"
@@ -32,17 +33,50 @@ type SimApp struct {
Early bool
Network PeerServerNetwork
DeliverID int
+ // blockSeen stores the time when block is delivered by Total Ordering.
+ blockSeen map[common.Hash]time.Time
+ // uncofirmBlocks stores the blocks whose timestamps are not ready.
+ unconfirmedBlocks map[types.ValidatorID]common.Hashes
+ blockHash map[common.Hash]*types.Block
}
// NewSimApp returns point to a new instance of SimApp.
func NewSimApp(id types.ValidatorID, Network PeerServerNetwork) *SimApp {
return &SimApp{
- ValidatorID: id,
- Network: Network,
- DeliverID: 0,
+ ValidatorID: id,
+ Network: Network,
+ DeliverID: 0,
+ blockSeen: make(map[common.Hash]time.Time),
+ unconfirmedBlocks: make(map[types.ValidatorID]common.Hashes),
+ blockHash: make(map[common.Hash]*types.Block),
}
}
+// getAckedBlocks will return all unconfirmed blocks' hash with lower Height
+// than the block with ackHash.
+func (a *SimApp) getAckedBlocks(ackHash common.Hash) (output common.Hashes) {
+ // TODO(jimmy-dexon): Why there are some acks never seen?
+ ackBlock, exist := a.blockHash[ackHash]
+ if !exist {
+ return
+ }
+ hashes, exist := a.unconfirmedBlocks[ackBlock.ProposerID]
+ if !exist {
+ return
+ }
+ for i, blockHash := range hashes {
+ if a.blockHash[blockHash].Height > ackBlock.Height {
+ output, a.unconfirmedBlocks[ackBlock.ProposerID] = hashes[:i], hashes[i:]
+ break
+ }
+ }
+ // All of the Height of unconfirmed blocks are lower than the acked block.
+ if len(output) == 0 {
+ output, a.unconfirmedBlocks[ackBlock.ProposerID] = hashes, common.Hashes{}
+ }
+ return
+}
+
// TotalOrderingDeliver is called when blocks are delivered by the total
// ordering algorithm.
func (a *SimApp) TotalOrderingDeliver(blocks []*types.Block, early bool) {
@@ -54,12 +88,37 @@ func (a *SimApp) TotalOrderingDeliver(blocks []*types.Block, early bool) {
blockHash := common.Hashes{}
confirmLatency := []time.Duration{}
+ payload := []TimestampMessage{}
for _, block := range blocks {
blockHash = append(blockHash, block.Hash)
if block.ProposerID == a.ValidatorID {
confirmLatency = append(confirmLatency,
now.Sub(block.Timestamps[a.ValidatorID]))
}
+ // TODO(jimmy-dexon) : Remove block in this hash if it's no longer needed.
+ a.blockHash[block.Hash] = block
+ for hash := range block.Acks {
+ for _, blockHash := range a.getAckedBlocks(hash) {
+ payload = append(payload, TimestampMessage{
+ BlockHash: blockHash,
+ Event: timestampAck,
+ Timestamp: now,
+ })
+ delete(a.blockSeen, block.Hash)
+ }
+ }
+ }
+ if len(payload) > 0 {
+ jsonPayload, err := json.Marshal(payload)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ msg := Message{
+ Type: blockTimestamp,
+ Payload: jsonPayload,
+ }
+ a.Network.NotifyServer(msg)
+ }
}
blockList := BlockList{
@@ -69,8 +128,40 @@ func (a *SimApp) TotalOrderingDeliver(blocks []*types.Block, early bool) {
}
a.Network.DeliverBlocks(blockList)
a.DeliverID++
+ for _, block := range blocks {
+ a.blockSeen[block.Hash] = now
+ a.unconfirmedBlocks[block.ProposerID] = append(
+ a.unconfirmedBlocks[block.ProposerID], block.Hash)
+ }
}
// DeliverBlock is called when a block in compaction chain is delivered.
func (a *SimApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
+ seenTime, exist := a.blockSeen[blockHash]
+ if !exist {
+ return
+ }
+ now := time.Now()
+ payload := []TimestampMessage{
+ {
+ BlockHash: blockHash,
+ Event: blockSeen,
+ Timestamp: seenTime,
+ },
+ {
+ BlockHash: blockHash,
+ Event: timestampConfirm,
+ Timestamp: now,
+ },
+ }
+ jsonPayload, err := json.Marshal(payload)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ msg := Message{
+ Type: blockTimestamp,
+ Payload: jsonPayload,
+ }
+ a.Network.NotifyServer(msg)
}
diff --git a/simulation/network.go b/simulation/network.go
index da321bd..0788ca1 100644
--- a/simulation/network.go
+++ b/simulation/network.go
@@ -19,14 +19,17 @@ package simulation
import (
"encoding/json"
+ "time"
+ "github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
type messageType string
const (
- shutdownAck messageType = "shutdownAck"
+ shutdownAck messageType = "shutdownAck"
+ blockTimestamp messageType = "blockTimestamps"
)
// Message is a struct for peer sending message to server.
@@ -35,6 +38,22 @@ type Message struct {
Payload json.RawMessage `json:"payload"`
}
+type timestampEvent string
+
+const (
+ blockSeen timestampEvent = "blockSeen"
+ timestampConfirm timestampEvent = "timestampConfirm"
+ timestampAck timestampEvent = "timestampAck"
+)
+
+// TimestampMessage is a struct for peer sending consensus timestamp information
+// to server.
+type TimestampMessage struct {
+ BlockHash common.Hash `json:"hash"`
+ Event timestampEvent `json:"event"`
+ Timestamp time.Time `json:"timestamp"`
+}
+
type infoStatus string
const (
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 8f2d6ba..bae323e 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -208,17 +208,32 @@ func (p *PeerServer) Run(configPath string) {
return
}
- if m.Type != shutdownAck {
+ switch m.Type {
+ case shutdownAck:
+ delete(p.peers, id)
+ log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
+ if len(p.peers) == 0 {
+ stopServer <- struct{}{}
+ }
+ break
+ case blockTimestamp:
+ msgs := []TimestampMessage{}
+ if err := json.Unmarshal(m.Payload, &msgs); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ for _, msg := range msgs {
+ if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ }
+ break
+ default:
w.WriteHeader(http.StatusBadRequest)
return
}
-
w.WriteHeader(http.StatusOK)
- delete(p.peers, id)
- log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
- if len(p.peers) == 0 {
- stopServer <- struct{}{}
- }
}
http.HandleFunc("/reset", resetHandler)
diff --git a/simulation/verification.go b/simulation/verification.go
index 1f8947c..574f3c5 100644
--- a/simulation/verification.go
+++ b/simulation/verification.go
@@ -33,8 +33,11 @@ type timeStamp struct {
}
type totalOrderStatus struct {
- blockReceive []timeStamp
- confirmLatency []time.Duration
+ blockReceive []timeStamp
+ confirmLatency []time.Duration
+ blockSeen map[common.Hash]time.Time
+ internalTimestampLatency []time.Duration
+ externalTimestampLatency []time.Duration
}
// TotalOrderResult is the object maintaining peer's result of
@@ -54,6 +57,9 @@ type PeerTotalOrder = map[types.ValidatorID]*TotalOrderResult
func NewTotalOrderResult(vID types.ValidatorID) *TotalOrderResult {
totalOrder := &TotalOrderResult{
validatorID: vID,
+ status: totalOrderStatus{
+ blockSeen: make(map[common.Hash]time.Time),
+ },
}
heap.Init(&totalOrder.pendingBlockList)
return totalOrder
@@ -91,6 +97,28 @@ func (totalOrder *TotalOrderResult) PushBlocks(blocks BlockList) (ready bool) {
return true
}
+// PushTimestamp log the information in the msg.
+func (totalOrder *TotalOrderResult) PushTimestamp(msg TimestampMessage) bool {
+ pushLatency := func(latency *[]time.Duration, t1, t2 time.Time) {
+ *latency = append(*latency, t2.Sub(t1))
+ }
+ switch msg.Event {
+ case blockSeen:
+ totalOrder.status.blockSeen[msg.BlockHash] = msg.Timestamp
+ case timestampConfirm:
+ pushLatency(&totalOrder.status.internalTimestampLatency,
+ totalOrder.status.blockSeen[msg.BlockHash], msg.Timestamp)
+ case timestampAck:
+ if seenTime, exist := totalOrder.status.blockSeen[msg.BlockHash]; exist {
+ pushLatency(&totalOrder.status.externalTimestampLatency,
+ seenTime, msg.Timestamp)
+ }
+ default:
+ return false
+ }
+ return true
+}
+
// CalculateBlocksPerSecond calculates the result using status.blockReceive
func (totalOrder *TotalOrderResult) CalculateBlocksPerSecond() float64 {
ts := totalOrder.status.blockReceive
@@ -123,6 +151,25 @@ func (totalOrder *TotalOrderResult) CalculateAverageConfirmLatency() float64 {
return sum / float64(len(totalOrder.status.confirmLatency))
}
+// CalculateAverageTimestampLatency calculates the result using
+// status.timestampLatency
+func (totalOrder *TotalOrderResult) CalculateAverageTimestampLatency() (
+ internal float64, external float64) {
+ for _, latency := range totalOrder.status.internalTimestampLatency {
+ internal += latency.Seconds()
+ }
+ if internal > 0 {
+ internal /= float64(len(totalOrder.status.internalTimestampLatency))
+ }
+ for _, latency := range totalOrder.status.externalTimestampLatency {
+ external += latency.Seconds()
+ }
+ if external > 0 {
+ external /= float64(len(totalOrder.status.externalTimestampLatency))
+ }
+ return
+}
+
// VerifyTotalOrder verifies if the result of Total Ordering Algorithm
// returned by all validators are the same. However, the length of result
// of each validators may not be the same, so only the common part is verified.
@@ -171,9 +218,13 @@ func VerifyTotalOrder(id types.ValidatorID,
// LogStatus prints all the status to log.
func LogStatus(peerTotalOrder PeerTotalOrder) {
for vID, totalOrder := range peerTotalOrder {
- log.Printf("[Validator %s] BPS: %.6f\n",
- vID, totalOrder.CalculateBlocksPerSecond())
- log.Printf("[Validator %s] Confirm Latency: %.3fs\n",
- vID, totalOrder.CalculateAverageConfirmLatency())
+ log.Printf("[Validator %s]\n", vID)
+ log.Printf(" BPS: %.6f\n",
+ totalOrder.CalculateBlocksPerSecond())
+ log.Printf(" Confirm Latency: %.2fms\n",
+ totalOrder.CalculateAverageConfirmLatency()*1000)
+ intLatency, extLatency := totalOrder.CalculateAverageTimestampLatency()
+ log.Printf(" Internal Timestamp Latency: %.2fms\n", intLatency*1000)
+ log.Printf(" External Timestamp Latency: %.2fms\n", extLatency*1000)
}
}