diff options
author | haoping-ku <haoping.ku@dexon.org> | 2018-12-05 17:38:03 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-05 17:38:03 +0800 |
commit | 4eb02f1dd96e136b0f7cf7eff792da1e44176713 (patch) | |
tree | 3757739bff31ce4b9cb7ff45be572f9858fc19e9 | |
parent | 1f48b590f6e9a6d3fd773846a3d8ba1b7f0419e6 (diff) | |
download | dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.tar dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.tar.gz dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.tar.bz2 dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.tar.lz dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.tar.xz dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.tar.zst dexon-consensus-4eb02f1dd96e136b0f7cf7eff792da1e44176713.zip |
Haoping fix simulation (#356)
* simulation: add benchmark features
* tmp
* simulation: modify Debug interface
* Added BlockReceived and BlockReady function to Debug interface.
* Added Benchmark features.
* fix
* fix typos
-rw-r--r-- | cmd/dexcon-simulation/main.go | 10 | ||||
-rw-r--r-- | core/consensus.go | 21 | ||||
-rw-r--r-- | core/interfaces.go | 6 | ||||
-rw-r--r-- | core/nonblocking_test.go | 4 | ||||
-rw-r--r-- | core/test/app.go | 6 | ||||
-rw-r--r-- | simulation/app.go | 66 | ||||
-rw-r--r-- | simulation/peer-server.go | 34 | ||||
-rw-r--r-- | simulation/utils.go | 36 |
8 files changed, 168 insertions, 15 deletions
diff --git a/cmd/dexcon-simulation/main.go b/cmd/dexcon-simulation/main.go index e45dbce..3953777 100644 --- a/cmd/dexcon-simulation/main.go +++ b/cmd/dexcon-simulation/main.go @@ -20,6 +20,7 @@ package main import ( "flag" "fmt" + "io" "log" "math/rand" "net/http" @@ -69,6 +70,15 @@ func main() { defer pprof.StopCPUProfile() } + if *logfile != "" { + f, err := os.Create(*logfile) + if err != nil { + log.Fatal("could not create log file: ", err) + } + mw := io.MultiWriter(os.Stdout, f) + log.SetOutput(mw) + } + cfg, err := config.Read(*configFile) if err != nil { panic(err) diff --git a/core/consensus.go b/core/consensus.go index bfe893c..6ca54e0 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -340,10 +340,11 @@ type Consensus struct { toSyncer *totalOrderingSyncer // Interfaces. - db blockdb.BlockDatabase - app Application - gov Governance - network Network + db blockdb.BlockDatabase + app Application + debugApp Debug + gov Governance + network Network // Misc. dMoment time.Time @@ -372,7 +373,10 @@ func NewConsensus( // Setup auth module. authModule := NewAuthenticator(prv) // Check if the application implement Debug interface. - debugApp, _ := app.(Debug) + var debugApp Debug + if a, ok := app.(Debug); ok { + debugApp = a + } // Get configuration for genesis round. var round uint64 logger.Debug("Calling Governance.Configuration", "round", round) @@ -407,6 +411,7 @@ func NewConsensus( ccModule: newCompactionChain(gov), lattice: lattice, app: newNonBlocking(app, debugApp), + debugApp: debugApp, gov: gov, db: db, network: network, @@ -961,6 +966,9 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { err = con.baMgr.processBlock(b) + if err == nil && con.debugApp != nil { + con.debugApp.BlockReceived(b.Hash) + } return } @@ -1027,6 +1035,9 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } con.cfgModule.untouchTSigHash(b.Hash) con.deliverBlock(b) + if con.debugApp != nil { + con.debugApp.BlockReady(b.Hash) + } } if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil { return diff --git a/core/interfaces.go b/core/interfaces.go index 6979854..2ebfe86 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -41,7 +41,7 @@ type Application interface { // BlockConfirmed is called when a block is confirmed and added to lattice. BlockConfirmed(block types.Block) - // BlockDelivered is called when a block is add to the compaction chain. + // BlockDelivered is called when a block is added to the compaction chain. BlockDelivered(blockHash common.Hash, blockPosition types.Position, result types.FinalizationResult) } @@ -49,9 +49,13 @@ type Application interface { // Debug describes the application interface that requires // more detailed consensus execution. type Debug interface { + // BlockReceived is called when the block received in agreement. + BlockReceived(common.Hash) // TotalOrderingDelivered is called when the total ordering algorithm deliver // a set of block. TotalOrderingDelivered(common.Hashes, uint32) + // BlockReady is called when the block's randomness is ready. + BlockReady(common.Hash) } // Network describs the network interface that interacts with DEXON consensus diff --git a/core/nonblocking_test.go b/core/nonblocking_test.go index cec1d8d..542382c 100644 --- a/core/nonblocking_test.go +++ b/core/nonblocking_test.go @@ -74,6 +74,10 @@ func (app *slowApp) BlockDelivered(blockHash common.Hash, app.blockDelivered[blockHash] = struct{}{} } +func (app *slowApp) BlockReceived(hash common.Hash) {} + +func (app *slowApp) BlockReady(hash common.Hash) {} + // noDebugApp is to make sure nonBlocking works when Debug interface // is not implemented by the provided Application instance. type noDebugApp struct { diff --git a/core/test/app.go b/core/test/app.go index 9f030e9..d26784e 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -280,6 +280,12 @@ Loop: return nil } +// BlockReceived implements interface Debug. +func (app *App) BlockReceived(hash common.Hash) {} + +// BlockReady implements interface Debug. +func (app *App) BlockReady(hash common.Hash) {} + // WithLock provides a backdoor to check status of App with reader lock. func (app *App) WithLock(function func(*App)) { app.confirmedLock.RLock() diff --git a/simulation/app.go b/simulation/app.go index af271e1..74bd6cd 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -44,22 +44,41 @@ type timestampMessage struct { Timestamp time.Time `json:"timestamp"` } +const ( + // Block received or created in agreement. + blockEventReceived int = iota + // Block confirmed in agreement, sent into lattice + blockEventConfirmed + // Block delivered by lattice. + blockEventDelivered + // block is ready (Randomness calculated) + blockEventReady + + blockEventCount +) + +type blockEventMessage struct { + BlockHash common.Hash `json:"hash"` + Timestamps []time.Time `json:"timestamps"` +} + // simApp is an DEXON app for simulation. type simApp struct { - NodeID types.NodeID - Outputs []*types.Block - Early bool - netModule *test.Network - stateModule *test.State - DeliverID int - // blockSeen stores the time when block is delivered by Total Ordering. - blockSeen map[common.Hash]time.Time + NodeID types.NodeID + Outputs []*types.Block + Early bool + netModule *test.Network + stateModule *test.State + DeliverID int + blockTimestamps map[common.Hash][]time.Time + blockSeen map[common.Hash]time.Time // uncofirmBlocks stores the blocks whose timestamps are not ready. unconfirmedBlocks map[types.NodeID]common.Hashes blockByHash map[common.Hash]*types.Block blockByHashMutex sync.RWMutex latestWitness types.Witness latestWitnessReady *sync.Cond + lock sync.RWMutex } // newSimApp returns point to a new instance of simApp. @@ -71,6 +90,7 @@ func newSimApp( stateModule: stateModule, DeliverID: 0, blockSeen: make(map[common.Hash]time.Time), + blockTimestamps: make(map[common.Hash][]time.Time), unconfirmedBlocks: make(map[types.NodeID]common.Hashes), blockByHash: make(map[common.Hash]*types.Block), latestWitnessReady: sync.NewCond(&sync.Mutex{}), @@ -84,6 +104,7 @@ func (a *simApp) BlockConfirmed(block types.Block) { // TODO(jimmy-dexon) : Remove block in this hash if it's no longer needed. a.blockByHash[block.Hash] = &block a.blockSeen[block.Hash] = time.Now().UTC() + a.updateBlockEvent(block.Hash) } // VerifyBlock implements core.Application. @@ -139,7 +160,7 @@ func (a *simApp) TotalOrderingDelivered( fmt.Println("OUTPUT", a.NodeID, mode, blockHashes) latencies := []time.Duration{} for _, h := range blockHashes { - latencies = append(latencies, time.Since(a.blockSeen[h])) + latencies = append(latencies, time.Since(a.blockTimestamps[h][blockEventConfirmed])) } blockList := &BlockList{ ID: a.DeliverID, @@ -153,6 +174,7 @@ func (a *simApp) TotalOrderingDelivered( // BlockDelivered is called when a block in compaction chain is delivered. func (a *simApp) BlockDelivered( blockHash common.Hash, pos types.Position, result types.FinalizationResult) { + if len(result.Randomness) == 0 && pos.Round > 0 { panic(fmt.Errorf("Block %s randomness is empty", blockHash)) } @@ -178,6 +200,8 @@ func (a *simApp) BlockDelivered( a.latestWitnessReady.Broadcast() }() + a.updateBlockEvent(blockHash) + seenTime, exist := a.blockSeen[blockHash] if !exist { return @@ -206,3 +230,27 @@ func (a *simApp) BlockDelivered( } a.netModule.Report(msg) } + +// BlockReceived is called when a block is received in agreement. +func (a *simApp) BlockReceived(hash common.Hash) { + a.updateBlockEvent(hash) +} + +// BlockReady is called when a block is ready. +func (a *simApp) BlockReady(hash common.Hash) { + a.updateBlockEvent(hash) +} + +func (a *simApp) updateBlockEvent(hash common.Hash) { + a.lock.Lock() + defer a.lock.Unlock() + a.blockTimestamps[hash] = append(a.blockTimestamps[hash], time.Now().UTC()) + if len(a.blockTimestamps[hash]) == blockEventCount { + msg := &blockEventMessage{ + BlockHash: hash, + Timestamps: a.blockTimestamps[hash], + } + a.netModule.Report(msg) + delete(a.blockTimestamps, hash) + } +} diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 1cd6ca1..95ce5c1 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -24,7 +24,9 @@ import ( "log" "reflect" "sync" + "time" + "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/simulation/config" @@ -41,6 +43,7 @@ type PeerServer struct { cfg *config.Config ctx context.Context ctxCancel context.CancelFunc + blockEvents map[types.NodeID]map[common.Hash][]time.Time } // NewPeerServer returns a new PeerServer instance. @@ -51,6 +54,7 @@ func NewPeerServer() *PeerServer { peerTotalOrder: make(PeerTotalOrder), ctx: ctx, ctxCancel: cancel, + blockEvents: make(map[types.NodeID]map[common.Hash][]time.Time), } } @@ -114,6 +118,17 @@ func (p *PeerServer) handleMessage(id types.NodeID, m *message) { } } +func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMessage) { + if _, exist := p.blockEvents[id]; !exist { + p.blockEvents[id] = make(map[common.Hash][]time.Time) + } + nodeEvents := p.blockEvents[id] + if _, exist := nodeEvents[msg.BlockHash]; !exist { + nodeEvents[msg.BlockHash] = []time.Time{} + } + nodeEvents[msg.BlockHash] = msg.Timestamps +} + func (p *PeerServer) mainLoop() { for { select { @@ -134,6 +149,8 @@ func (p *PeerServer) mainLoop() { p.handleBlockList(e.From, val) case *message: p.handleMessage(e.From, val) + case *blockEventMessage: + p.handleBlockEventMessage(e.From, val) default: panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg))) } @@ -182,4 +199,21 @@ func (p *PeerServer) Run() { if err := p.trans.Close(); err != nil { log.Printf("Error shutting down peerServer: %v\n", err) } + p.logBlockEvents() +} + +func (p *PeerServer) logBlockEvents() { + diffs := [blockEventCount - 1][]float64{} + for _, bs := range p.blockEvents { + for _, ts := range bs { + for i := 0; i < blockEventCount-1; i++ { + diffs[i] = append(diffs[i], float64(ts[i+1].Sub(ts[i]))/1000000000) + } + } + } + log.Printf("===== block event mean and std dev (%d blocks) =====\n", len(diffs[0])) + for i, a := range diffs { + m, d := calcMeanAndStdDeviation(a) + log.Printf(" event %d: mean = %f, std dev = %f\n", i, m, d) + } } diff --git a/simulation/utils.go b/simulation/utils.go new file mode 100644 index 0000000..f0d864b --- /dev/null +++ b/simulation/utils.go @@ -0,0 +1,36 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package simulation + +import ( + "math" +) + +func calcMeanAndStdDeviation(a []float64) (float64, float64) { + sum := float64(0) + for _, i := range a { + sum += i + } + mean := sum / float64(len(a)) + dev := float64(0) + for _, i := range a { + dev += (i - mean) * (i - mean) + } + dev = math.Sqrt(dev / float64(len(a))) + return mean, dev +} |