From d9ba7986a975615fb10790cfd448c48c89c1a7b3 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Mon, 20 Aug 2018 13:16:53 +0800 Subject: simulation: taken network latency model into consideration in TCP mode (#68) - Clone block once for each broadcast - Add network latency model for TCPNetwork - Fix map concurrent write --- simulation/peer-server.go | 15 ++++++++++----- simulation/simulation.go | 18 ++++++++++-------- simulation/tcp-network.go | 8 ++++++-- 3 files changed, 26 insertions(+), 15 deletions(-) (limited to 'simulation') diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 43a1c2b..6567639 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -227,11 +227,16 @@ func (p *PeerServer) Run(configPath string) { switch m.Type { case shutdownAck: - delete(p.peers, id) - log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) - if len(p.peers) == 0 { - stopServer <- struct{}{} - } + func() { + p.peersMu.Lock() + defer p.peersMu.Unlock() + + delete(p.peers, id) + log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) + if len(p.peers) == 0 { + stopServer <- struct{}{} + } + }() break case blockTimestamp: msgs := []TimestampMessage{} diff --git a/simulation/simulation.go b/simulation/simulation.go index cc5ac10..bd69fb5 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -34,7 +34,14 @@ func Run(configPath string) { networkType := cfg.Networking.Type - var vs []*Validator + var ( + vs []*Validator + networkModel = &NormalNetwork{ + Sigma: cfg.Networking.Sigma, + Mean: cfg.Networking.Mean, + LossRateValue: cfg.Networking.LossRateValue, + } + ) if networkType == config.NetworkTypeFake || networkType == config.NetworkTypeTCPLocal { @@ -42,11 +49,6 @@ func Run(configPath string) { var network Network if networkType == config.NetworkTypeFake { - networkModel := &NormalNetwork{ - Sigma: cfg.Networking.Sigma, - Mean: cfg.Networking.Mean, - LossRateValue: cfg.Networking.LossRateValue, - } network = NewFakeNetwork(networkModel) for i := 0; i < cfg.Validator.Num; i++ { @@ -66,7 +68,7 @@ func Run(configPath string) { } wg.Add(1) go func() { - network := NewTCPNetwork(true, cfg.Networking.PeerServer) + network := NewTCPNetwork(true, cfg.Networking.PeerServer, networkModel) network.Start() lock.Lock() defer lock.Unlock() @@ -86,7 +88,7 @@ func Run(configPath string) { if err != nil { panic(err) } - network := NewTCPNetwork(false, cfg.Networking.PeerServer) + network := NewTCPNetwork(false, cfg.Networking.PeerServer, networkModel) network.Start() v := NewValidator(prv, eth.SigToPub, cfg.Validator, network) go v.Run() diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go index 929920a..c606daf 100644 --- a/simulation/tcp-network.go +++ b/simulation/tcp-network.go @@ -50,10 +50,11 @@ type TCPNetwork struct { endpointMutex sync.RWMutex endpoints map[types.ValidatorID]string recieveChan chan interface{} + model Model } // NewTCPNetwork returns pointer to a new Network instance. -func NewTCPNetwork(local bool, peerServer string) *TCPNetwork { +func NewTCPNetwork(local bool, peerServer string, model Model) *TCPNetwork { pServer := peerServer if local { pServer = "127.0.0.1" @@ -73,6 +74,7 @@ func NewTCPNetwork(local bool, peerServer string) *TCPNetwork { client: client, endpoints: make(map[types.ValidatorID]string), recieveChan: make(chan interface{}, msgBufferSize), + model: model, } } @@ -266,6 +268,7 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { msgURL := fmt.Sprintf("http://%s/msg", clientAddr) go func() { + time.Sleep(n.model.Delay()) for i := 0; i < retries; i++ { req, err := http.NewRequest( http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) @@ -292,11 +295,12 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { // BroadcastBlock broadcast blocks into the network. func (n *TCPNetwork) BroadcastBlock(block *types.Block) { + block = block.Clone() for endpoint := range n.endpoints { if endpoint == block.ProposerID { continue } - n.Send(endpoint, block.Clone()) + n.Send(endpoint, block) } } -- cgit v1.2.3