diff options
-rw-r--r-- | core/blocklattice.go | 11 | ||||
-rw-r--r-- | simulation/app.go | 97 | ||||
-rw-r--r-- | simulation/network.go | 21 | ||||
-rw-r--r-- | simulation/peer-server.go | 29 | ||||
-rw-r--r-- | simulation/verification.go | 63 |
5 files changed, 204 insertions, 17 deletions
diff --git a/core/blocklattice.go b/core/blocklattice.go index 1f12474..f94cc9a 100644 --- a/core/blocklattice.go +++ b/core/blocklattice.go @@ -66,6 +66,9 @@ type BlockLattice struct { candidateSet map[common.Hash]*types.Block ABS map[common.Hash]map[types.ValidatorID]uint64 AHV map[common.Hash]map[types.ValidatorID]uint64 + + // Timestamp. + timestampEngine consensusTimestamp } // NewBlockLattice returns a new empty BlockLattice instance. @@ -86,6 +89,7 @@ func NewBlockLattice( candidateSet: make(map[common.Hash]*types.Block), ABS: make(map[common.Hash]map[types.ValidatorID]uint64), AHV: make(map[common.Hash]map[types.ValidatorID]uint64), + timestampEngine: *newConsensusTimestamp(), } } @@ -533,6 +537,13 @@ func (l *BlockLattice) totalOrdering(b *types.Block) { if len(output) > 0 { l.app.TotalOrderingDeliver(output, earlyDelivery) + blocksReady, _, err := l.timestampEngine.processBlocks(output) + if err != nil && err != ErrEmptyTimestamps { + panic(err) + } + for _, block := range blocksReady { + l.app.DeliverBlock(block.Hash, block.ConsensusTime) + } } // Rescan pending blocks to add into candidate set. diff --git a/simulation/app.go b/simulation/app.go index aea7acf..5533abb 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -18,6 +18,7 @@ package simulation import ( + "encoding/json" "fmt" "time" @@ -32,17 +33,50 @@ type SimApp struct { Early bool Network PeerServerNetwork DeliverID int + // blockSeen stores the time when block is delivered by Total Ordering. + blockSeen map[common.Hash]time.Time + // uncofirmBlocks stores the blocks whose timestamps are not ready. + unconfirmedBlocks map[types.ValidatorID]common.Hashes + blockHash map[common.Hash]*types.Block } // NewSimApp returns point to a new instance of SimApp. func NewSimApp(id types.ValidatorID, Network PeerServerNetwork) *SimApp { return &SimApp{ - ValidatorID: id, - Network: Network, - DeliverID: 0, + ValidatorID: id, + Network: Network, + DeliverID: 0, + blockSeen: make(map[common.Hash]time.Time), + unconfirmedBlocks: make(map[types.ValidatorID]common.Hashes), + blockHash: make(map[common.Hash]*types.Block), } } +// getAckedBlocks will return all unconfirmed blocks' hash with lower Height +// than the block with ackHash. +func (a *SimApp) getAckedBlocks(ackHash common.Hash) (output common.Hashes) { + // TODO(jimmy-dexon): Why there are some acks never seen? + ackBlock, exist := a.blockHash[ackHash] + if !exist { + return + } + hashes, exist := a.unconfirmedBlocks[ackBlock.ProposerID] + if !exist { + return + } + for i, blockHash := range hashes { + if a.blockHash[blockHash].Height > ackBlock.Height { + output, a.unconfirmedBlocks[ackBlock.ProposerID] = hashes[:i], hashes[i:] + break + } + } + // All of the Height of unconfirmed blocks are lower than the acked block. + if len(output) == 0 { + output, a.unconfirmedBlocks[ackBlock.ProposerID] = hashes, common.Hashes{} + } + return +} + // TotalOrderingDeliver is called when blocks are delivered by the total // ordering algorithm. func (a *SimApp) TotalOrderingDeliver(blocks []*types.Block, early bool) { @@ -54,12 +88,37 @@ func (a *SimApp) TotalOrderingDeliver(blocks []*types.Block, early bool) { blockHash := common.Hashes{} confirmLatency := []time.Duration{} + payload := []TimestampMessage{} for _, block := range blocks { blockHash = append(blockHash, block.Hash) if block.ProposerID == a.ValidatorID { confirmLatency = append(confirmLatency, now.Sub(block.Timestamps[a.ValidatorID])) } + // TODO(jimmy-dexon) : Remove block in this hash if it's no longer needed. + a.blockHash[block.Hash] = block + for hash := range block.Acks { + for _, blockHash := range a.getAckedBlocks(hash) { + payload = append(payload, TimestampMessage{ + BlockHash: blockHash, + Event: timestampAck, + Timestamp: now, + }) + delete(a.blockSeen, block.Hash) + } + } + } + if len(payload) > 0 { + jsonPayload, err := json.Marshal(payload) + if err != nil { + fmt.Println(err) + } else { + msg := Message{ + Type: blockTimestamp, + Payload: jsonPayload, + } + a.Network.NotifyServer(msg) + } } blockList := BlockList{ @@ -69,8 +128,40 @@ func (a *SimApp) TotalOrderingDeliver(blocks []*types.Block, early bool) { } a.Network.DeliverBlocks(blockList) a.DeliverID++ + for _, block := range blocks { + a.blockSeen[block.Hash] = now + a.unconfirmedBlocks[block.ProposerID] = append( + a.unconfirmedBlocks[block.ProposerID], block.Hash) + } } // DeliverBlock is called when a block in compaction chain is delivered. func (a *SimApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { + seenTime, exist := a.blockSeen[blockHash] + if !exist { + return + } + now := time.Now() + payload := []TimestampMessage{ + { + BlockHash: blockHash, + Event: blockSeen, + Timestamp: seenTime, + }, + { + BlockHash: blockHash, + Event: timestampConfirm, + Timestamp: now, + }, + } + jsonPayload, err := json.Marshal(payload) + if err != nil { + fmt.Println(err) + return + } + msg := Message{ + Type: blockTimestamp, + Payload: jsonPayload, + } + a.Network.NotifyServer(msg) } diff --git a/simulation/network.go b/simulation/network.go index da321bd..0788ca1 100644 --- a/simulation/network.go +++ b/simulation/network.go @@ -19,14 +19,17 @@ package simulation import ( "encoding/json" + "time" + "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) type messageType string const ( - shutdownAck messageType = "shutdownAck" + shutdownAck messageType = "shutdownAck" + blockTimestamp messageType = "blockTimestamps" ) // Message is a struct for peer sending message to server. @@ -35,6 +38,22 @@ type Message struct { Payload json.RawMessage `json:"payload"` } +type timestampEvent string + +const ( + blockSeen timestampEvent = "blockSeen" + timestampConfirm timestampEvent = "timestampConfirm" + timestampAck timestampEvent = "timestampAck" +) + +// TimestampMessage is a struct for peer sending consensus timestamp information +// to server. +type TimestampMessage struct { + BlockHash common.Hash `json:"hash"` + Event timestampEvent `json:"event"` + Timestamp time.Time `json:"timestamp"` +} + type infoStatus string const ( diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 8f2d6ba..bae323e 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -208,17 +208,32 @@ func (p *PeerServer) Run(configPath string) { return } - if m.Type != shutdownAck { + switch m.Type { + case shutdownAck: + delete(p.peers, id) + log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) + if len(p.peers) == 0 { + stopServer <- struct{}{} + } + break + case blockTimestamp: + msgs := []TimestampMessage{} + if err := json.Unmarshal(m.Payload, &msgs); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + for _, msg := range msgs { + if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + } + break + default: w.WriteHeader(http.StatusBadRequest) return } - w.WriteHeader(http.StatusOK) - delete(p.peers, id) - log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) - if len(p.peers) == 0 { - stopServer <- struct{}{} - } } http.HandleFunc("/reset", resetHandler) diff --git a/simulation/verification.go b/simulation/verification.go index 1f8947c..574f3c5 100644 --- a/simulation/verification.go +++ b/simulation/verification.go @@ -33,8 +33,11 @@ type timeStamp struct { } type totalOrderStatus struct { - blockReceive []timeStamp - confirmLatency []time.Duration + blockReceive []timeStamp + confirmLatency []time.Duration + blockSeen map[common.Hash]time.Time + internalTimestampLatency []time.Duration + externalTimestampLatency []time.Duration } // TotalOrderResult is the object maintaining peer's result of @@ -54,6 +57,9 @@ type PeerTotalOrder = map[types.ValidatorID]*TotalOrderResult func NewTotalOrderResult(vID types.ValidatorID) *TotalOrderResult { totalOrder := &TotalOrderResult{ validatorID: vID, + status: totalOrderStatus{ + blockSeen: make(map[common.Hash]time.Time), + }, } heap.Init(&totalOrder.pendingBlockList) return totalOrder @@ -91,6 +97,28 @@ func (totalOrder *TotalOrderResult) PushBlocks(blocks BlockList) (ready bool) { return true } +// PushTimestamp log the information in the msg. +func (totalOrder *TotalOrderResult) PushTimestamp(msg TimestampMessage) bool { + pushLatency := func(latency *[]time.Duration, t1, t2 time.Time) { + *latency = append(*latency, t2.Sub(t1)) + } + switch msg.Event { + case blockSeen: + totalOrder.status.blockSeen[msg.BlockHash] = msg.Timestamp + case timestampConfirm: + pushLatency(&totalOrder.status.internalTimestampLatency, + totalOrder.status.blockSeen[msg.BlockHash], msg.Timestamp) + case timestampAck: + if seenTime, exist := totalOrder.status.blockSeen[msg.BlockHash]; exist { + pushLatency(&totalOrder.status.externalTimestampLatency, + seenTime, msg.Timestamp) + } + default: + return false + } + return true +} + // CalculateBlocksPerSecond calculates the result using status.blockReceive func (totalOrder *TotalOrderResult) CalculateBlocksPerSecond() float64 { ts := totalOrder.status.blockReceive @@ -123,6 +151,25 @@ func (totalOrder *TotalOrderResult) CalculateAverageConfirmLatency() float64 { return sum / float64(len(totalOrder.status.confirmLatency)) } +// CalculateAverageTimestampLatency calculates the result using +// status.timestampLatency +func (totalOrder *TotalOrderResult) CalculateAverageTimestampLatency() ( + internal float64, external float64) { + for _, latency := range totalOrder.status.internalTimestampLatency { + internal += latency.Seconds() + } + if internal > 0 { + internal /= float64(len(totalOrder.status.internalTimestampLatency)) + } + for _, latency := range totalOrder.status.externalTimestampLatency { + external += latency.Seconds() + } + if external > 0 { + external /= float64(len(totalOrder.status.externalTimestampLatency)) + } + return +} + // VerifyTotalOrder verifies if the result of Total Ordering Algorithm // returned by all validators are the same. However, the length of result // of each validators may not be the same, so only the common part is verified. @@ -171,9 +218,13 @@ func VerifyTotalOrder(id types.ValidatorID, // LogStatus prints all the status to log. func LogStatus(peerTotalOrder PeerTotalOrder) { for vID, totalOrder := range peerTotalOrder { - log.Printf("[Validator %s] BPS: %.6f\n", - vID, totalOrder.CalculateBlocksPerSecond()) - log.Printf("[Validator %s] Confirm Latency: %.3fs\n", - vID, totalOrder.CalculateAverageConfirmLatency()) + log.Printf("[Validator %s]\n", vID) + log.Printf(" BPS: %.6f\n", + totalOrder.CalculateBlocksPerSecond()) + log.Printf(" Confirm Latency: %.2fms\n", + totalOrder.CalculateAverageConfirmLatency()*1000) + intLatency, extLatency := totalOrder.CalculateAverageTimestampLatency() + log.Printf(" Internal Timestamp Latency: %.2fms\n", intLatency*1000) + log.Printf(" External Timestamp Latency: %.2fms\n", extLatency*1000) } } |