path: root/simulation
author     Jimmy Hu <jimmy.hu@dexon.org>        2018-07-30 09:05:58 +0800
committer  Wei-Ning Huang <aitjcize@gmail.com>  2018-07-30 09:05:58 +0800
commit     279daea6e004ab6ad9d079ccc35b7c52d79630ad (patch)
tree       6e07c9ddf5608339c216c4657250f7df238bd75e /simulation
parent     568ce1f526d10184af2ccfe342394f57ae689a14 (diff)
Add a config so that PeerServer can shut down after receiving enough blocks. (#19)
Diffstat (limited to 'simulation')
-rw-r--r--  simulation/config/config.go    3
-rw-r--r--  simulation/fake-network.go    14
-rw-r--r--  simulation/network.go         29
-rw-r--r--  simulation/peer-server.go     77
-rw-r--r--  simulation/simulation.go       9
-rw-r--r--  simulation/tcp-network.go     63
-rw-r--r--  simulation/validator.go       61
-rw-r--r--  simulation/verification.go     6
8 files changed, 240 insertions, 22 deletions
diff --git a/simulation/config/config.go b/simulation/config/config.go
index 30d1562..c704a8d 100644
--- a/simulation/config/config.go
+++ b/simulation/config/config.go
@@ -18,6 +18,7 @@
package config
import (
+ "math"
"os"
"github.com/naoina/toml"
@@ -38,6 +39,7 @@ type Validator struct {
Num int
ProposeIntervalMean float64
ProposeIntervalSigma float64
+ MaxBlock uint64
}
// Networking config.
@@ -71,6 +73,7 @@ func GenerateDefault(path string) error {
Num: 7,
ProposeIntervalMean: 500,
ProposeIntervalSigma: 30,
+ MaxBlock: math.MaxUint64,
},
Networking: Networking{
Type: NetworkTypeFake,
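
For context, the new MaxBlock field defaults to math.MaxUint64, which effectively keeps the old run-forever behaviour; a finite value is what triggers the shutdown flow added in the rest of this commit. A minimal Go sketch (the struct is a local copy so it compiles on its own, and the value 1000 is just an example, not a project default):

package main

import (
	"fmt"
	"math"
)

// Validator mirrors simulation/config.Validator after this commit,
// duplicated locally so the sketch is self-contained.
type Validator struct {
	Num                  int
	ProposeIntervalMean  float64
	ProposeIntervalSigma float64
	MaxBlock             uint64
}

func main() {
	// Default produced by GenerateDefault: effectively "never shut down".
	def := Validator{Num: 7, ProposeIntervalMean: 500, ProposeIntervalSigma: 30, MaxBlock: math.MaxUint64}

	// A finite cap: once PeerServer has verified this many blocks in total,
	// /info starts reporting status "shutdown".
	capped := def
	capped.MaxBlock = 1000

	fmt.Println(def.MaxBlock, capped.MaxBlock)
}
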
diff --git a/simulation/fake-network.go b/simulation/fake-network.go
index e85917e..dde8bd7 100644
--- a/simulation/fake-network.go
+++ b/simulation/fake-network.go
@@ -96,6 +96,18 @@ func (n *FakeNetwork) BroadcastBlock(block *types.Block) {
// DeliverBlocks sends blocks to peerServer.
func (n *FakeNetwork) DeliverBlocks(blocks common.Hashes, id int) {
- // TODO
+ // TODO(jimmy-dexon): Implement this method.
return
}
+
+// NotifyServer sends message to peerServer
+func (n *FakeNetwork) NotifyServer(msg Message) {
+ // TODO(jimmy-dexon): Implement this method.
+ return
+}
+
+// GetServerInfo retrieve the info message from peerServer.
+func (n *FakeNetwork) GetServerInfo() InfoMessage {
+ // TODO(jimmy-dexon): Implement this method.
+ return InfoMessage{}
+}
diff --git a/simulation/network.go b/simulation/network.go
index 7ce0dbc..e69dd43 100644
--- a/simulation/network.go
+++ b/simulation/network.go
@@ -18,10 +18,37 @@
package simulation
import (
+ "encoding/json"
+
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
+type messageType string
+
+const (
+ shutdownAck messageType = "shutdownAck"
+)
+
+// Message is a struct for peer sending message to server.
+type Message struct {
+ Type messageType `json:"type"`
+ Payload json.RawMessage `json:"payload"`
+}
+
+type infoStatus string
+
+const (
+ normal infoStatus = "normal"
+ shutdown infoStatus = "shutdown"
+)
+
+// InfoMessage is a struct used by peerServer's /info.
+type InfoMessage struct {
+ Status infoStatus `json:"status"`
+ Peers map[types.ValidatorID]string `json:"peers"`
+}
+
// Endpoint is the interface for a client network endpoint.
type Endpoint interface {
GetID() types.ValidatorID
@@ -39,4 +66,6 @@ type Network interface {
// PeerServerNetwork is the interface for peerServer network related functions
type PeerServerNetwork interface {
DeliverBlocks(blocks common.Hashes, id int)
+ NotifyServer(msg Message)
+ GetServerInfo() InfoMessage
}
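
The two structs above define the JSON exchanged with PeerServer's new /message and /info endpoints. A small self-contained sketch of what actually goes over the wire (the structs are trimmed copies; the real InfoMessage also carries a Peers map):

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copies of simulation.Message and simulation.InfoMessage so the
// example stands alone.
type Message struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

type InfoMessage struct {
	Status string `json:"status"`
}

func main() {
	// What a validator POSTs to /message when it acknowledges shutdown.
	ack, _ := json.Marshal(Message{Type: "shutdownAck"})
	fmt.Println(string(ack)) // {"type":"shutdownAck","payload":null}

	// What /info returns once enough blocks have been verified.
	var info InfoMessage
	_ = json.Unmarshal([]byte(`{"status":"shutdown"}`), &info)
	fmt.Println(info.Status) // shutdown
}
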
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 0d69f8e..9cace94 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -18,6 +18,7 @@
package simulation
import (
+ "context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -25,6 +26,7 @@ import (
"net"
"net/http"
"sync"
+ "time"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon-consensus-core/simulation/config"
@@ -36,6 +38,7 @@ type PeerServer struct {
peersMu sync.Mutex
peerTotalOrder PeerTotalOrder
peerTotalOrderMu sync.Mutex
+ verifiedLen uint64
}
// NewPeerServer returns a new peer server.
@@ -100,7 +103,17 @@ func (p *PeerServer) Run(configPath string) {
p.peersMu.Lock()
defer p.peersMu.Unlock()
- jsonText, err := json.Marshal(p.peers)
+ msg := InfoMessage{
+ Status: normal,
+ Peers: p.peers,
+ }
+
+ // Determine msg.status.
+ if p.verifiedLen >= cfg.Validator.MaxBlock {
+ msg.Status = shutdown
+ }
+
+ jsonText, err := json.Marshal(msg)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
@@ -127,7 +140,12 @@ func (p *PeerServer) Run(configPath string) {
}
id := types.ValidatorID{}
- id.UnmarshalText([]byte(idString))
+ if err := id.UnmarshalText([]byte(idString)); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
p.peerTotalOrderMu.Lock()
defer p.peerTotalOrderMu.Unlock()
@@ -142,21 +160,72 @@ func (p *PeerServer) Run(configPath string) {
p.peerTotalOrderMu.Lock()
defer p.peerTotalOrderMu.Unlock()
var correct bool
- p.peerTotalOrder, correct = VerifyTotalOrder(id, p.peerTotalOrder)
+ var length int
+ p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder)
if !correct {
log.Printf("The result of Total Ordering Algorithm has error.\n")
}
+ p.verifiedLen += uint64(length)
}(id)
}
+ stopServer := make(chan struct{})
+
+ messageHandler := func(w http.ResponseWriter, r *http.Request) {
+ idString := r.Header.Get("ID")
+ id := types.ValidatorID{}
+ id.UnmarshalText([]byte(idString))
+
+ defer r.Body.Close()
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ m := Message{}
+ if err := json.Unmarshal(body, &m); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ if m.Type != shutdownAck {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+ delete(p.peers, id)
+ log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
+ if len(p.peers) == 0 {
+ stopServer <- struct{}{}
+ }
+ }
+
http.HandleFunc("/reset", resetHandler)
http.HandleFunc("/join", joinHandler)
http.HandleFunc("/peers", peersHandler)
http.HandleFunc("/info", infoHandler)
http.HandleFunc("/delivery", deliveryHandler)
+ http.HandleFunc("/message", messageHandler)
addr := fmt.Sprintf("0.0.0.0:%d", peerPort)
log.Printf("Peer server started at %s", addr)
- http.ListenAndServe(addr, nil)
+ server := &http.Server{Addr: addr}
+
+ go func() {
+ <-stopServer
+
+ log.Printf("Shutting down peerServer.\n")
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := server.Shutdown(ctx); err != nil {
+ log.Printf("Error shutting down peerServer: %v\n", err)
+ }
+ }()
+
+ if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ log.Fatalf("Error starting server %v\n", err)
+ }
}
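
The handler/goroutine split above is the standard net/http graceful-shutdown pattern: the last shutdownAck sends on stopServer, a watcher goroutine calls Shutdown with a timeout, and ListenAndServe returns http.ErrServerClosed. A stripped-down, self-contained sketch of just that pattern (the address and path here are placeholders, not the simulation's real values):

package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

func main() {
	stopServer := make(chan struct{})

	// Stand-in for the /message handler: in the real server this only
	// signals once every peer has acknowledged shutdown.
	http.HandleFunc("/message", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		stopServer <- struct{}{}
	})

	server := &http.Server{Addr: "127.0.0.1:8080"}

	go func() {
		<-stopServer
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			log.Printf("error shutting down: %v", err)
		}
	}()

	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("server error: %v", err)
	}
	log.Println("server stopped cleanly")
}
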
diff --git a/simulation/simulation.go b/simulation/simulation.go
index 8ec72ea..2ea768e 100644
--- a/simulation/simulation.go
+++ b/simulation/simulation.go
@@ -34,10 +34,11 @@ func Run(configPath string) {
networkType := cfg.Networking.Type
+ var vs []*Validator
+
if networkType == config.NetworkTypeFake ||
networkType == config.NetworkTypeTCPLocal {
- var vs []*Validator
var network Network
if networkType == config.NetworkTypeFake {
@@ -71,7 +72,11 @@ func Run(configPath string) {
go network.Start()
v := NewValidator(id, cfg.Validator, network, nil)
go v.Run()
+ vs = append(vs, v)
}
- select {}
+ for _, v := range vs {
+ v.Wait()
+ fmt.Printf("Validator %s is shutdown\n", v.GetID())
+ }
}
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
index 464473b..ff4de9e 100644
--- a/simulation/tcp-network.go
+++ b/simulation/tcp-network.go
@@ -52,7 +52,7 @@ func NewTCPNetwork(local bool, peerServer string) *TCPNetwork {
}
pServer := peerServer
if local {
- pServer = "localhost"
+ pServer = "127.0.0.1"
}
return &TCPNetwork{
local: local,
@@ -217,6 +217,7 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
if err != nil {
continue
}
+ req.Close = true
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := client.Do(req)
@@ -273,3 +274,63 @@ func (n *TCPNetwork) DeliverBlocks(blocks common.Hashes, id int) {
fmt.Printf("failed to send message: %v\n", blocks)
}()
}
+
+// NotifyServer sends message to peerServer
+func (n *TCPNetwork) NotifyServer(msg Message) {
+ messageJSON, err := json.Marshal(msg)
+ if err != nil {
+ fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, msg)
+ return
+ }
+
+ msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort)
+
+ retries := 3
+ client := &http.Client{Timeout: 5 * time.Second}
+
+ for i := 0; i < retries; i++ {
+ req, err := http.NewRequest(
+ http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
+ if err != nil {
+ continue
+ }
+ req.Header.Add("ID", n.endpoint.GetID().String())
+
+ resp, err := client.Do(req)
+ if err == nil && resp.StatusCode == http.StatusOK {
+ return
+ }
+ time.Sleep(1 * time.Second)
+ }
+ fmt.Printf("failed to send message: %v\n", msg)
+
+ return
+}
+
+// GetServerInfo retrieve the info message from peerServer.
+func (n *TCPNetwork) GetServerInfo() InfoMessage {
+ msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort)
+ client := &http.Client{Timeout: 5 * time.Second}
+
+ req, err := http.NewRequest(
+ http.MethodGet, msgURL, nil)
+ if err != nil {
+ fmt.Printf("error: %v\n", err)
+ }
+
+ resp, err := client.Do(req)
+ if err != nil {
+ fmt.Printf("error: %v\n", err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ fmt.Printf("error: %v\n", err)
+ }
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ infoMsg := InfoMessage{}
+
+ if err := json.Unmarshal(body, &infoMsg); err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return infoMsg
+}
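
One caveat with GetServerInfo as committed: if client.Do returns an error, resp is nil and the subsequent resp.StatusCode access would panic on a connection failure. A defensive, self-contained variant of the same fetch (a sketch only, not part of the commit; the helper name and URL are hypothetical):

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// InfoMessage mirrors the wire struct; Peers is omitted to stay self-contained.
type InfoMessage struct {
	Status string `json:"status"`
}

// fetchServerInfo checks the transport error before touching resp, then the
// status code, then the body, returning early at each failure point.
func fetchServerInfo(infoURL string) (InfoMessage, error) {
	infoMsg := InfoMessage{}
	client := &http.Client{Timeout: 5 * time.Second}

	resp, err := client.Get(infoURL)
	if err != nil {
		return infoMsg, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return infoMsg, fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return infoMsg, err
	}
	err = json.Unmarshal(body, &infoMsg)
	return infoMsg, err
}

func main() {
	info, err := fetchServerInfo("http://127.0.0.1:8080/info")
	fmt.Println(info, err)
}
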
diff --git a/simulation/validator.go b/simulation/validator.go
index b7ec2a0..6ce5fa7 100644
--- a/simulation/validator.go
+++ b/simulation/validator.go
@@ -37,6 +37,7 @@ type Validator struct {
config config.Validator
db *leveldb.DB
msgChannel chan interface{}
+ isFinished chan struct{}
ID types.ValidatorID
lattice *core.BlockLattice
@@ -55,12 +56,13 @@ func NewValidator(
app := NewSimApp(id, network)
lattice := core.NewBlockLattice(blockdb.NewMemBackedBlockDB(), app)
return &Validator{
- ID: id,
- config: config,
- network: network,
- app: app,
- db: db,
- lattice: lattice,
+ ID: id,
+ config: config,
+ network: network,
+ app: app,
+ db: db,
+ lattice: lattice,
+ isFinished: make(chan struct{}),
}
}
@@ -73,11 +75,38 @@ func (v *Validator) GetID() types.ValidatorID {
func (v *Validator) Run() {
v.msgChannel = v.network.Join(v)
+ isStopped := make(chan struct{})
+ isShutdown := make(chan struct{})
+
+ v.BroadcastGenesisBlock()
go v.MsgServer()
- go v.BlockProposer()
+ go v.CheckServerInfo(isShutdown)
+ go v.BlockProposer(isStopped, isShutdown)
// Blocks forever.
- select {}
+ <-isStopped
+ v.network.NotifyServer(Message{
+ Type: shutdownAck,
+ })
+ v.isFinished <- struct{}{}
+}
+
+// Wait for the validator to stop (if peerServer told it to).
+func (v *Validator) Wait() {
+ <-v.isFinished
+}
+
+// CheckServerInfo will check the info from the peerServer and update
+// validator's status if needed.
+func (v *Validator) CheckServerInfo(isShutdown chan struct{}) {
+ for {
+ infoMsg := v.network.GetServerInfo()
+ if infoMsg.Status == shutdown {
+ isShutdown <- struct{}{}
+ break
+ }
+ time.Sleep(250 * time.Millisecond)
+ }
}
// MsgServer listen to the network channel for message and handle it.
@@ -95,8 +124,8 @@ func (v *Validator) MsgServer() {
}
}
-// BlockProposer propose blocks to be send to the DEXON network.
-func (v *Validator) BlockProposer() {
+// BroadcastGenesisBlock broadcasts genesis block to all peers.
+func (v *Validator) BroadcastGenesisBlock() {
// Wait until all peer joined the network.
for v.network.NumPeers() != v.config.Num {
time.Sleep(time.Second)
@@ -120,7 +149,10 @@ func (v *Validator) BlockProposer() {
v.lattice.PrepareBlock(b)
v.network.BroadcastBlock(b)
}
+}
+// BlockProposer propose blocks to be send to the DEXON network.
+func (v *Validator) BlockProposer(isStopped, isShutdown chan struct{}) {
// Wait until all peer knows each other.
for len(v.lattice.ValidatorSet) != v.config.Num {
time.Sleep(time.Second)
@@ -130,7 +162,7 @@ func (v *Validator) BlockProposer() {
Sigma: v.config.ProposeIntervalSigma,
Mean: v.config.ProposeIntervalMean,
}
-
+ProposingBlockLoop:
for {
time.Sleep(model.Delay())
@@ -144,5 +176,12 @@ func (v *Validator) BlockProposer() {
v.current = block
v.lattice.PrepareBlock(block)
v.network.BroadcastBlock(block)
+ select {
+ case <-isShutdown:
+ isStopped <- struct{}{}
+ break ProposingBlockLoop
+ default:
+ break
+ }
}
}
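
The shutdown handshake threaded through Run, CheckServerInfo and BlockProposer boils down to three unbuffered channels handed off in sequence. A self-contained toy model of that flow (all names and sleep durations here are local to the sketch):

package main

import (
	"fmt"
	"time"
)

func main() {
	isShutdown := make(chan struct{})
	isStopped := make(chan struct{})
	isFinished := make(chan struct{})

	// Stand-in for CheckServerInfo: polls /info and signals once the
	// peer server reports status "shutdown".
	go func() {
		time.Sleep(300 * time.Millisecond)
		isShutdown <- struct{}{}
	}()

	// Stand-in for BlockProposer: propose, then check for shutdown after
	// each block, like the select at the bottom of the proposing loop above.
	go func() {
		for {
			time.Sleep(100 * time.Millisecond) // "propose" a block
			select {
			case <-isShutdown:
				isStopped <- struct{}{}
				return
			default:
			}
		}
	}()

	// Stand-in for Run: wait for the proposer to stop, ack the server,
	// then let Wait return.
	go func() {
		<-isStopped
		fmt.Println("would send shutdownAck to the peer server here")
		isFinished <- struct{}{}
	}()

	<-isFinished // stand-in for Wait
	fmt.Println("validator shut down")
}
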
diff --git a/simulation/verification.go b/simulation/verification.go
index b32d719..0e111be 100644
--- a/simulation/verification.go
+++ b/simulation/verification.go
@@ -70,12 +70,12 @@ func (totalOrder *TotalOrderResult) PushBlocks(blocks BlockList) (ready bool) {
// of each validators may not be the same, so only the common part is verified.
func VerifyTotalOrder(id types.ValidatorID,
totalOrder PeerTotalOrder) (
- unverifiedMap PeerTotalOrder, correct bool) {
+ unverifiedMap PeerTotalOrder, correct bool, length int) {
hasError := false
// Get the common length from all validators.
- length := math.MaxInt32
+ length = math.MaxInt32
for _, peerTotalOrder := range totalOrder {
if len(peerTotalOrder.hashList) < length {
length = len(peerTotalOrder.hashList)
@@ -107,5 +107,5 @@ func VerifyTotalOrder(id types.ValidatorID,
totalOrder[vid].hashList[length:]
}
}
- return totalOrder, !hasError
+ return totalOrder, !hasError, length
}