diff options
author | Wei-Ning Huang <w@dexon.org> | 2018-08-06 11:14:02 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-06 11:14:02 +0800 |
commit | bb95322f46524ea52b6762c349905dcc910814c8 (patch) | |
tree | cf53a3b40e5eaba66eb8d837215650fab9b6150c /simulation | |
parent | 4b606c3c12db824cb996d9eca43c1b66b2bdb440 (diff) | |
download | dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.gz dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.bz2 dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.lz dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.xz dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.tar.zst dexon-consensus-bb95322f46524ea52b6762c349905dcc910814c8.zip |
simulation: free resource after use and misc fix for k8s mode (#31)
Diffstat (limited to 'simulation')
-rw-r--r-- | simulation/constant.go | 2 | ||||
-rw-r--r-- | simulation/kubernetes/config.toml.in | 1 | ||||
-rwxr-xr-x | simulation/kubernetes/run_simulation.sh | 13 | ||||
-rw-r--r-- | simulation/network.go | 5 | ||||
-rw-r--r-- | simulation/peer-server.go | 23 | ||||
-rw-r--r-- | simulation/simulation.go | 6 | ||||
-rw-r--r-- | simulation/tcp-network.go | 25 | ||||
-rw-r--r-- | simulation/validator.go | 2 |
8 files changed, 63 insertions, 14 deletions
diff --git a/simulation/constant.go b/simulation/constant.go index aa21a3d..79ef84a 100644 --- a/simulation/constant.go +++ b/simulation/constant.go @@ -19,5 +19,5 @@ package simulation const ( peerPort = 8080 - msgBufferSize = 128 + msgBufferSize = 256 ) diff --git a/simulation/kubernetes/config.toml.in b/simulation/kubernetes/config.toml.in index d956055..b46e8d6 100644 --- a/simulation/kubernetes/config.toml.in +++ b/simulation/kubernetes/config.toml.in @@ -4,6 +4,7 @@ title = "DEXON Consensus Simulation Config" num = {{numValidators}} propose_interval_mean = 5e+02 propose_interval_sigma = 3e+01 +max_block = 100 [networking] type = "tcp" diff --git a/simulation/kubernetes/run_simulation.sh b/simulation/kubernetes/run_simulation.sh index 88f9eee..c5298d7 100755 --- a/simulation/kubernetes/run_simulation.sh +++ b/simulation/kubernetes/run_simulation.sh @@ -14,11 +14,20 @@ build_docker_image() { } start_simulation() { - kubectl delete deployment --all --force --grace-period=0 + kubectl delete deployment dexcon-simulation --force --grace-period=0 + kubectl delete deployment dexcon-simulation-peer-server --force --grace-period=0 sleep 10 kubectl apply -f peer-server.yaml - sleep 10 + + while true; do + if kubectl get pods -l app=dexcon-simulation-peer-server | grep Running; + then + break + fi + sleep 1 + done + kubectl apply -f validator.yaml } diff --git a/simulation/network.go b/simulation/network.go index 16e6918..da321bd 100644 --- a/simulation/network.go +++ b/simulation/network.go @@ -38,8 +38,9 @@ type Message struct { type infoStatus string const ( - normal infoStatus = "normal" - shutdown infoStatus = "shutdown" + statusInit infoStatus = "init" + statusNormal infoStatus = "normal" + statusShutdown infoStatus = "shutdown" ) // InfoMessage is a struct used by peerServer's /info. diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 4142158..8f2d6ba 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -65,6 +65,8 @@ func (p *PeerServer) Run(configPath string) { } joinHandler := func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + idString := r.Header.Get("ID") portString := r.Header.Get("PORT") @@ -78,11 +80,16 @@ func (p *PeerServer) Run(configPath string) { p.peers[id] = net.JoinHostPort(host, portString) p.peerTotalOrder[id] = NewTotalOrderResult(id) log.Printf("Peer %s joined from %s", id, p.peers[id]) + + if len(p.peers) == cfg.Validator.Num { + log.Println("All peers connected.") + } } peersHandler := func(w http.ResponseWriter, r *http.Request) { p.peersMu.Lock() defer p.peersMu.Unlock() + defer r.Body.Close() if len(p.peers) != cfg.Validator.Num { w.WriteHeader(http.StatusNotFound) @@ -102,15 +109,20 @@ func (p *PeerServer) Run(configPath string) { infoHandler := func(w http.ResponseWriter, r *http.Request) { p.peersMu.Lock() defer p.peersMu.Unlock() + defer r.Body.Close() msg := InfoMessage{ - Status: normal, + Status: statusNormal, Peers: p.peers, } + if len(p.peers) < cfg.Validator.Num { + msg.Status = statusInit + } + // Determine msg.status. if p.verifiedLen >= cfg.Validator.MaxBlock { - msg.Status = shutdown + msg.Status = statusShutdown } jsonText, err := json.Marshal(msg) @@ -124,6 +136,8 @@ func (p *PeerServer) Run(configPath string) { } deliveryHandler := func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + idString := r.Header.Get("ID") defer r.Body.Close() @@ -159,6 +173,7 @@ func (p *PeerServer) Run(configPath string) { go func(id types.ValidatorID) { p.peerTotalOrderMu.Lock() defer p.peerTotalOrderMu.Unlock() + var correct bool var length int p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder) @@ -172,6 +187,10 @@ func (p *PeerServer) Run(configPath string) { stopServer := make(chan struct{}) messageHandler := func(w http.ResponseWriter, r *http.Request) { + p.peersMu.Lock() + defer p.peersMu.Unlock() + defer r.Body.Close() + idString := r.Header.Get("ID") id := types.ValidatorID{} id.UnmarshalText([]byte(idString)) diff --git a/simulation/simulation.go b/simulation/simulation.go index 8542051..27e31d2 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -79,4 +79,10 @@ func Run(configPath string) { v.Wait() fmt.Printf("Validator %s is shutdown\n", v.GetID()) } + + // Do not exit when we are in TCP node, since k8s will restart the pod and + // cause confusions. + if networkType == config.NetworkTypeTCP { + select {} + } } diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go index 911243f..aed038f 100644 --- a/simulation/tcp-network.go +++ b/simulation/tcp-network.go @@ -31,6 +31,8 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +const retries = 5 + // TCPNetwork implements the Network interface. type TCPNetwork struct { local bool @@ -138,6 +140,9 @@ func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} { req.Header.Add("PORT", fmt.Sprintf("%d", n.port)) resp, err := client.Do(req) + if err == nil { + defer resp.Body.Close() + } if err == nil && resp.StatusCode == http.StatusOK { break } @@ -154,8 +159,6 @@ func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} { fmt.Println(err) continue } - req.Header.Add("ID", endpoint.GetID().String()) - resp, err := client.Do(req) if err != nil || resp.StatusCode != http.StatusOK { continue @@ -207,7 +210,6 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { msgURL := fmt.Sprintf("http://%s/msg", clientAddr) go func() { - retries := 3 client := &http.Client{Timeout: 5 * time.Second} for i := 0; i < retries; i++ { @@ -220,9 +222,14 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { req.Header.Add("ID", n.endpoint.GetID().String()) resp, err := client.Do(req) + if err == nil { + defer resp.Body.Close() + } if err == nil && resp.StatusCode == http.StatusOK { runtime.Goexit() } + + fmt.Printf("failed to submit message: %s\n", err) time.Sleep(1 * time.Second) } fmt.Printf("failed to send message: %v\n", msg) @@ -247,7 +254,6 @@ func (n *TCPNetwork) DeliverBlocks(blocks BlockList) { msgURL := fmt.Sprintf("http://%s:%d/delivery", n.peerServer, peerPort) go func() { - retries := 3 client := &http.Client{Timeout: 5 * time.Second} for i := 0; i < retries; i++ { @@ -259,6 +265,10 @@ func (n *TCPNetwork) DeliverBlocks(blocks BlockList) { req.Header.Add("ID", n.endpoint.GetID().String()) resp, err := client.Do(req) + if err == nil { + defer resp.Body.Close() + } + if err == nil && resp.StatusCode == http.StatusOK { runtime.Goexit() } @@ -278,7 +288,6 @@ func (n *TCPNetwork) NotifyServer(msg Message) { msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort) - retries := 3 client := &http.Client{Timeout: 5 * time.Second} for i := 0; i < retries; i++ { @@ -290,6 +299,9 @@ func (n *TCPNetwork) NotifyServer(msg Message) { req.Header.Add("ID", n.endpoint.GetID().String()) resp, err := client.Do(req) + if err == nil { + defer resp.Body.Close() + } if err == nil && resp.StatusCode == http.StatusOK { return } @@ -302,6 +314,7 @@ func (n *TCPNetwork) NotifyServer(msg Message) { // GetServerInfo retrieve the info message from peerServer. func (n *TCPNetwork) GetServerInfo() InfoMessage { + infoMsg := InfoMessage{} msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort) client := &http.Client{Timeout: 5 * time.Second} @@ -314,13 +327,13 @@ func (n *TCPNetwork) GetServerInfo() InfoMessage { resp, err := client.Do(req) if err != nil { fmt.Printf("error: %v\n", err) + return infoMsg } if resp.StatusCode != http.StatusOK { fmt.Printf("error: %v\n", err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) - infoMsg := InfoMessage{} if err := json.Unmarshal(body, &infoMsg); err != nil { fmt.Printf("error: %v", err) diff --git a/simulation/validator.go b/simulation/validator.go index 97412b1..7397284 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -100,7 +100,7 @@ func (v *Validator) Wait() { func (v *Validator) CheckServerInfo(isShutdown chan struct{}) { for { infoMsg := v.network.GetServerInfo() - if infoMsg.Status == shutdown { + if infoMsg.Status == statusShutdown { isShutdown <- struct{}{} break } |