// Copyright 2018 The dexon-consensus-core Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus-core library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus-core library. If not, see
// <http://www.gnu.org/licenses/>.
package simulation
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
const retries = 5
// TCPNetwork implements the Network interface.
type TCPNetwork struct {
local bool
port int
endpoint Endpoint
client *http.Client
peerServer string
endpointMutex sync.RWMutex
endpoints map[types.ValidatorID]string
recieveChan chan interface{}
}
// NewTCPNetwork returns pointer to a new Network instance.
func NewTCPNetwork(local bool, peerServer string) *TCPNetwork {
port := 1024 + rand.Int()%1024
if !local {
port = peerPort
}
pServer := peerServer
if local {
pServer = "127.0.0.1"
}
// Force connection reuse.
tr := &http.Transport{
MaxIdleConnsPerHost: 1024,
TLSHandshakeTimeout: 0 * time.Second,
}
client := &http.Client{
Transport: tr,
Timeout: 5 * time.Second,
}
return &TCPNetwork{
local: local,
peerServer: pServer,
port: port,
client: client,
endpoints: make(map[types.ValidatorID]string),
recieveChan: make(chan interface{}, msgBufferSize),
}
}
// Start starts the http server for accepting message.
func (n *TCPNetwork) Start() {
addr := fmt.Sprintf("0.0.0.0:%d", n.port)
server := &http.Server{
Addr: addr,
Handler: n,
}
fmt.Printf("Validator started at %s\n", addr)
server.ListenAndServe()
}
// NumPeers returns the number of peers in the network.
func (n *TCPNetwork) NumPeers() int {
n.endpointMutex.Lock()
defer n.endpointMutex.Unlock()
return len(n.endpoints)
}
// ServerHTTP implements the http.Handler interface.
func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
m := struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}{}
if err := json.Unmarshal(body, &m); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
switch m.Type {
case "block":
block := &types.Block{}
if err := json.Unmarshal(m.Payload, block); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
n.recieveChan <- block
default:
w.WriteHeader(http.StatusBadRequest)
return
}
}
// Join allow a client to join the network. It reutnrs a interface{} channel for
// the client to recieve information.
func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} {
n.endpointMutex.Lock()
defer n.endpointMutex.Unlock()
n.endpoint = endpoint
joinURL := fmt.Sprintf("http://%s:%d/join", n.peerServer, peerPort)
peersURL := fmt.Sprintf("http://%s:%d/peers", n.peerServer, peerPort)
// Join the peer list.
for {
time.Sleep(time.Second)
req, err := http.NewRequest(http.MethodGet, joinURL, nil)
if err != nil {
continue
}
req.Header.Add("ID", endpoint.GetID().String())
req.Header.Add("PORT", fmt.Sprintf("%d", n.port))
resp, err := n.client.Do(req)
if err == nil {
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
}
if err == nil && resp.StatusCode == http.StatusOK {
break
}
}
var peerList map[types.ValidatorID]string
// Wait for the server to collect all validators and return a list.
for {
time.Sleep(time.Second)
req, err := http.NewRequest(http.MethodGet, peersURL, nil)
if err != nil {
fmt.Println(err)
continue
}
resp, err := n.client.Do(req)
if err != nil || resp.StatusCode != http.StatusOK {
continue
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err := json.Unmarshal(body, &peerList); err != nil {
fmt.Printf("error: %v", err)
continue
}
break
}
for key, val := range peerList {
n.endpoints[key] = val
}
return n.recieveChan
}
// Send sends a msg to another client.
func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
clientAddr, exists := n.endpoints[destID]
if !exists {
return
}
message := struct {
Type string `json:"type"`
Payload interface{} `json:"payload"`
}{}
switch v := msg.(type) {
case *types.Block:
message.Type = "block"
message.Payload = v
default:
fmt.Println("error: invalid message type")
return
}
messageJSON, err := json.Marshal(message)
if err != nil {
fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message)
return
}
msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
go func() {
for i := 0; i < retries; i++ {
req, err := http.NewRequest(
http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
if err != nil {
continue
}
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := n.client.Do(req)
if err == nil {
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
}
if err == nil && resp.StatusCode == http.StatusOK {
runtime.Goexit()
}
fmt.Printf("failed to submit message: %s\n", err)
time.Sleep(1 * time.Second)
}
fmt.Printf("failed to send message: %v\n", msg)
}()
}
// BroadcastBlock broadcast blocks into the network.
func (n *TCPNetwork) BroadcastBlock(block *types.Block) {
for endpoint := range n.endpoints {
if endpoint == block.ProposerID {
continue
}
n.Send(endpoint, block.Clone())
}
}
// DeliverBlocks sends blocks to peerServer.
func (n *TCPNetwork) DeliverBlocks(blocks BlockList) {
messageJSON, err := json.Marshal(blocks)
if err != nil {
fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, blocks)
return
}
msgURL := fmt.Sprintf("http://%s:%d/delivery", n.peerServer, peerPort)
go func() {
for i := 0; i < retries; i++ {
req, err := http.NewRequest(
http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
if err != nil {
continue
}
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := n.client.Do(req)
if err == nil {
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
}
if err == nil && resp.StatusCode == http.StatusOK {
runtime.Goexit()
}
time.Sleep(1 * time.Second)
}
fmt.Printf("failed to send message: %v\n", blocks)
}()
}
// NotifyServer sends message to peerServer
func (n *TCPNetwork) NotifyServer(msg Message) {
messageJSON, err := json.Marshal(msg)
if err != nil {
fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, msg)
return
}
msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort)
for i := 0; i < retries; i++ {
req, err := http.NewRequest(
http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
if err != nil {
continue
}
req.Header.Add("ID", n.endpoint.GetID().String())
resp, err := n.client.Do(req)
if err == nil {
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
}
if err == nil && resp.StatusCode == http.StatusOK {
return
}
time.Sleep(1 * time.Second)
}
fmt.Printf("failed to send message: %v\n", msg)
return
}
// GetServerInfo retrieve the info message from peerServer.
func (n *TCPNetwork) GetServerInfo() InfoMessage {
infoMsg := InfoMessage{}
msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort)
req, err := http.NewRequest(
http.MethodGet, msgURL, nil)
if err != nil {
fmt.Printf("error: %v\n", err)
}
resp, err := n.client.Do(req)
if err != nil {
fmt.Printf("error: %v\n", err)
return infoMsg
}
if resp.StatusCode != http.StatusOK {
fmt.Printf("error: %v\n", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err := json.Unmarshal(body, &infoMsg); err != nil {
fmt.Printf("error: %v", err)
}
return infoMsg
}