aboutsummaryrefslogtreecommitdiffstats
path: root/simulation
diff options
context:
space:
mode:
authorWei-Ning Huang <w@dexon.org>2018-07-20 09:52:52 +0800
committermissionliao <38416648+missionliao@users.noreply.github.com>2018-07-20 09:52:52 +0800
commitdfa19fc2b4e38097334f4a30e159f9bcd92909c0 (patch)
tree718524746a0871a07a4a68dea43481e6ac841b39 /simulation
parent46a84bff9ac419853550f7c17b13fb8633e507c9 (diff)
downloadtangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar
tangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.gz
tangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.bz2
tangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.lz
tangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.xz
tangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.tar.zst
tangerine-consensus-dfa19fc2b4e38097334f4a30e159f9bcd92909c0.zip
Implement simulation on a real network (#5)
simulation: implement simulation on a real network
Diffstat (limited to 'simulation')
-rw-r--r--simulation/config/config.go19
-rw-r--r--simulation/constant.go23
-rw-r--r--simulation/fake-network.go95
-rw-r--r--simulation/kubernetes/Dockerfile15
-rw-r--r--simulation/kubernetes/config.toml.in13
-rwxr-xr-xsimulation/kubernetes/entrypoint.sh7
-rw-r--r--simulation/kubernetes/peer-server.yaml51
-rwxr-xr-xsimulation/kubernetes/run_simulation.sh41
-rw-r--r--simulation/kubernetes/validator.yaml.in38
-rw-r--r--simulation/network.go86
-rw-r--r--simulation/peer-server.go117
-rw-r--r--simulation/simulation.go56
-rw-r--r--simulation/tcp-network.go233
-rw-r--r--simulation/validator.go55
14 files changed, 720 insertions, 129 deletions
diff --git a/simulation/config/config.go b/simulation/config/config.go
index 228d69b..30d1562 100644
--- a/simulation/config/config.go
+++ b/simulation/config/config.go
@@ -23,6 +23,16 @@ import (
"github.com/naoina/toml"
)
+// NetworkType is the simulation network type.
+type NetworkType string
+
+// NetworkType enums.
+const (
+ NetworkTypeFake NetworkType = "fake"
+ NetworkTypeTCP NetworkType = "tcp"
+ NetworkTypeTCPLocal NetworkType = "tcp-local"
+)
+
// Validator config for the simulation.
type Validator struct {
Num int
@@ -32,6 +42,9 @@ type Validator struct {
// Networking config.
type Networking struct {
+ Type NetworkType
+ PeerServer string
+
Mean float64
Sigma float64
LossRateValue float64
@@ -55,13 +68,15 @@ func GenerateDefault(path string) error {
config := Config{
Title: "DEXON Consensus Simulation Config",
Validator: Validator{
- Num: 4,
+ Num: 7,
ProposeIntervalMean: 500,
ProposeIntervalSigma: 30,
},
Networking: Networking{
+ Type: NetworkTypeFake,
+ PeerServer: "peer.server",
Mean: 100,
- Sigma: 30,
+ Sigma: 10,
LossRateValue: 0,
},
}
diff --git a/simulation/constant.go b/simulation/constant.go
new file mode 100644
index 0000000..aa21a3d
--- /dev/null
+++ b/simulation/constant.go
@@ -0,0 +1,23 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package simulation
+
+const (
+ peerPort = 8080
+ msgBufferSize = 128
+)
diff --git a/simulation/fake-network.go b/simulation/fake-network.go
new file mode 100644
index 0000000..923fab4
--- /dev/null
+++ b/simulation/fake-network.go
@@ -0,0 +1,95 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package simulation
+
+import (
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+// FakeNetwork implements the core.Network interface.
+type FakeNetwork struct {
+ model Model
+
+ endpointMutex sync.RWMutex
+ endpoints map[types.ValidatorID]chan interface{}
+}
+
+// NewFakeNetwork returns pointer to a new Network instance.
+func NewFakeNetwork(model Model) *FakeNetwork {
+ return &FakeNetwork{
+ model: model,
+ endpoints: make(map[types.ValidatorID]chan interface{}),
+ }
+}
+
+// Start starts the network.
+func (n *FakeNetwork) Start() {
+}
+
+// NumPeers returns the number of peers in the network.
+func (n *FakeNetwork) NumPeers() int {
+ n.endpointMutex.Lock()
+ defer n.endpointMutex.Unlock()
+ return len(n.endpoints)
+}
+
+// Join allow a client to join the network. It reutnrs a interface{} channel for
+// the client to recieve information.
+func (n *FakeNetwork) Join(endpoint core.Endpoint) chan interface{} {
+ n.endpointMutex.Lock()
+ defer n.endpointMutex.Unlock()
+
+ if x, exists := n.endpoints[endpoint.GetID()]; exists {
+ return x
+ }
+ recivingChannel := make(chan interface{}, msgBufferSize)
+
+ n.endpoints[endpoint.GetID()] = recivingChannel
+ return recivingChannel
+}
+
+// Send sends a msg to another client.
+func (n *FakeNetwork) Send(destID types.ValidatorID, msg interface{}) {
+ clientChannel, exists := n.endpoints[destID]
+ if !exists {
+ return
+ }
+
+ go func() {
+ if rand.Float64() > n.model.LossRate() {
+ time.Sleep(n.model.Delay())
+
+ clientChannel <- msg
+ }
+ }()
+}
+
+// BroadcastBlock broadcast blocks into the network.
+func (n *FakeNetwork) BroadcastBlock(block *types.Block) {
+ n.endpointMutex.Lock()
+ defer n.endpointMutex.Unlock()
+
+ for endpoint := range n.endpoints {
+ n.Send(endpoint, block.Clone())
+ }
+}
diff --git a/simulation/kubernetes/Dockerfile b/simulation/kubernetes/Dockerfile
new file mode 100644
index 0000000..c6599d8
--- /dev/null
+++ b/simulation/kubernetes/Dockerfile
@@ -0,0 +1,15 @@
+FROM golang:alpine
+MAINTAINER Wei-Ning Huang <w@dexon.org>
+
+# Cobinhood vendor base directory.
+RUN mkdir -p /opt/dexon/
+
+# Copy data.
+COPY build/dexcon-simulation /opt/dexon
+COPY build/dexcon-simulation-peer-server /opt/dexon
+COPY entrypoint.sh /opt/dexon
+COPY config.toml /opt/dexon
+
+WORKDIR /opt/dexon
+
+ENTRYPOINT ["./entrypoint.sh"]
diff --git a/simulation/kubernetes/config.toml.in b/simulation/kubernetes/config.toml.in
new file mode 100644
index 0000000..d956055
--- /dev/null
+++ b/simulation/kubernetes/config.toml.in
@@ -0,0 +1,13 @@
+title = "DEXON Consensus Simulation Config"
+
+[validator]
+num = {{numValidators}}
+propose_interval_mean = 5e+02
+propose_interval_sigma = 3e+01
+
+[networking]
+type = "tcp"
+peer_server = "peer-server-svc.default.svc.cluster.local"
+mean = 1e+02
+sigma = 1e+01
+loss_rate_value = 0e+00
diff --git a/simulation/kubernetes/entrypoint.sh b/simulation/kubernetes/entrypoint.sh
new file mode 100755
index 0000000..a892580
--- /dev/null
+++ b/simulation/kubernetes/entrypoint.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+if [ "$ROLE" = "validator" ]; then
+ exec ./dexcon-simulation -config config.toml
+elif [ "$ROLE" = "peer-server" ]; then
+ exec ./dexcon-simulation-peer-server -config config.toml
+fi
diff --git a/simulation/kubernetes/peer-server.yaml b/simulation/kubernetes/peer-server.yaml
new file mode 100644
index 0000000..360736b
--- /dev/null
+++ b/simulation/kubernetes/peer-server.yaml
@@ -0,0 +1,51 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: peer-server-svc
+ labels:
+ app: peer-server-svc
+spec:
+ selector:
+ app: dexcon-simulation-peer-server
+ ports:
+ - protocol: TCP
+ port: 8080
+ targetPort: 8080
+---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+ name: dexcon-simulation-peer-server
+ labels:
+ app: dexcon-simulation-peer-server
+spec:
+ revisionHistoryLimit: 5
+ replicas: 1
+ template:
+ metadata:
+ name: dexcon-simulation-peer-server
+ labels:
+ app: dexcon-simulation-peer-server
+ spec:
+ nodeSelector:
+ cloud.google.com/gke-nodepool: default-pool
+ volumes:
+ - name: ssl-certs
+ hostPath:
+ path: /etc/ssl/certs
+ containers:
+ - name: dexcon-simulation
+ image: asia.gcr.io/cobinhood/dexcon-simulation:latest
+ imagePullPolicy: Always
+ ports:
+ - containerPort: 8080
+ resources:
+ requests:
+ cpu: 100m
+ memory: 256Mi
+ limits:
+ cpu: 200m
+ memory: 512Mi
+ env:
+ - name: ROLE
+ value: "peer-server"
diff --git a/simulation/kubernetes/run_simulation.sh b/simulation/kubernetes/run_simulation.sh
new file mode 100755
index 0000000..88f9eee
--- /dev/null
+++ b/simulation/kubernetes/run_simulation.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+
+IMAGE_TAG=asia.gcr.io/cobinhood/dexcon-simulation:latest
+
+
+build_binary() {
+ make DOCKER=true -C ../..
+ cp -r ../../build .
+}
+
+build_docker_image() {
+ docker build -t ${IMAGE_TAG} .
+ docker push ${IMAGE_TAG}
+}
+
+start_simulation() {
+ kubectl delete deployment --all --force --grace-period=0
+ sleep 10
+
+ kubectl apply -f peer-server.yaml
+ sleep 10
+ kubectl apply -f validator.yaml
+}
+
+main() {
+ local num_validators=$1
+
+ if [ "$num_validators" == "" ]; then
+ num_validators=7
+ fi
+
+ # Render configuration files.
+ sed "s/{{numValidators}}/$num_validators/" validator.yaml.in > validator.yaml
+ sed "s/{{numValidators}}/$num_validators/" config.toml.in > config.toml
+
+ build_binary
+ build_docker_image
+ start_simulation
+}
+
+main $*
diff --git a/simulation/kubernetes/validator.yaml.in b/simulation/kubernetes/validator.yaml.in
new file mode 100644
index 0000000..8c0dd1d
--- /dev/null
+++ b/simulation/kubernetes/validator.yaml.in
@@ -0,0 +1,38 @@
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+ name: dexcon-simulation
+ labels:
+ app: dexcon-simulation
+ type: cobinhood
+spec:
+ revisionHistoryLimit: 5
+ replicas: {{numValidators}}
+ template:
+ metadata:
+ name: dexcon-simulation
+ labels:
+ app: dexcon-simulation
+ spec:
+ nodeSelector:
+ cloud.google.com/gke-nodepool: default-pool
+ volumes:
+ - name: ssl-certs
+ hostPath:
+ path: /etc/ssl/certs
+ containers:
+ - name: dexcon-simulation
+ image: asia.gcr.io/cobinhood/dexcon-simulation:latest
+ imagePullPolicy: Always
+ ports:
+ - containerPort: 8080
+ resources:
+ requests:
+ cpu: 100m
+ memory: 256Mi
+ limits:
+ cpu: 200m
+ memory: 512Mi
+ env:
+ - name: ROLE
+ value: "validator"
diff --git a/simulation/network.go b/simulation/network.go
deleted file mode 100644
index 51eb868..0000000
--- a/simulation/network.go
+++ /dev/null
@@ -1,86 +0,0 @@
-// copyright 2018 the dexon-consensus-core authors
-// this file is part of the dexon-consensus-core library.
-//
-// the dexon-consensus-core library is free software: you can redistribute it
-// and/or modify it under the terms of the gnu lesser general public license as
-// published by the free software foundation, either version 3 of the license,
-// or (at your option) any later version.
-//
-// the dexon-consensus-core library is distributed in the hope that it will be
-// useful, but without any warranty; without even the implied warranty of
-// merchantability or fitness for a particular purpose. see the gnu lesser
-// general public license for more details.
-//
-// you should have received a copy of the gnu lesser general public license
-// along with the dexon-consensus-core library. if not, see
-// <http://www.gnu.org/licenses/>.
-
-package simulation
-
-import (
- "math/rand"
- "sync"
- "time"
-
- "github.com/dexon-foundation/dexon-consensus-core/core"
- "github.com/dexon-foundation/dexon-consensus-core/core/types"
-)
-
-const msgBufferSize = 128
-
-// Network implements the consensus.Network interface.
-type Network struct {
- model Model
-
- endpointMutex sync.RWMutex
- endpoints map[types.ValidatorID]chan interface{}
-}
-
-// NewNetwork returns pointer to a new Network instance.
-func NewNetwork(model Model) *Network {
- return &Network{
- model: model,
- endpoints: make(map[types.ValidatorID]chan interface{}),
- }
-}
-
-// Join allow a client to join the network. It reutnrs a interface{} channel for
-// the client to recieve information.
-func (n *Network) Join(endpoint core.Endpoint) chan interface{} {
- n.endpointMutex.Lock()
- defer n.endpointMutex.Unlock()
-
- if x, exists := n.endpoints[endpoint.GetID()]; exists {
- return x
- }
-
- recivingChannel := make(chan interface{}, msgBufferSize)
- n.endpoints[endpoint.GetID()] = recivingChannel
- return recivingChannel
-}
-
-// Send sends a msg to another client.
-func (n *Network) Send(destID types.ValidatorID, msg interface{}) {
- n.endpointMutex.RLock()
- defer n.endpointMutex.RUnlock()
-
- clientChannel, exists := n.endpoints[destID]
- if !exists {
- return
- }
-
- go func() {
- if rand.Float64() > n.model.LossRate() {
- time.Sleep(n.model.Delay())
-
- clientChannel <- msg
- }
- }()
-}
-
-// BroadcastBlock broadcast blocks into the network.
-func (n *Network) BroadcastBlock(block *types.Block) {
- for endpoint := range n.endpoints {
- n.Send(endpoint, block.Clone())
- }
-}
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
new file mode 100644
index 0000000..4a74fb7
--- /dev/null
+++ b/simulation/peer-server.go
@@ -0,0 +1,117 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package simulation
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+ "net"
+ "net/http"
+ "sync"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/dexon-foundation/dexon-consensus-core/simulation/config"
+)
+
+// PeerServer is the main object for maintaining peer list.
+type PeerServer struct {
+ peers map[types.ValidatorID]string
+ peersMu sync.Mutex
+}
+
+// NewPeerServer returns a new peer server.
+func NewPeerServer() *PeerServer {
+ return &PeerServer{
+ peers: make(map[types.ValidatorID]string),
+ }
+}
+
+// Run starts the peer server.
+func (p *PeerServer) Run(configPath string) {
+ cfg, err := config.Read(configPath)
+ if err != nil {
+ panic(err)
+ }
+
+ resetHandler := func(w http.ResponseWriter, r *http.Request) {
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+
+ p.peers = make(map[types.ValidatorID]string)
+ log.Printf("Peer server has been reset.")
+ }
+
+ joinHandler := func(w http.ResponseWriter, r *http.Request) {
+ idString := r.Header.Get("ID")
+ portString := r.Header.Get("PORT")
+
+ id := types.ValidatorID{}
+ id.UnmarshalText([]byte(idString))
+
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+
+ host, _, _ := net.SplitHostPort(r.RemoteAddr)
+ p.peers[id] = fmt.Sprintf("%s:%s", host, portString)
+ log.Printf("Peer %s joined from %s", id, p.peers[id])
+ }
+
+ peersHandler := func(w http.ResponseWriter, r *http.Request) {
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+
+ if len(p.peers) != cfg.Validator.Num {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ jsonText, err := json.Marshal(p.peers)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.Write(jsonText)
+ }
+
+ infoHandler := func(w http.ResponseWriter, r *http.Request) {
+ p.peersMu.Lock()
+ defer p.peersMu.Unlock()
+
+ jsonText, err := json.Marshal(p.peers)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.Write(jsonText)
+ }
+
+ http.HandleFunc("/reset", resetHandler)
+ http.HandleFunc("/join", joinHandler)
+ http.HandleFunc("/peers", peersHandler)
+ http.HandleFunc("/info", infoHandler)
+
+ addr := fmt.Sprintf("0.0.0.0:%d", peerPort)
+ log.Printf("Peer server started at %s", addr)
+
+ http.ListenAndServe(addr, nil)
+}
diff --git a/simulation/simulation.go b/simulation/simulation.go
index 8708293..a219d61 100644
--- a/simulation/simulation.go
+++ b/simulation/simulation.go
@@ -21,37 +21,57 @@ import (
"fmt"
"github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon-consensus-core/simulation/config"
)
// Run starts the simulation.
func Run(configPath string) {
- config, err := config.Read(configPath)
+ cfg, err := config.Read(configPath)
if err != nil {
panic(err)
}
- networkModel := &NormalNetwork{
- Sigma: config.Networking.Sigma,
- Mean: config.Networking.Mean,
- LossRateValue: config.Networking.LossRateValue,
- }
- network := NewNetwork(networkModel)
+ networkType := cfg.Networking.Type
- var vs []*Validator
- for i := 0; i < config.Validator.Num; i++ {
- id := types.ValidatorID(common.NewRandomHash())
- vs = append(vs, NewValidator(id, config.Validator, network, nil))
- }
+ if networkType == config.NetworkTypeFake ||
+ networkType == config.NetworkTypeTCPLocal {
- for i := 0; i < config.Validator.Num; i++ {
- vs[i].Bootstrap(vs)
- }
+ var vs []*Validator
+ var network core.Network
+
+ if networkType == config.NetworkTypeFake {
+ networkModel := &NormalNetwork{
+ Sigma: cfg.Networking.Sigma,
+ Mean: cfg.Networking.Mean,
+ LossRateValue: cfg.Networking.LossRateValue,
+ }
+ network = NewFakeNetwork(networkModel)
+
+ for i := 0; i < cfg.Validator.Num; i++ {
+ id := types.ValidatorID{Hash: common.NewRandomHash()}
+ vs = append(vs, NewValidator(id, cfg.Validator, network, nil))
+ }
+ } else if networkType == config.NetworkTypeTCPLocal {
+ for i := 0; i < cfg.Validator.Num; i++ {
+ id := types.ValidatorID{Hash: common.NewRandomHash()}
+ network := NewTCPNetwork(true, cfg.Networking.PeerServer)
+ go network.Start()
+ vs = append(vs, NewValidator(id, cfg.Validator, network, nil))
+ }
+ }
- for i := 0; i < config.Validator.Num; i++ {
- fmt.Printf("Validator %d: %s\n", i, vs[i].ID)
- go vs[i].Run()
+ for i := 0; i < cfg.Validator.Num; i++ {
+ fmt.Printf("Validator %d: %s\n", i, vs[i].ID)
+ go vs[i].Run()
+ }
+ } else if networkType == config.NetworkTypeTCP {
+ id := types.ValidatorID{Hash: common.NewRandomHash()}
+ network := NewTCPNetwork(false, cfg.Networking.PeerServer)
+ go network.Start()
+ v := NewValidator(id, cfg.Validator, network, nil)
+ go v.Run()
}
select {}
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
new file mode 100644
index 0000000..7640480
--- /dev/null
+++ b/simulation/tcp-network.go
@@ -0,0 +1,233 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package simulation
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "net/http"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+// TCPNetwork implements the core.Network interface.
+type TCPNetwork struct {
+ local bool
+ port int
+ endpoint core.Endpoint
+
+ peerServer string
+ endpointMutex sync.RWMutex
+ endpoints map[types.ValidatorID]string
+ recieveChan chan interface{}
+}
+
+// NewTCPNetwork returns pointer to a new Network instance.
+func NewTCPNetwork(local bool, peerServer string) *TCPNetwork {
+ port := 1024 + rand.Int()%1024
+ if !local {
+ port = peerPort
+ }
+ return &TCPNetwork{
+ local: local,
+ peerServer: peerServer,
+ port: port,
+ endpoints: make(map[types.ValidatorID]string),
+ recieveChan: make(chan interface{}, msgBufferSize),
+ }
+}
+
+// Start starts the http server for accepting message.
+func (n *TCPNetwork) Start() {
+ addr := fmt.Sprintf("0.0.0.0:%d", n.port)
+ server := &http.Server{
+ Addr: addr,
+ Handler: n,
+ }
+ fmt.Printf("Validator started at %s\n", addr)
+ server.ListenAndServe()
+}
+
+// NumPeers returns the number of peers in the network.
+func (n *TCPNetwork) NumPeers() int {
+ n.endpointMutex.Lock()
+ defer n.endpointMutex.Unlock()
+
+ return len(n.endpoints)
+}
+
+// ServerHTTP implements the http.Handler interface.
+func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ m := struct {
+ Type string `json:"type"`
+ Payload json.RawMessage `json:"payload"`
+ }{}
+ if err := json.Unmarshal(body, &m); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ switch m.Type {
+ case "block":
+ block := &types.Block{}
+ if err := json.Unmarshal(m.Payload, block); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ n.recieveChan <- block
+ default:
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+}
+
+// Join allow a client to join the network. It reutnrs a interface{} channel for
+// the client to recieve information.
+func (n *TCPNetwork) Join(endpoint core.Endpoint) chan interface{} {
+ n.endpointMutex.Lock()
+ defer n.endpointMutex.Unlock()
+
+ n.endpoint = endpoint
+
+ joinURL := fmt.Sprintf("http://%s:%d/join", n.peerServer, peerPort)
+ peersURL := fmt.Sprintf("http://%s:%d/peers", n.peerServer, peerPort)
+
+ client := &http.Client{Timeout: 5 * time.Second}
+
+ // Join the peer list.
+ for {
+ time.Sleep(time.Second)
+
+ req, err := http.NewRequest(http.MethodGet, joinURL, nil)
+ if err != nil {
+ continue
+ }
+ req.Header.Add("ID", endpoint.GetID().String())
+ req.Header.Add("PORT", fmt.Sprintf("%d", n.port))
+
+ resp, err := client.Do(req)
+ if err == nil && resp.StatusCode == http.StatusOK {
+ break
+ }
+ }
+
+ var peerList map[types.ValidatorID]string
+
+ // Wait for the server to collect all validators and return a list.
+ for {
+ time.Sleep(time.Second)
+
+ req, err := http.NewRequest(http.MethodGet, peersURL, nil)
+ if err != nil {
+ fmt.Println(err)
+ continue
+ }
+ req.Header.Add("ID", endpoint.GetID().String())
+
+ resp, err := client.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
+ continue
+ }
+
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+
+ if err := json.Unmarshal(body, &peerList); err != nil {
+ fmt.Printf("error: %v", err)
+ continue
+ }
+ break
+ }
+
+ for key, val := range peerList {
+ n.endpoints[key] = val
+ }
+ return n.recieveChan
+}
+
+// Send sends a msg to another client.
+func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
+ clientAddr, exists := n.endpoints[destID]
+ if !exists {
+ return
+ }
+
+ message := struct {
+ Type string `json:"type"`
+ Payload interface{} `json:"payload"`
+ }{}
+
+ switch v := msg.(type) {
+ case *types.Block:
+ message.Type = "block"
+ message.Payload = v
+ default:
+ fmt.Println("error: invalid message type")
+ return
+ }
+
+ messageJSON, err := json.Marshal(message)
+ if err != nil {
+ fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message)
+ return
+ }
+
+ msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
+
+ go func() {
+ retries := 3
+ client := &http.Client{Timeout: 5 * time.Second}
+
+ for i := 0; i < retries; i++ {
+ req, err := http.NewRequest(
+ http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
+ if err != nil {
+ continue
+ }
+ req.Header.Add("ID", n.endpoint.GetID().String())
+
+ resp, err := client.Do(req)
+ if err == nil && resp.StatusCode == http.StatusOK {
+ runtime.Goexit()
+ }
+ time.Sleep(1 * time.Second)
+ }
+ fmt.Printf("failed to send message: %v\n", msg)
+ }()
+}
+
+// BroadcastBlock broadcast blocks into the network.
+func (n *TCPNetwork) BroadcastBlock(block *types.Block) {
+ for endpoint := range n.endpoints {
+ n.Send(endpoint, block.Clone())
+ }
+}
diff --git a/simulation/validator.go b/simulation/validator.go
index d102d0d..c98efd2 100644
--- a/simulation/validator.go
+++ b/simulation/validator.go
@@ -52,19 +52,8 @@ func NewValidator(
config config.Validator,
network core.Network,
db *leveldb.DB) *Validator {
-
- hash := common.NewRandomHash()
- genesis := &types.Block{
- ProposerID: id,
- ParentHash: hash,
- Hash: hash,
- Height: 0,
- Acks: map[common.Hash]struct{}{},
- }
-
app := NewSimApp(id)
- lattice := core.NewBlockLattice(blockdb.NewMemBackedBlockDB(), network, app)
-
+ lattice := core.NewBlockLattice(blockdb.NewMemBackedBlockDB(), app)
return &Validator{
ID: id,
config: config,
@@ -72,8 +61,6 @@ func NewValidator(
app: app,
db: db,
lattice: lattice,
- genesis: genesis,
- current: genesis,
}
}
@@ -82,14 +69,6 @@ func (v *Validator) GetID() types.ValidatorID {
return v.ID
}
-// Bootstrap bootstraps a validator.
-func (v *Validator) Bootstrap(vs []*Validator) {
- for _, x := range vs {
- v.lattice.AddValidator(x.ID, x.genesis)
- }
- v.lattice.SetOwner(v.ID)
-}
-
// Run starts the validator.
func (v *Validator) Run() {
v.msgChannel = v.network.Join(v)
@@ -118,6 +97,35 @@ func (v *Validator) MsgServer() {
// BlockProposer propose blocks to be send to the DEXON network.
func (v *Validator) BlockProposer() {
+ // Wait until all peer joined the network.
+ for v.network.NumPeers() != v.config.Num {
+ time.Sleep(time.Second)
+ }
+
+ if v.genesis == nil {
+ hash := common.NewRandomHash()
+ b := &types.Block{
+ ProposerID: v.ID,
+ ParentHash: hash,
+ Hash: hash,
+ Height: 0,
+ Acks: map[common.Hash]struct{}{},
+ }
+ v.genesis = b
+ v.current = b
+
+ v.lattice.AddValidator(v.ID, b)
+ v.lattice.SetOwner(v.ID)
+
+ v.lattice.PrepareBlock(b)
+ v.network.BroadcastBlock(b)
+ }
+
+ // Wait until all peer knows each other.
+ for len(v.lattice.ValidatorSet) != v.config.Num {
+ time.Sleep(time.Second)
+ }
+
model := &NormalNetwork{
Sigma: v.config.ProposeIntervalSigma,
Mean: v.config.ProposeIntervalMean,
@@ -134,6 +142,7 @@ func (v *Validator) BlockProposer() {
Acks: map[common.Hash]struct{}{},
}
v.current = block
- v.lattice.ProposeBlock(block)
+ v.lattice.PrepareBlock(block)
+ v.network.BroadcastBlock(block)
}
}