aboutsummaryrefslogtreecommitdiffstats
path: root/simulation/peer-server.go
diff options
context:
space:
mode:
Diffstat (limited to 'simulation/peer-server.go')
-rw-r--r--simulation/peer-server.go77
1 files changed, 73 insertions, 4 deletions
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 0d69f8e..9cace94 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -18,6 +18,7 @@
package simulation
import (
+ "context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -25,6 +26,7 @@ import (
"net"
"net/http"
"sync"
+ "time"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon-consensus-core/simulation/config"
@@ -36,6 +38,7 @@ type PeerServer struct {
peersMu sync.Mutex
peerTotalOrder PeerTotalOrder
peerTotalOrderMu sync.Mutex
+ verifiedLen uint64
}
// NewPeerServer returns a new peer server.
@@ -100,7 +103,17 @@ func (p *PeerServer) Run(configPath string) {
p.peersMu.Lock()
defer p.peersMu.Unlock()
- jsonText, err := json.Marshal(p.peers)
+ msg := InfoMessage{
+ Status: normal,
+ Peers: p.peers,
+ }
+
+ // Determine msg.status.
+ if p.verifiedLen >= cfg.Validator.MaxBlock {
+ msg.Status = shutdown
+ }
+
+ jsonText, err := json.Marshal(msg)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
@@ -127,7 +140,12 @@ func (p *PeerServer) Run(configPath string) {
}
id := types.ValidatorID{}
- id.UnmarshalText([]byte(idString))
+ if err := id.UnmarshalText([]byte(idString)); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
p.peerTotalOrderMu.Lock()
defer p.peerTotalOrderMu.Unlock()
@@ -142,21 +160,72 @@ func (p *PeerServer) Run(configPath string) {
p.peerTotalOrderMu.Lock()
defer p.peerTotalOrderMu.Unlock()
var correct bool
- p.peerTotalOrder, correct = VerifyTotalOrder(id, p.peerTotalOrder)
+ var length int
+ p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder)
if !correct {
log.Printf("The result of Total Ordering Algorithm has error.\n")
}
+ p.verifiedLen += uint64(length)
}(id)
}
+ stopServer := make(chan struct{})
+
+ messageHandler := func(w http.ResponseWriter, r *http.Request) {
+ idString := r.Header.Get("ID")
+ id := types.ValidatorID{}
+ id.UnmarshalText([]byte(idString))
+
+ defer r.Body.Close()
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ m := Message{}
+ if err := json.Unmarshal(body, &m); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ if m.Type != shutdownAck {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+ delete(p.peers, id)
+ log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
+ if len(p.peers) == 0 {
+ stopServer <- struct{}{}
+ }
+ }
+
http.HandleFunc("/reset", resetHandler)
http.HandleFunc("/join", joinHandler)
http.HandleFunc("/peers", peersHandler)
http.HandleFunc("/info", infoHandler)
http.HandleFunc("/delivery", deliveryHandler)
+ http.HandleFunc("/message", messageHandler)
addr := fmt.Sprintf("0.0.0.0:%d", peerPort)
log.Printf("Peer server started at %s", addr)
- http.ListenAndServe(addr, nil)
+ server := &http.Server{Addr: addr}
+
+ go func() {
+ <-stopServer
+
+ log.Printf("Shutting down peerServer.\n")
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := server.Shutdown(ctx); err != nil {
+ log.Printf("Error shutting down peerServer: %v\n", err)
+ }
+ }()
+
+ if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ log.Fatalf("Error starting server %v\n", err)
+ }
}