From 2cf6003621a744ae5b625443774bf885f70acd51 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Thu, 9 Aug 2018 09:05:48 +0800 Subject: simulation: Fix k8s simulation issues. (#36) * Refine peer server * k8s ignore * Keep peer server alive on k8s * Stop validators from accepting new blocks after peer server has shut down. * Add comment --- simulation/peer-server.go | 33 ++++++++++++++++++++++++++++----- simulation/validator.go | 14 ++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) (limited to 'simulation') diff --git a/simulation/peer-server.go b/simulation/peer-server.go index bae323e..43a1c2b 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -49,6 +49,15 @@ func NewPeerServer() *PeerServer { } } +// isValidator checks if vID is in p.peers. If peer server restarts but +// validators are not, it will cause panic if validators send message. +func (p *PeerServer) isValidator(vID types.ValidatorID) bool { + p.peersMu.Lock() + defer p.peersMu.Unlock() + _, exist := p.peers[vID] + return exist +} + // Run starts the peer server. func (p *PeerServer) Run(configPath string) { cfg, err := config.Read(configPath) @@ -84,6 +93,7 @@ func (p *PeerServer) Run(configPath string) { if len(p.peers) == cfg.Validator.Num { log.Println("All peers connected.") } + w.WriteHeader(http.StatusOK) } peersHandler := func(w http.ResponseWriter, r *http.Request) { @@ -102,6 +112,7 @@ func (p *PeerServer) Run(configPath string) { return } + w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") w.Write(jsonText) } @@ -131,6 +142,7 @@ func (p *PeerServer) Run(configPath string) { return } + w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") w.Write(jsonText) } @@ -139,8 +151,6 @@ func (p *PeerServer) Run(configPath string) { defer r.Body.Close() idString := r.Header.Get("ID") - - defer r.Body.Close() body, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -159,6 +169,11 @@ func (p *PeerServer) Run(configPath string) { return } + if !p.isValidator(id) { + w.WriteHeader(http.StatusForbidden) + return + } + w.WriteHeader(http.StatusOK) p.peerTotalOrderMu.Lock() @@ -187,15 +202,17 @@ func (p *PeerServer) Run(configPath string) { stopServer := make(chan struct{}) messageHandler := func(w http.ResponseWriter, r *http.Request) { - p.peersMu.Lock() - defer p.peersMu.Unlock() defer r.Body.Close() idString := r.Header.Get("ID") id := types.ValidatorID{} id.UnmarshalText([]byte(idString)) - defer r.Body.Close() + if !p.isValidator(id) { + w.WriteHeader(http.StatusForbidden) + return + } + body, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -264,4 +281,10 @@ func (p *PeerServer) Run(configPath string) { if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("Error starting server %v\n", err) } + + // Do not exit when we are in TCP node, since k8s will restart the pod and + // cause confusions. + if cfg.Networking.Type == config.NetworkTypeTCP { + select {} + } } diff --git a/simulation/validator.go b/simulation/validator.go index 69ee317..a30420e 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -78,11 +78,11 @@ func (v *Validator) GetID() types.ValidatorID { func (v *Validator) Run() { v.msgChannel = v.network.Join(v) - isStopped := make(chan struct{}) + isStopped := make(chan struct{}, 2) isShutdown := make(chan struct{}) v.BroadcastGenesisBlock() - go v.MsgServer() + go v.MsgServer(isStopped) go v.CheckServerInfo(isShutdown) go v.BlockProposer(isStopped, isShutdown) @@ -116,10 +116,15 @@ func (v *Validator) CheckServerInfo(isShutdown chan struct{}) { } // MsgServer listen to the network channel for message and handle it. -func (v *Validator) MsgServer() { +func (v *Validator) MsgServer(isStopped chan struct{}) { var pendingBlocks []*types.Block for { - msg := <-v.msgChannel + var msg interface{} + select { + case msg = <-v.msgChannel: + case <-isStopped: + return + } switch val := msg.(type) { case *types.Block: @@ -199,6 +204,7 @@ ProposingBlockLoop: v.network.BroadcastBlock(block) select { case <-isShutdown: + isStopped <- struct{}{} isStopped <- struct{}{} break ProposingBlockLoop default: -- cgit v1.2.3