aboutsummaryrefslogtreecommitdiffstats
path: root/simulation
diff options
context:
space:
mode:
authorWei-Ning Huang <w@dexon.org>2018-08-06 11:14:02 +0800
committerGitHub <noreply@github.com>2018-08-06 11:14:02 +0800
commitbb95322f46524ea52b6762c349905dcc910814c8 (patch)
treecf53a3b40e5eaba66eb8d837215650fab9b6150c /simulation
parent4b606c3c12db824cb996d9eca43c1b66b2bdb440 (diff)
downloaddexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar
dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.gz
dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.bz2
dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.lz
dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.xz
dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.zst
dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.zip
simulation: free resource after use and misc fix for k8s mode (#31)
Diffstat (limited to 'simulation')
-rw-r--r--simulation/constant.go2
-rw-r--r--simulation/kubernetes/config.toml.in1
-rwxr-xr-xsimulation/kubernetes/run_simulation.sh13
-rw-r--r--simulation/network.go5
-rw-r--r--simulation/peer-server.go23
-rw-r--r--simulation/simulation.go6
-rw-r--r--simulation/tcp-network.go25
-rw-r--r--simulation/validator.go2
8 files changed, 63 insertions, 14 deletions
diff --git a/simulation/constant.go b/simulation/constant.go
index aa21a3d..79ef84a 100644
--- a/simulation/constant.go
+++ b/simulation/constant.go
@@ -19,5 +19,5 @@ package simulation
const (
peerPort = 8080
- msgBufferSize = 128
+ msgBufferSize = 256
)
diff --git a/simulation/kubernetes/config.toml.in b/simulation/kubernetes/config.toml.in
index d956055..b46e8d6 100644
--- a/simulation/kubernetes/config.toml.in
+++ b/simulation/kubernetes/config.toml.in
@@ -4,6 +4,7 @@ title = "DEXON Consensus Simulation Config"
num = {{numValidators}}
propose_interval_mean = 5e+02
propose_interval_sigma = 3e+01
+max_block = 100
[networking]
type = "tcp"
diff --git a/simulation/kubernetes/run_simulation.sh b/simulation/kubernetes/run_simulation.sh
index 88f9eee..c5298d7 100755
--- a/simulation/kubernetes/run_simulation.sh
+++ b/simulation/kubernetes/run_simulation.sh
@@ -14,11 +14,20 @@ build_docker_image() {
}
start_simulation() {
- kubectl delete deployment --all --force --grace-period=0
+ kubectl delete deployment dexcon-simulation --force --grace-period=0
+ kubectl delete deployment dexcon-simulation-peer-server --force --grace-period=0
sleep 10
kubectl apply -f peer-server.yaml
- sleep 10
+
+ while true; do
+ if kubectl get pods -l app=dexcon-simulation-peer-server | grep Running;
+ then
+ break
+ fi
+ sleep 1
+ done
+
kubectl apply -f validator.yaml
}
diff --git a/simulation/network.go b/simulation/network.go
index 16e6918..da321bd 100644
--- a/simulation/network.go
+++ b/simulation/network.go
@@ -38,8 +38,9 @@ type Message struct {
type infoStatus string
const (
- normal infoStatus = "normal"
- shutdown infoStatus = "shutdown"
+ statusInit infoStatus = "init"
+ statusNormal infoStatus = "normal"
+ statusShutdown infoStatus = "shutdown"
)
// InfoMessage is a struct used by peerServer's /info.
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 4142158..8f2d6ba 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -65,6 +65,8 @@ func (p *PeerServer) Run(configPath string) {
}
joinHandler := func(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+
idString := r.Header.Get("ID")
portString := r.Header.Get("PORT")
@@ -78,11 +80,16 @@ func (p *PeerServer) Run(configPath string) {
p.peers[id] = net.JoinHostPort(host, portString)
p.peerTotalOrder[id] = NewTotalOrderResult(id)
log.Printf("Peer %s joined from %s", id, p.peers[id])
+
+ if len(p.peers) == cfg.Validator.Num {
+ log.Println("All peers connected.")
+ }
}
peersHandler := func(w http.ResponseWriter, r *http.Request) {
p.peersMu.Lock()
defer p.peersMu.Unlock()
+ defer r.Body.Close()
if len(p.peers) != cfg.Validator.Num {
w.WriteHeader(http.StatusNotFound)
@@ -102,15 +109,20 @@ func (p *PeerServer) Run(configPath string) {
infoHandler := func(w http.ResponseWriter, r *http.Request) {
p.peersMu.Lock()
defer p.peersMu.Unlock()
+ defer r.Body.Close()
msg := InfoMessage{
- Status: normal,
+ Status: statusNormal,
Peers: p.peers,
}
+ if len(p.peers) < cfg.Validator.Num {
+ msg.Status = statusInit
+ }
+
// Determine msg.status.
if p.verifiedLen >= cfg.Validator.MaxBlock {
- msg.Status = shutdown
+ msg.Status = statusShutdown
}
jsonText, err := json.Marshal(msg)
@@ -124,6 +136,8 @@ func (p *PeerServer) Run(configPath string) {
}
deliveryHandler := func(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+
idString := r.Header.Get("ID")
defer r.Body.Close()
@@ -159,6 +173,7 @@ func (p *PeerServer) Run(configPath string) {
go func(id types.ValidatorID) {
p.peerTotalOrderMu.Lock()
defer p.peerTotalOrderMu.Unlock()
+
var correct bool
var length int
p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder)
@@ -172,6 +187,10 @@ func (p *PeerServer) Run(configPath string) {
stopServer := make(chan struct{})
messageHandler := func(w http.ResponseWriter, r *http.Request) {
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+ defer r.Body.Close()
+
idString := r.Header.Get("ID")
id := types.ValidatorID{}
id.UnmarshalText([]byte(idString))
diff --git a/simulation/simulation.go b/simulation/simulation.go
index 8542051..27e31d2 100644
--- a/simulation/simulation.go
+++ b/simulation/simulation.go
@@ -79,4 +79,10 @@ func Run(configPath string) {
v.Wait()
fmt.Printf("Validator %s is shutdown\n", v.GetID())
}
+
+ // Do not exit when we are in TCP node, since k8s will restart the pod and
+ // cause confusions.
+ if networkType == config.NetworkTypeTCP {
+ select {}
+ }
}
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
index 911243f..aed038f 100644
--- a/simulation/tcp-network.go
+++ b/simulation/tcp-network.go
@@ -31,6 +31,8 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
+const retries = 5
+
// TCPNetwork implements the Network interface.
type TCPNetwork struct {
local bool
@@ -138,6 +140,9 @@ func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} {
req.Header.Add("PORT", fmt.Sprintf("%d", n.port))
resp, err := client.Do(req)
+ if err == nil {
+ defer resp.Body.Close()
+ }
if err == nil && resp.StatusCode == http.StatusOK {
break
}
@@ -154,8 +159,6 @@ func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} {
fmt.Println(err)
continue
}
- req.Header.Add("ID", endpoint.GetID().String())
-
resp, err := client.Do(req)
if err != nil || resp.StatusCode != http.StatusOK {
continue
@@ -207,7 +210,6 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
go func() {
- retries := 3
client := &http.Client{Timeout: 5 * time.Second}
for i := 0; i < retries; i++ {
@@ -220,9 +222,14 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := client.Do(req)
+ if err == nil {
+ defer resp.Body.Close()
+ }
if err == nil && resp.StatusCode == http.StatusOK {
runtime.Goexit()
}
+
+ fmt.Printf("failed to submit message: %s\n", err)
time.Sleep(1 * time.Second)
}
fmt.Printf("failed to send message: %v\n", msg)
@@ -247,7 +254,6 @@ func (n *TCPNetwork) DeliverBlocks(blocks BlockList) {
msgURL := fmt.Sprintf("http://%s:%d/delivery", n.peerServer, peerPort)
go func() {
- retries := 3
client := &http.Client{Timeout: 5 * time.Second}
for i := 0; i < retries; i++ {
@@ -259,6 +265,10 @@ func (n *TCPNetwork) DeliverBlocks(blocks BlockList) {
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := client.Do(req)
+ if err == nil {
+ defer resp.Body.Close()
+ }
+
if err == nil && resp.StatusCode == http.StatusOK {
runtime.Goexit()
}
@@ -278,7 +288,6 @@ func (n *TCPNetwork) NotifyServer(msg Message) {
msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort)
- retries := 3
client := &http.Client{Timeout: 5 * time.Second}
for i := 0; i < retries; i++ {
@@ -290,6 +299,9 @@ func (n *TCPNetwork) NotifyServer(msg Message) {
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := client.Do(req)
+ if err == nil {
+ defer resp.Body.Close()
+ }
if err == nil && resp.StatusCode == http.StatusOK {
return
}
@@ -302,6 +314,7 @@ func (n *TCPNetwork) NotifyServer(msg Message) {
// GetServerInfo retrieve the info message from peerServer.
func (n *TCPNetwork) GetServerInfo() InfoMessage {
+ infoMsg := InfoMessage{}
msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort)
client := &http.Client{Timeout: 5 * time.Second}
@@ -314,13 +327,13 @@ func (n *TCPNetwork) GetServerInfo() InfoMessage {
resp, err := client.Do(req)
if err != nil {
fmt.Printf("error: %v\n", err)
+ return infoMsg
}
if resp.StatusCode != http.StatusOK {
fmt.Printf("error: %v\n", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
- infoMsg := InfoMessage{}
if err := json.Unmarshal(body, &infoMsg); err != nil {
fmt.Printf("error: %v", err)
diff --git a/simulation/validator.go b/simulation/validator.go
index 97412b1..7397284 100644
--- a/simulation/validator.go
+++ b/simulation/validator.go
@@ -100,7 +100,7 @@ func (v *Validator) Wait() {
func (v *Validator) CheckServerInfo(isShutdown chan struct{}) {
for {
infoMsg := v.network.GetServerInfo()
- if infoMsg.Status == shutdown {
+ if infoMsg.Status == statusShutdown {
isShutdown <- struct{}{}
break
}