diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-07-30 09:05:58 +0800 |
---|---|---|
committer | Wei-Ning Huang <aitjcize@gmail.com> | 2018-07-30 09:05:58 +0800 |
commit | 279daea6e004ab6ad9d079ccc35b7c52d79630ad (patch) | |
tree | 6e07c9ddf5608339c216c4657250f7df238bd75e /simulation | |
parent | 568ce1f526d10184af2ccfe342394f57ae689a14 (diff) | |
download | tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.tar tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.tar.gz tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.tar.bz2 tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.tar.lz tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.tar.xz tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.tar.zst tangerine-consensus-279daea6e004ab6ad9d079ccc35b7c52d79630ad.zip |
Add a config that PeerServer can shutdown after receiving enough of block. (#19)
Diffstat (limited to 'simulation')
-rw-r--r-- | simulation/config/config.go | 3 | ||||
-rw-r--r-- | simulation/fake-network.go | 14 | ||||
-rw-r--r-- | simulation/network.go | 29 | ||||
-rw-r--r-- | simulation/peer-server.go | 77 | ||||
-rw-r--r-- | simulation/simulation.go | 9 | ||||
-rw-r--r-- | simulation/tcp-network.go | 63 | ||||
-rw-r--r-- | simulation/validator.go | 61 | ||||
-rw-r--r-- | simulation/verification.go | 6 |
8 files changed, 240 insertions, 22 deletions
diff --git a/simulation/config/config.go b/simulation/config/config.go index 30d1562..c704a8d 100644 --- a/simulation/config/config.go +++ b/simulation/config/config.go @@ -18,6 +18,7 @@ package config import ( + "math" "os" "github.com/naoina/toml" @@ -38,6 +39,7 @@ type Validator struct { Num int ProposeIntervalMean float64 ProposeIntervalSigma float64 + MaxBlock uint64 } // Networking config. @@ -71,6 +73,7 @@ func GenerateDefault(path string) error { Num: 7, ProposeIntervalMean: 500, ProposeIntervalSigma: 30, + MaxBlock: math.MaxUint64, }, Networking: Networking{ Type: NetworkTypeFake, diff --git a/simulation/fake-network.go b/simulation/fake-network.go index e85917e..dde8bd7 100644 --- a/simulation/fake-network.go +++ b/simulation/fake-network.go @@ -96,6 +96,18 @@ func (n *FakeNetwork) BroadcastBlock(block *types.Block) { // DeliverBlocks sends blocks to peerServer. func (n *FakeNetwork) DeliverBlocks(blocks common.Hashes, id int) { - // TODO + // TODO(jimmy-dexon): Implement this method. return } + +// NotifyServer sends message to peerServer +func (n *FakeNetwork) NotifyServer(msg Message) { + // TODO(jimmy-dexon): Implement this method. + return +} + +// GetServerInfo retrieve the info message from peerServer. +func (n *FakeNetwork) GetServerInfo() InfoMessage { + // TODO(jimmy-dexon): Implement this method. + return InfoMessage{} +} diff --git a/simulation/network.go b/simulation/network.go index 7ce0dbc..e69dd43 100644 --- a/simulation/network.go +++ b/simulation/network.go @@ -18,10 +18,37 @@ package simulation import ( + "encoding/json" + "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +type messageType string + +const ( + shutdownAck messageType = "shutdownAck" +) + +// Message is a struct for peer sending message to server. +type Message struct { + Type messageType `json:"type"` + Payload json.RawMessage `json:"payload"` +} + +type infoStatus string + +const ( + normal infoStatus = "normal" + shutdown infoStatus = "shutdown" +) + +// InfoMessage is a struct used by peerServer's /info. +type InfoMessage struct { + Status infoStatus `json:"status"` + Peers map[types.ValidatorID]string `json:"peers"` +} + // Endpoint is the interface for a client network endpoint. type Endpoint interface { GetID() types.ValidatorID @@ -39,4 +66,6 @@ type Network interface { // PeerServerNetwork is the interface for peerServer network related functions type PeerServerNetwork interface { DeliverBlocks(blocks common.Hashes, id int) + NotifyServer(msg Message) + GetServerInfo() InfoMessage } diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 0d69f8e..9cace94 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -18,6 +18,7 @@ package simulation import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -25,6 +26,7 @@ import ( "net" "net/http" "sync" + "time" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon-consensus-core/simulation/config" @@ -36,6 +38,7 @@ type PeerServer struct { peersMu sync.Mutex peerTotalOrder PeerTotalOrder peerTotalOrderMu sync.Mutex + verifiedLen uint64 } // NewPeerServer returns a new peer server. @@ -100,7 +103,17 @@ func (p *PeerServer) Run(configPath string) { p.peersMu.Lock() defer p.peersMu.Unlock() - jsonText, err := json.Marshal(p.peers) + msg := InfoMessage{ + Status: normal, + Peers: p.peers, + } + + // Determine msg.status. + if p.verifiedLen >= cfg.Validator.MaxBlock { + msg.Status = shutdown + } + + jsonText, err := json.Marshal(msg) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -127,7 +140,12 @@ func (p *PeerServer) Run(configPath string) { } id := types.ValidatorID{} - id.UnmarshalText([]byte(idString)) + if err := id.UnmarshalText([]byte(idString)); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) p.peerTotalOrderMu.Lock() defer p.peerTotalOrderMu.Unlock() @@ -142,21 +160,72 @@ func (p *PeerServer) Run(configPath string) { p.peerTotalOrderMu.Lock() defer p.peerTotalOrderMu.Unlock() var correct bool - p.peerTotalOrder, correct = VerifyTotalOrder(id, p.peerTotalOrder) + var length int + p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder) if !correct { log.Printf("The result of Total Ordering Algorithm has error.\n") } + p.verifiedLen += uint64(length) }(id) } + stopServer := make(chan struct{}) + + messageHandler := func(w http.ResponseWriter, r *http.Request) { + idString := r.Header.Get("ID") + id := types.ValidatorID{} + id.UnmarshalText([]byte(idString)) + + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + m := Message{} + if err := json.Unmarshal(body, &m); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if m.Type != shutdownAck { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) + delete(p.peers, id) + log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) + if len(p.peers) == 0 { + stopServer <- struct{}{} + } + } + http.HandleFunc("/reset", resetHandler) http.HandleFunc("/join", joinHandler) http.HandleFunc("/peers", peersHandler) http.HandleFunc("/info", infoHandler) http.HandleFunc("/delivery", deliveryHandler) + http.HandleFunc("/message", messageHandler) addr := fmt.Sprintf("0.0.0.0:%d", peerPort) log.Printf("Peer server started at %s", addr) - http.ListenAndServe(addr, nil) + server := &http.Server{Addr: addr} + + go func() { + <-stopServer + + log.Printf("Shutting down peerServer.\n") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + log.Printf("Error shutting down peerServer: %v\n", err) + } + }() + + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Error starting server %v\n", err) + } } diff --git a/simulation/simulation.go b/simulation/simulation.go index 8ec72ea..2ea768e 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -34,10 +34,11 @@ func Run(configPath string) { networkType := cfg.Networking.Type + var vs []*Validator + if networkType == config.NetworkTypeFake || networkType == config.NetworkTypeTCPLocal { - var vs []*Validator var network Network if networkType == config.NetworkTypeFake { @@ -71,7 +72,11 @@ func Run(configPath string) { go network.Start() v := NewValidator(id, cfg.Validator, network, nil) go v.Run() + vs = append(vs, v) } - select {} + for _, v := range vs { + v.Wait() + fmt.Printf("Validator %s is shutdown\n", v.GetID()) + } } diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go index 464473b..ff4de9e 100644 --- a/simulation/tcp-network.go +++ b/simulation/tcp-network.go @@ -52,7 +52,7 @@ func NewTCPNetwork(local bool, peerServer string) *TCPNetwork { } pServer := peerServer if local { - pServer = "localhost" + pServer = "127.0.0.1" } return &TCPNetwork{ local: local, @@ -217,6 +217,7 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { if err != nil { continue } + req.Close = true req.Header.Add("ID", n.endpoint.GetID().String()) resp, err := client.Do(req) @@ -273,3 +274,63 @@ func (n *TCPNetwork) DeliverBlocks(blocks common.Hashes, id int) { fmt.Printf("failed to send message: %v\n", blocks) }() } + +// NotifyServer sends message to peerServer +func (n *TCPNetwork) NotifyServer(msg Message) { + messageJSON, err := json.Marshal(msg) + if err != nil { + fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, msg) + return + } + + msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort) + + retries := 3 + client := &http.Client{Timeout: 5 * time.Second} + + for i := 0; i < retries; i++ { + req, err := http.NewRequest( + http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) + if err != nil { + continue + } + req.Header.Add("ID", n.endpoint.GetID().String()) + + resp, err := client.Do(req) + if err == nil && resp.StatusCode == http.StatusOK { + return + } + time.Sleep(1 * time.Second) + } + fmt.Printf("failed to send message: %v\n", msg) + + return +} + +// GetServerInfo retrieve the info message from peerServer. +func (n *TCPNetwork) GetServerInfo() InfoMessage { + msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort) + client := &http.Client{Timeout: 5 * time.Second} + + req, err := http.NewRequest( + http.MethodGet, msgURL, nil) + if err != nil { + fmt.Printf("error: %v\n", err) + } + + resp, err := client.Do(req) + if err != nil { + fmt.Printf("error: %v\n", err) + } + if resp.StatusCode != http.StatusOK { + fmt.Printf("error: %v\n", err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + infoMsg := InfoMessage{} + + if err := json.Unmarshal(body, &infoMsg); err != nil { + fmt.Printf("error: %v", err) + } + return infoMsg +} diff --git a/simulation/validator.go b/simulation/validator.go index b7ec2a0..6ce5fa7 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -37,6 +37,7 @@ type Validator struct { config config.Validator db *leveldb.DB msgChannel chan interface{} + isFinished chan struct{} ID types.ValidatorID lattice *core.BlockLattice @@ -55,12 +56,13 @@ func NewValidator( app := NewSimApp(id, network) lattice := core.NewBlockLattice(blockdb.NewMemBackedBlockDB(), app) return &Validator{ - ID: id, - config: config, - network: network, - app: app, - db: db, - lattice: lattice, + ID: id, + config: config, + network: network, + app: app, + db: db, + lattice: lattice, + isFinished: make(chan struct{}), } } @@ -73,11 +75,38 @@ func (v *Validator) GetID() types.ValidatorID { func (v *Validator) Run() { v.msgChannel = v.network.Join(v) + isStopped := make(chan struct{}) + isShutdown := make(chan struct{}) + + v.BroadcastGenesisBlock() go v.MsgServer() - go v.BlockProposer() + go v.CheckServerInfo(isShutdown) + go v.BlockProposer(isStopped, isShutdown) // Blocks forever. - select {} + <-isStopped + v.network.NotifyServer(Message{ + Type: shutdownAck, + }) + v.isFinished <- struct{}{} +} + +// Wait for the validator to stop (if peerServer told it to). +func (v *Validator) Wait() { + <-v.isFinished +} + +// CheckServerInfo will check the info from the peerServer and update +// validator's status if needed. +func (v *Validator) CheckServerInfo(isShutdown chan struct{}) { + for { + infoMsg := v.network.GetServerInfo() + if infoMsg.Status == shutdown { + isShutdown <- struct{}{} + break + } + time.Sleep(250 * time.Millisecond) + } } // MsgServer listen to the network channel for message and handle it. @@ -95,8 +124,8 @@ func (v *Validator) MsgServer() { } } -// BlockProposer propose blocks to be send to the DEXON network. -func (v *Validator) BlockProposer() { +// BroadcastGenesisBlock broadcasts genesis block to all peers. +func (v *Validator) BroadcastGenesisBlock() { // Wait until all peer joined the network. for v.network.NumPeers() != v.config.Num { time.Sleep(time.Second) @@ -120,7 +149,10 @@ func (v *Validator) BlockProposer() { v.lattice.PrepareBlock(b) v.network.BroadcastBlock(b) } +} +// BlockProposer propose blocks to be send to the DEXON network. +func (v *Validator) BlockProposer(isStopped, isShutdown chan struct{}) { // Wait until all peer knows each other. for len(v.lattice.ValidatorSet) != v.config.Num { time.Sleep(time.Second) @@ -130,7 +162,7 @@ func (v *Validator) BlockProposer() { Sigma: v.config.ProposeIntervalSigma, Mean: v.config.ProposeIntervalMean, } - +ProposingBlockLoop: for { time.Sleep(model.Delay()) @@ -144,5 +176,12 @@ func (v *Validator) BlockProposer() { v.current = block v.lattice.PrepareBlock(block) v.network.BroadcastBlock(block) + select { + case <-isShutdown: + isStopped <- struct{}{} + break ProposingBlockLoop + default: + break + } } } diff --git a/simulation/verification.go b/simulation/verification.go index b32d719..0e111be 100644 --- a/simulation/verification.go +++ b/simulation/verification.go @@ -70,12 +70,12 @@ func (totalOrder *TotalOrderResult) PushBlocks(blocks BlockList) (ready bool) { // of each validators may not be the same, so only the common part is verified. func VerifyTotalOrder(id types.ValidatorID, totalOrder PeerTotalOrder) ( - unverifiedMap PeerTotalOrder, correct bool) { + unverifiedMap PeerTotalOrder, correct bool, length int) { hasError := false // Get the common length from all validators. - length := math.MaxInt32 + length = math.MaxInt32 for _, peerTotalOrder := range totalOrder { if len(peerTotalOrder.hashList) < length { length = len(peerTotalOrder.hashList) @@ -107,5 +107,5 @@ func VerifyTotalOrder(id types.ValidatorID, totalOrder[vid].hashList[length:] } } - return totalOrder, !hasError + return totalOrder, !hasError, length } |