diff options
author | Wei-Ning Huang <w@dexon.org> | 2018-07-20 09:52:52 +0800 |
---|---|---|
committer | missionliao <38416648+missionliao@users.noreply.github.com> | 2018-07-20 09:52:52 +0800 |
commit | dfa19fc2b4e38097334f4a30e159f9bcd92909c0 (patch) | |
tree | 718524746a0871a07a4a68dea43481e6ac841b39 /simulation | |
parent | 46a84bff9ac419853550f7c17b13fb8633e507c9 (diff) | |
download | dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.gz dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.bz2 dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.lz dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.xz dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.zst dexon-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.zip |
Implement simulation on a real network (#5)
simulation: implement simulation on a real network
Diffstat (limited to 'simulation')
-rw-r--r-- | simulation/config/config.go | 19 | ||||
-rw-r--r-- | simulation/constant.go | 23 | ||||
-rw-r--r-- | simulation/fake-network.go | 95 | ||||
-rw-r--r-- | simulation/kubernetes/Dockerfile | 15 | ||||
-rw-r--r-- | simulation/kubernetes/config.toml.in | 13 | ||||
-rwxr-xr-x | simulation/kubernetes/entrypoint.sh | 7 | ||||
-rw-r--r-- | simulation/kubernetes/peer-server.yaml | 51 | ||||
-rwxr-xr-x | simulation/kubernetes/run_simulation.sh | 41 | ||||
-rw-r--r-- | simulation/kubernetes/validator.yaml.in | 38 | ||||
-rw-r--r-- | simulation/network.go | 86 | ||||
-rw-r--r-- | simulation/peer-server.go | 117 | ||||
-rw-r--r-- | simulation/simulation.go | 56 | ||||
-rw-r--r-- | simulation/tcp-network.go | 233 | ||||
-rw-r--r-- | simulation/validator.go | 55 |
14 files changed, 720 insertions, 129 deletions
diff --git a/simulation/config/config.go b/simulation/config/config.go index 228d69b..30d1562 100644 --- a/simulation/config/config.go +++ b/simulation/config/config.go @@ -23,6 +23,16 @@ import ( "github.com/naoina/toml" ) +// NetworkType is the simulation network type. +type NetworkType string + +// NetworkType enums. +const ( + NetworkTypeFake NetworkType = "fake" + NetworkTypeTCP NetworkType = "tcp" + NetworkTypeTCPLocal NetworkType = "tcp-local" +) + // Validator config for the simulation. type Validator struct { Num int @@ -32,6 +42,9 @@ type Validator struct { // Networking config. type Networking struct { + Type NetworkType + PeerServer string + Mean float64 Sigma float64 LossRateValue float64 @@ -55,13 +68,15 @@ func GenerateDefault(path string) error { config := Config{ Title: "DEXON Consensus Simulation Config", Validator: Validator{ - Num: 4, + Num: 7, ProposeIntervalMean: 500, ProposeIntervalSigma: 30, }, Networking: Networking{ + Type: NetworkTypeFake, + PeerServer: "peer.server", Mean: 100, - Sigma: 30, + Sigma: 10, LossRateValue: 0, }, } diff --git a/simulation/constant.go b/simulation/constant.go new file mode 100644 index 0000000..aa21a3d --- /dev/null +++ b/simulation/constant.go @@ -0,0 +1,23 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package simulation + +const ( + peerPort = 8080 + msgBufferSize = 128 +) diff --git a/simulation/fake-network.go b/simulation/fake-network.go new file mode 100644 index 0000000..923fab4 --- /dev/null +++ b/simulation/fake-network.go @@ -0,0 +1,95 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package simulation + +import ( + "math/rand" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// FakeNetwork implements the core.Network interface. +type FakeNetwork struct { + model Model + + endpointMutex sync.RWMutex + endpoints map[types.ValidatorID]chan interface{} +} + +// NewFakeNetwork returns pointer to a new Network instance. +func NewFakeNetwork(model Model) *FakeNetwork { + return &FakeNetwork{ + model: model, + endpoints: make(map[types.ValidatorID]chan interface{}), + } +} + +// Start starts the network. +func (n *FakeNetwork) Start() { +} + +// NumPeers returns the number of peers in the network. +func (n *FakeNetwork) NumPeers() int { + n.endpointMutex.Lock() + defer n.endpointMutex.Unlock() + return len(n.endpoints) +} + +// Join allow a client to join the network. It reutnrs a interface{} channel for +// the client to recieve information. +func (n *FakeNetwork) Join(endpoint core.Endpoint) chan interface{} { + n.endpointMutex.Lock() + defer n.endpointMutex.Unlock() + + if x, exists := n.endpoints[endpoint.GetID()]; exists { + return x + } + recivingChannel := make(chan interface{}, msgBufferSize) + + n.endpoints[endpoint.GetID()] = recivingChannel + return recivingChannel +} + +// Send sends a msg to another client. +func (n *FakeNetwork) Send(destID types.ValidatorID, msg interface{}) { + clientChannel, exists := n.endpoints[destID] + if !exists { + return + } + + go func() { + if rand.Float64() > n.model.LossRate() { + time.Sleep(n.model.Delay()) + + clientChannel <- msg + } + }() +} + +// BroadcastBlock broadcast blocks into the network. +func (n *FakeNetwork) BroadcastBlock(block *types.Block) { + n.endpointMutex.Lock() + defer n.endpointMutex.Unlock() + + for endpoint := range n.endpoints { + n.Send(endpoint, block.Clone()) + } +} diff --git a/simulation/kubernetes/Dockerfile b/simulation/kubernetes/Dockerfile new file mode 100644 index 0000000..c6599d8 --- /dev/null +++ b/simulation/kubernetes/Dockerfile @@ -0,0 +1,15 @@ +FROM golang:alpine +MAINTAINER Wei-Ning Huang <w@dexon.org> + +# Cobinhood vendor base directory. +RUN mkdir -p /opt/dexon/ + +# Copy data. +COPY build/dexcon-simulation /opt/dexon +COPY build/dexcon-simulation-peer-server /opt/dexon +COPY entrypoint.sh /opt/dexon +COPY config.toml /opt/dexon + +WORKDIR /opt/dexon + +ENTRYPOINT ["./entrypoint.sh"] diff --git a/simulation/kubernetes/config.toml.in b/simulation/kubernetes/config.toml.in new file mode 100644 index 0000000..d956055 --- /dev/null +++ b/simulation/kubernetes/config.toml.in @@ -0,0 +1,13 @@ +title = "DEXON Consensus Simulation Config" + +[validator] +num = {{numValidators}} +propose_interval_mean = 5e+02 +propose_interval_sigma = 3e+01 + +[networking] +type = "tcp" +peer_server = "peer-server-svc.default.svc.cluster.local" +mean = 1e+02 +sigma = 1e+01 +loss_rate_value = 0e+00 diff --git a/simulation/kubernetes/entrypoint.sh b/simulation/kubernetes/entrypoint.sh new file mode 100755 index 0000000..a892580 --- /dev/null +++ b/simulation/kubernetes/entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +if [ "$ROLE" = "validator" ]; then + exec ./dexcon-simulation -config config.toml +elif [ "$ROLE" = "peer-server" ]; then + exec ./dexcon-simulation-peer-server -config config.toml +fi diff --git a/simulation/kubernetes/peer-server.yaml b/simulation/kubernetes/peer-server.yaml new file mode 100644 index 0000000..360736b --- /dev/null +++ b/simulation/kubernetes/peer-server.yaml @@ -0,0 +1,51 @@ +apiVersion: v1 +kind: Service +metadata: + name: peer-server-svc + labels: + app: peer-server-svc +spec: + selector: + app: dexcon-simulation-peer-server + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 +--- +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: dexcon-simulation-peer-server + labels: + app: dexcon-simulation-peer-server +spec: + revisionHistoryLimit: 5 + replicas: 1 + template: + metadata: + name: dexcon-simulation-peer-server + labels: + app: dexcon-simulation-peer-server + spec: + nodeSelector: + cloud.google.com/gke-nodepool: default-pool + volumes: + - name: ssl-certs + hostPath: + path: /etc/ssl/certs + containers: + - name: dexcon-simulation + image: asia.gcr.io/cobinhood/dexcon-simulation:latest + imagePullPolicy: Always + ports: + - containerPort: 8080 + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 200m + memory: 512Mi + env: + - name: ROLE + value: "peer-server" diff --git a/simulation/kubernetes/run_simulation.sh b/simulation/kubernetes/run_simulation.sh new file mode 100755 index 0000000..88f9eee --- /dev/null +++ b/simulation/kubernetes/run_simulation.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +IMAGE_TAG=asia.gcr.io/cobinhood/dexcon-simulation:latest + + +build_binary() { + make DOCKER=true -C ../.. + cp -r ../../build . +} + +build_docker_image() { + docker build -t ${IMAGE_TAG} . + docker push ${IMAGE_TAG} +} + +start_simulation() { + kubectl delete deployment --all --force --grace-period=0 + sleep 10 + + kubectl apply -f peer-server.yaml + sleep 10 + kubectl apply -f validator.yaml +} + +main() { + local num_validators=$1 + + if [ "$num_validators" == "" ]; then + num_validators=7 + fi + + # Render configuration files. + sed "s/{{numValidators}}/$num_validators/" validator.yaml.in > validator.yaml + sed "s/{{numValidators}}/$num_validators/" config.toml.in > config.toml + + build_binary + build_docker_image + start_simulation +} + +main $* diff --git a/simulation/kubernetes/validator.yaml.in b/simulation/kubernetes/validator.yaml.in new file mode 100644 index 0000000..8c0dd1d --- /dev/null +++ b/simulation/kubernetes/validator.yaml.in @@ -0,0 +1,38 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: dexcon-simulation + labels: + app: dexcon-simulation + type: cobinhood +spec: + revisionHistoryLimit: 5 + replicas: {{numValidators}} + template: + metadata: + name: dexcon-simulation + labels: + app: dexcon-simulation + spec: + nodeSelector: + cloud.google.com/gke-nodepool: default-pool + volumes: + - name: ssl-certs + hostPath: + path: /etc/ssl/certs + containers: + - name: dexcon-simulation + image: asia.gcr.io/cobinhood/dexcon-simulation:latest + imagePullPolicy: Always + ports: + - containerPort: 8080 + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 200m + memory: 512Mi + env: + - name: ROLE + value: "validator" diff --git a/simulation/network.go b/simulation/network.go deleted file mode 100644 index 51eb868..0000000 --- a/simulation/network.go +++ /dev/null @@ -1,86 +0,0 @@ -// copyright 2018 the dexon-consensus-core authors -// this file is part of the dexon-consensus-core library. -// -// the dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the gnu lesser general public license as -// published by the free software foundation, either version 3 of the license, -// or (at your option) any later version. -// -// the dexon-consensus-core library is distributed in the hope that it will be -// useful, but without any warranty; without even the implied warranty of -// merchantability or fitness for a particular purpose. see the gnu lesser -// general public license for more details. -// -// you should have received a copy of the gnu lesser general public license -// along with the dexon-consensus-core library. if not, see -// <http://www.gnu.org/licenses/>. - -package simulation - -import ( - "math/rand" - "sync" - "time" - - "github.com/dexon-foundation/dexon-consensus-core/core" - "github.com/dexon-foundation/dexon-consensus-core/core/types" -) - -const msgBufferSize = 128 - -// Network implements the consensus.Network interface. -type Network struct { - model Model - - endpointMutex sync.RWMutex - endpoints map[types.ValidatorID]chan interface{} -} - -// NewNetwork returns pointer to a new Network instance. -func NewNetwork(model Model) *Network { - return &Network{ - model: model, - endpoints: make(map[types.ValidatorID]chan interface{}), - } -} - -// Join allow a client to join the network. It reutnrs a interface{} channel for -// the client to recieve information. -func (n *Network) Join(endpoint core.Endpoint) chan interface{} { - n.endpointMutex.Lock() - defer n.endpointMutex.Unlock() - - if x, exists := n.endpoints[endpoint.GetID()]; exists { - return x - } - - recivingChannel := make(chan interface{}, msgBufferSize) - n.endpoints[endpoint.GetID()] = recivingChannel - return recivingChannel -} - -// Send sends a msg to another client. -func (n *Network) Send(destID types.ValidatorID, msg interface{}) { - n.endpointMutex.RLock() - defer n.endpointMutex.RUnlock() - - clientChannel, exists := n.endpoints[destID] - if !exists { - return - } - - go func() { - if rand.Float64() > n.model.LossRate() { - time.Sleep(n.model.Delay()) - - clientChannel <- msg - } - }() -} - -// BroadcastBlock broadcast blocks into the network. -func (n *Network) BroadcastBlock(block *types.Block) { - for endpoint := range n.endpoints { - n.Send(endpoint, block.Clone()) - } -} diff --git a/simulation/peer-server.go b/simulation/peer-server.go new file mode 100644 index 0000000..4a74fb7 --- /dev/null +++ b/simulation/peer-server.go @@ -0,0 +1,117 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package simulation + +import ( + "encoding/json" + "fmt" + "log" + "net" + "net/http" + "sync" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/simulation/config" +) + +// PeerServer is the main object for maintaining peer list. +type PeerServer struct { + peers map[types.ValidatorID]string + peersMu sync.Mutex +} + +// NewPeerServer returns a new peer server. +func NewPeerServer() *PeerServer { + return &PeerServer{ + peers: make(map[types.ValidatorID]string), + } +} + +// Run starts the peer server. +func (p *PeerServer) Run(configPath string) { + cfg, err := config.Read(configPath) + if err != nil { + panic(err) + } + + resetHandler := func(w http.ResponseWriter, r *http.Request) { + p.peersMu.Lock() + defer p.peersMu.Unlock() + + p.peers = make(map[types.ValidatorID]string) + log.Printf("Peer server has been reset.") + } + + joinHandler := func(w http.ResponseWriter, r *http.Request) { + idString := r.Header.Get("ID") + portString := r.Header.Get("PORT") + + id := types.ValidatorID{} + id.UnmarshalText([]byte(idString)) + + p.peersMu.Lock() + defer p.peersMu.Unlock() + + host, _, _ := net.SplitHostPort(r.RemoteAddr) + p.peers[id] = fmt.Sprintf("%s:%s", host, portString) + log.Printf("Peer %s joined from %s", id, p.peers[id]) + } + + peersHandler := func(w http.ResponseWriter, r *http.Request) { + p.peersMu.Lock() + defer p.peersMu.Unlock() + + if len(p.peers) != cfg.Validator.Num { + w.WriteHeader(http.StatusNotFound) + return + } + + jsonText, err := json.Marshal(p.peers) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(jsonText) + } + + infoHandler := func(w http.ResponseWriter, r *http.Request) { + p.peersMu.Lock() + defer p.peersMu.Unlock() + + jsonText, err := json.Marshal(p.peers) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(jsonText) + } + + http.HandleFunc("/reset", resetHandler) + http.HandleFunc("/join", joinHandler) + http.HandleFunc("/peers", peersHandler) + http.HandleFunc("/info", infoHandler) + + addr := fmt.Sprintf("0.0.0.0:%d", peerPort) + log.Printf("Peer server started at %s", addr) + + http.ListenAndServe(addr, nil) +} diff --git a/simulation/simulation.go b/simulation/simulation.go index 8708293..a219d61 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -21,37 +21,57 @@ import ( "fmt" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon-consensus-core/simulation/config" ) // Run starts the simulation. func Run(configPath string) { - config, err := config.Read(configPath) + cfg, err := config.Read(configPath) if err != nil { panic(err) } - networkModel := &NormalNetwork{ - Sigma: config.Networking.Sigma, - Mean: config.Networking.Mean, - LossRateValue: config.Networking.LossRateValue, - } - network := NewNetwork(networkModel) + networkType := cfg.Networking.Type - var vs []*Validator - for i := 0; i < config.Validator.Num; i++ { - id := types.ValidatorID(common.NewRandomHash()) - vs = append(vs, NewValidator(id, config.Validator, network, nil)) - } + if networkType == config.NetworkTypeFake || + networkType == config.NetworkTypeTCPLocal { - for i := 0; i < config.Validator.Num; i++ { - vs[i].Bootstrap(vs) - } + var vs []*Validator + var network core.Network + + if networkType == config.NetworkTypeFake { + networkModel := &NormalNetwork{ + Sigma: cfg.Networking.Sigma, + Mean: cfg.Networking.Mean, + LossRateValue: cfg.Networking.LossRateValue, + } + network = NewFakeNetwork(networkModel) + + for i := 0; i < cfg.Validator.Num; i++ { + id := types.ValidatorID{Hash: common.NewRandomHash()} + vs = append(vs, NewValidator(id, cfg.Validator, network, nil)) + } + } else if networkType == config.NetworkTypeTCPLocal { + for i := 0; i < cfg.Validator.Num; i++ { + id := types.ValidatorID{Hash: common.NewRandomHash()} + network := NewTCPNetwork(true, cfg.Networking.PeerServer) + go network.Start() + vs = append(vs, NewValidator(id, cfg.Validator, network, nil)) + } + } - for i := 0; i < config.Validator.Num; i++ { - fmt.Printf("Validator %d: %s\n", i, vs[i].ID) - go vs[i].Run() + for i := 0; i < cfg.Validator.Num; i++ { + fmt.Printf("Validator %d: %s\n", i, vs[i].ID) + go vs[i].Run() + } + } else if networkType == config.NetworkTypeTCP { + id := types.ValidatorID{Hash: common.NewRandomHash()} + network := NewTCPNetwork(false, cfg.Networking.PeerServer) + go network.Start() + v := NewValidator(id, cfg.Validator, network, nil) + go v.Run() } select {} diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go new file mode 100644 index 0000000..7640480 --- /dev/null +++ b/simulation/tcp-network.go @@ -0,0 +1,233 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package simulation + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "runtime" + "strings" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// TCPNetwork implements the core.Network interface. +type TCPNetwork struct { + local bool + port int + endpoint core.Endpoint + + peerServer string + endpointMutex sync.RWMutex + endpoints map[types.ValidatorID]string + recieveChan chan interface{} +} + +// NewTCPNetwork returns pointer to a new Network instance. +func NewTCPNetwork(local bool, peerServer string) *TCPNetwork { + port := 1024 + rand.Int()%1024 + if !local { + port = peerPort + } + return &TCPNetwork{ + local: local, + peerServer: peerServer, + port: port, + endpoints: make(map[types.ValidatorID]string), + recieveChan: make(chan interface{}, msgBufferSize), + } +} + +// Start starts the http server for accepting message. +func (n *TCPNetwork) Start() { + addr := fmt.Sprintf("0.0.0.0:%d", n.port) + server := &http.Server{ + Addr: addr, + Handler: n, + } + fmt.Printf("Validator started at %s\n", addr) + server.ListenAndServe() +} + +// NumPeers returns the number of peers in the network. +func (n *TCPNetwork) NumPeers() int { + n.endpointMutex.Lock() + defer n.endpointMutex.Unlock() + + return len(n.endpoints) +} + +// ServerHTTP implements the http.Handler interface. +func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + m := struct { + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` + }{} + if err := json.Unmarshal(body, &m); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + switch m.Type { + case "block": + block := &types.Block{} + if err := json.Unmarshal(m.Payload, block); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + n.recieveChan <- block + default: + w.WriteHeader(http.StatusBadRequest) + return + } +} + +// Join allow a client to join the network. It reutnrs a interface{} channel for +// the client to recieve information. +func (n *TCPNetwork) Join(endpoint core.Endpoint) chan interface{} { + n.endpointMutex.Lock() + defer n.endpointMutex.Unlock() + + n.endpoint = endpoint + + joinURL := fmt.Sprintf("http://%s:%d/join", n.peerServer, peerPort) + peersURL := fmt.Sprintf("http://%s:%d/peers", n.peerServer, peerPort) + + client := &http.Client{Timeout: 5 * time.Second} + + // Join the peer list. + for { + time.Sleep(time.Second) + + req, err := http.NewRequest(http.MethodGet, joinURL, nil) + if err != nil { + continue + } + req.Header.Add("ID", endpoint.GetID().String()) + req.Header.Add("PORT", fmt.Sprintf("%d", n.port)) + + resp, err := client.Do(req) + if err == nil && resp.StatusCode == http.StatusOK { + break + } + } + + var peerList map[types.ValidatorID]string + + // Wait for the server to collect all validators and return a list. + for { + time.Sleep(time.Second) + + req, err := http.NewRequest(http.MethodGet, peersURL, nil) + if err != nil { + fmt.Println(err) + continue + } + req.Header.Add("ID", endpoint.GetID().String()) + + resp, err := client.Do(req) + if err != nil || resp.StatusCode != http.StatusOK { + continue + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + if err := json.Unmarshal(body, &peerList); err != nil { + fmt.Printf("error: %v", err) + continue + } + break + } + + for key, val := range peerList { + n.endpoints[key] = val + } + return n.recieveChan +} + +// Send sends a msg to another client. +func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { + clientAddr, exists := n.endpoints[destID] + if !exists { + return + } + + message := struct { + Type string `json:"type"` + Payload interface{} `json:"payload"` + }{} + + switch v := msg.(type) { + case *types.Block: + message.Type = "block" + message.Payload = v + default: + fmt.Println("error: invalid message type") + return + } + + messageJSON, err := json.Marshal(message) + if err != nil { + fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message) + return + } + + msgURL := fmt.Sprintf("http://%s/msg", clientAddr) + + go func() { + retries := 3 + client := &http.Client{Timeout: 5 * time.Second} + + for i := 0; i < retries; i++ { + req, err := http.NewRequest( + http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) + if err != nil { + continue + } + req.Header.Add("ID", n.endpoint.GetID().String()) + + resp, err := client.Do(req) + if err == nil && resp.StatusCode == http.StatusOK { + runtime.Goexit() + } + time.Sleep(1 * time.Second) + } + fmt.Printf("failed to send message: %v\n", msg) + }() +} + +// BroadcastBlock broadcast blocks into the network. +func (n *TCPNetwork) BroadcastBlock(block *types.Block) { + for endpoint := range n.endpoints { + n.Send(endpoint, block.Clone()) + } +} diff --git a/simulation/validator.go b/simulation/validator.go index d102d0d..c98efd2 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -52,19 +52,8 @@ func NewValidator( config config.Validator, network core.Network, db *leveldb.DB) *Validator { - - hash := common.NewRandomHash() - genesis := &types.Block{ - ProposerID: id, - ParentHash: hash, - Hash: hash, - Height: 0, - Acks: map[common.Hash]struct{}{}, - } - app := NewSimApp(id) - lattice := core.NewBlockLattice(blockdb.NewMemBackedBlockDB(), network, app) - + lattice := core.NewBlockLattice(blockdb.NewMemBackedBlockDB(), app) return &Validator{ ID: id, config: config, @@ -72,8 +61,6 @@ func NewValidator( app: app, db: db, lattice: lattice, - genesis: genesis, - current: genesis, } } @@ -82,14 +69,6 @@ func (v *Validator) GetID() types.ValidatorID { return v.ID } -// Bootstrap bootstraps a validator. -func (v *Validator) Bootstrap(vs []*Validator) { - for _, x := range vs { - v.lattice.AddValidator(x.ID, x.genesis) - } - v.lattice.SetOwner(v.ID) -} - // Run starts the validator. func (v *Validator) Run() { v.msgChannel = v.network.Join(v) @@ -118,6 +97,35 @@ func (v *Validator) MsgServer() { // BlockProposer propose blocks to be send to the DEXON network. func (v *Validator) BlockProposer() { + // Wait until all peer joined the network. + for v.network.NumPeers() != v.config.Num { + time.Sleep(time.Second) + } + + if v.genesis == nil { + hash := common.NewRandomHash() + b := &types.Block{ + ProposerID: v.ID, + ParentHash: hash, + Hash: hash, + Height: 0, + Acks: map[common.Hash]struct{}{}, + } + v.genesis = b + v.current = b + + v.lattice.AddValidator(v.ID, b) + v.lattice.SetOwner(v.ID) + + v.lattice.PrepareBlock(b) + v.network.BroadcastBlock(b) + } + + // Wait until all peer knows each other. + for len(v.lattice.ValidatorSet) != v.config.Num { + time.Sleep(time.Second) + } + model := &NormalNetwork{ Sigma: v.config.ProposeIntervalSigma, Mean: v.config.ProposeIntervalMean, @@ -134,6 +142,7 @@ func (v *Validator) BlockProposer() { Acks: map[common.Hash]struct{}{}, } v.current = block - v.lattice.ProposeBlock(block) + v.lattice.PrepareBlock(block) + v.network.BroadcastBlock(block) } } |