aboutsummaryrefslogtreecommitdiffstats
path: root/simulation
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-08-20 13:16:53 +0800
committerGitHub <noreply@github.com>2018-08-20 13:16:53 +0800
commitd9ba7986a975615fb10790cfd448c48c89c1a7b3 (patch)
tree2741f02522707ba9de43928dda67abf1368b1ec3 /simulation
parentfd8358a607ccd564a5e8158451a5d9ef9cb7b55b (diff)
downloaddexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.tar
dexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.tar.gz
dexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.tar.bz2
dexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.tar.lz
dexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.tar.xz
dexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.tar.zst
dexon-consensus-d9ba7986a975615fb10790cfd448c48c89c1a7b3.zip
simulation: taken network latency model into consideration in TCP mode (#68)
- Clone block once for each broadcast - Add network latency model for TCPNetwork - Fix map concurrent write
Diffstat (limited to 'simulation')
-rw-r--r--simulation/peer-server.go15
-rw-r--r--simulation/simulation.go18
-rw-r--r--simulation/tcp-network.go8
3 files changed, 26 insertions, 15 deletions
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 43a1c2b..6567639 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -227,11 +227,16 @@ func (p *PeerServer) Run(configPath string) {
switch m.Type {
case shutdownAck:
- delete(p.peers, id)
- log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
- if len(p.peers) == 0 {
- stopServer <- struct{}{}
- }
+ func() {
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+
+ delete(p.peers, id)
+ log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
+ if len(p.peers) == 0 {
+ stopServer <- struct{}{}
+ }
+ }()
break
case blockTimestamp:
msgs := []TimestampMessage{}
diff --git a/simulation/simulation.go b/simulation/simulation.go
index cc5ac10..bd69fb5 100644
--- a/simulation/simulation.go
+++ b/simulation/simulation.go
@@ -34,7 +34,14 @@ func Run(configPath string) {
networkType := cfg.Networking.Type
- var vs []*Validator
+ var (
+ vs []*Validator
+ networkModel = &NormalNetwork{
+ Sigma: cfg.Networking.Sigma,
+ Mean: cfg.Networking.Mean,
+ LossRateValue: cfg.Networking.LossRateValue,
+ }
+ )
if networkType == config.NetworkTypeFake ||
networkType == config.NetworkTypeTCPLocal {
@@ -42,11 +49,6 @@ func Run(configPath string) {
var network Network
if networkType == config.NetworkTypeFake {
- networkModel := &NormalNetwork{
- Sigma: cfg.Networking.Sigma,
- Mean: cfg.Networking.Mean,
- LossRateValue: cfg.Networking.LossRateValue,
- }
network = NewFakeNetwork(networkModel)
for i := 0; i < cfg.Validator.Num; i++ {
@@ -66,7 +68,7 @@ func Run(configPath string) {
}
wg.Add(1)
go func() {
- network := NewTCPNetwork(true, cfg.Networking.PeerServer)
+ network := NewTCPNetwork(true, cfg.Networking.PeerServer, networkModel)
network.Start()
lock.Lock()
defer lock.Unlock()
@@ -86,7 +88,7 @@ func Run(configPath string) {
if err != nil {
panic(err)
}
- network := NewTCPNetwork(false, cfg.Networking.PeerServer)
+ network := NewTCPNetwork(false, cfg.Networking.PeerServer, networkModel)
network.Start()
v := NewValidator(prv, eth.SigToPub, cfg.Validator, network)
go v.Run()
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
index 929920a..c606daf 100644
--- a/simulation/tcp-network.go
+++ b/simulation/tcp-network.go
@@ -50,10 +50,11 @@ type TCPNetwork struct {
endpointMutex sync.RWMutex
endpoints map[types.ValidatorID]string
recieveChan chan interface{}
+ model Model
}
// NewTCPNetwork returns pointer to a new Network instance.
-func NewTCPNetwork(local bool, peerServer string) *TCPNetwork {
+func NewTCPNetwork(local bool, peerServer string, model Model) *TCPNetwork {
pServer := peerServer
if local {
pServer = "127.0.0.1"
@@ -73,6 +74,7 @@ func NewTCPNetwork(local bool, peerServer string) *TCPNetwork {
client: client,
endpoints: make(map[types.ValidatorID]string),
recieveChan: make(chan interface{}, msgBufferSize),
+ model: model,
}
}
@@ -266,6 +268,7 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
go func() {
+ time.Sleep(n.model.Delay())
for i := 0; i < retries; i++ {
req, err := http.NewRequest(
http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
@@ -292,11 +295,12 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
// BroadcastBlock broadcast blocks into the network.
func (n *TCPNetwork) BroadcastBlock(block *types.Block) {
+ block = block.Clone()
for endpoint := range n.endpoints {
if endpoint == block.ProposerID {
continue
}
- n.Send(endpoint, block.Clone())
+ n.Send(endpoint, block)
}
}