diff options
Diffstat (limited to 'simulation/peer-server.go')
-rw-r--r-- | simulation/peer-server.go | 77 |
1 files changed, 73 insertions, 4 deletions
diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 0d69f8e..9cace94 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -18,6 +18,7 @@ package simulation import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -25,6 +26,7 @@ import ( "net" "net/http" "sync" + "time" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon-consensus-core/simulation/config" @@ -36,6 +38,7 @@ type PeerServer struct { peersMu sync.Mutex peerTotalOrder PeerTotalOrder peerTotalOrderMu sync.Mutex + verifiedLen uint64 } // NewPeerServer returns a new peer server. @@ -100,7 +103,17 @@ func (p *PeerServer) Run(configPath string) { p.peersMu.Lock() defer p.peersMu.Unlock() - jsonText, err := json.Marshal(p.peers) + msg := InfoMessage{ + Status: normal, + Peers: p.peers, + } + + // Determine msg.status. + if p.verifiedLen >= cfg.Validator.MaxBlock { + msg.Status = shutdown + } + + jsonText, err := json.Marshal(msg) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -127,7 +140,12 @@ func (p *PeerServer) Run(configPath string) { } id := types.ValidatorID{} - id.UnmarshalText([]byte(idString)) + if err := id.UnmarshalText([]byte(idString)); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) p.peerTotalOrderMu.Lock() defer p.peerTotalOrderMu.Unlock() @@ -142,21 +160,72 @@ func (p *PeerServer) Run(configPath string) { p.peerTotalOrderMu.Lock() defer p.peerTotalOrderMu.Unlock() var correct bool - p.peerTotalOrder, correct = VerifyTotalOrder(id, p.peerTotalOrder) + var length int + p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder) if !correct { log.Printf("The result of Total Ordering Algorithm has error.\n") } + p.verifiedLen += uint64(length) }(id) } + stopServer := make(chan struct{}) + + messageHandler := func(w http.ResponseWriter, r *http.Request) { + idString := r.Header.Get("ID") + id := types.ValidatorID{} + id.UnmarshalText([]byte(idString)) + + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + m := Message{} + if err := json.Unmarshal(body, &m); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if m.Type != shutdownAck { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) + delete(p.peers, id) + log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) + if len(p.peers) == 0 { + stopServer <- struct{}{} + } + } + http.HandleFunc("/reset", resetHandler) http.HandleFunc("/join", joinHandler) http.HandleFunc("/peers", peersHandler) http.HandleFunc("/info", infoHandler) http.HandleFunc("/delivery", deliveryHandler) + http.HandleFunc("/message", messageHandler) addr := fmt.Sprintf("0.0.0.0:%d", peerPort) log.Printf("Peer server started at %s", addr) - http.ListenAndServe(addr, nil) + server := &http.Server{Addr: addr} + + go func() { + <-stopServer + + log.Printf("Shutting down peerServer.\n") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + log.Printf("Error shutting down peerServer: %v\n", err) + } + }() + + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Error starting server %v\n", err) + } } |