author    Mission Liao <mission.liao@dexon.org>    2018-09-11 14:56:47 +0800
committer GitHub <noreply@github.com>    2018-09-11 14:56:47 +0800
commit    292ad73ec08621fa9beef5f028860131fcbf9bd9 (patch)
tree      47644eaad7757cd2e8798ae7fe361fa5d6e99060 /simulation
parent    582a491aa0bcb784ac7b65ebbfb42139945ea703 (diff)
simulation: integrate test.Transport (#99)
- Add marshaller for simulation by encoding/json
- Implement peer server based on test.TransportServer
- Remove network models; they are replaced by test.LatencyModel
Diffstat (limited to 'simulation')
-rw-r--r--    simulation/app.go                 24
-rw-r--r--    simulation/config/config.go        1
-rw-r--r--    simulation/marshaller.go         106
-rw-r--r--    simulation/network-model.go       85
-rw-r--r--    simulation/network-model_test.go  48
-rw-r--r--    simulation/network.go            174
-rw-r--r--    simulation/peer-server.go        336
-rw-r--r--    simulation/simulation.go          83
-rw-r--r--    simulation/tcp-network.go        457
-rw-r--r--    simulation/validator.go          102
-rw-r--r--    simulation/verification.go         2
11 files changed, 468 insertions, 950 deletions
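
As context for the diffs below, here is a hedged sketch (not part of the commit) of the core/test transport API the simulation package now builds on, written as if inside package simulation so it can reuse the jsonMarshaller added in this change. The latency values, address, and broadcast/report payloads are illustrative placeholders; only the test.*, types.*, and simulation identifiers come from the diffs themselves.

// demoTransportClient is a hypothetical helper illustrating the raw
// test.TransportClient flow that the new network module wraps.
func demoTransportClient(vID types.ValidatorID) error {
    latency := &test.NormalLatencyModel{Mean: 50, Sigma: 10} // illustrative values
    trans := test.NewTCPTransportClient(vID, latency, &jsonMarshaller{}, true)
    // Join via the peer server; the address is a placeholder.
    recv, err := trans.Join("127.0.0.1:9000")
    if err != nil {
        return err
    }
    go func() {
        for e := range recv {
            fmt.Println("envelope from", e.From)
        }
    }()
    // Broadcast goes to every peer; Report goes only to the peer server.
    if err := trans.Broadcast(&types.Vote{}); err != nil { // placeholder vote
        return err
    }
    return trans.Report(&message{Type: shutdownAck})
}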
diff --git a/simulation/app.go b/simulation/app.go
index 5a31273..c312204 100644
--- a/simulation/app.go
+++ b/simulation/app.go
@@ -32,7 +32,7 @@ type simApp struct {
ValidatorID types.ValidatorID
Outputs []*types.Block
Early bool
- Network PeerServerNetwork
+ netModule *network
DeliverID int
// blockSeen stores the time when block is delivered by Total Ordering.
blockSeen map[common.Hash]time.Time
@@ -43,10 +43,10 @@ type simApp struct {
}
// newSimApp returns point to a new instance of simApp.
-func newSimApp(id types.ValidatorID, Network PeerServerNetwork) *simApp {
+func newSimApp(id types.ValidatorID, netModule *network) *simApp {
return &simApp{
ValidatorID: id,
- Network: Network,
+ netModule: netModule,
DeliverID: 0,
blockSeen: make(map[common.Hash]time.Time),
unconfirmedBlocks: make(map[types.ValidatorID]common.Hashes),
@@ -120,7 +120,7 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
confirmLatency := []time.Duration{}
- payload := []TimestampMessage{}
+ payload := []timestampMessage{}
for _, block := range blocks {
if block.ProposerID == a.ValidatorID {
confirmLatency = append(confirmLatency,
@@ -128,7 +128,7 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
}
for hash := range block.Acks {
for _, blockHash := range a.getAckedBlocks(hash) {
- payload = append(payload, TimestampMessage{
+ payload = append(payload, timestampMessage{
BlockHash: blockHash,
Event: timestampAck,
Timestamp: now,
@@ -142,20 +142,20 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
if err != nil {
fmt.Println(err)
} else {
- msg := Message{
+ msg := &message{
Type: blockTimestamp,
Payload: jsonPayload,
}
- a.Network.NotifyServer(msg)
+ a.netModule.report(msg)
}
}
- blockList := BlockList{
+ blockList := &BlockList{
ID: a.DeliverID,
BlockHash: blockHashes,
ConfirmLatency: confirmLatency,
}
- a.Network.DeliverBlocks(blockList)
+ a.netModule.report(blockList)
a.DeliverID++
for _, block := range blocks {
a.blockSeen[block.Hash] = now
@@ -171,7 +171,7 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
return
}
now := time.Now()
- payload := []TimestampMessage{
+ payload := []timestampMessage{
{
BlockHash: blockHash,
Event: blockSeen,
@@ -188,11 +188,11 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
fmt.Println(err)
return
}
- msg := Message{
+ msg := &message{
Type: blockTimestamp,
Payload: jsonPayload,
}
- a.Network.NotifyServer(msg)
+ a.netModule.report(msg)
}
// NotaryAckDeliver is called when a notary ack is created.
diff --git a/simulation/config/config.go b/simulation/config/config.go
index 2f03f85..f8f629b 100644
--- a/simulation/config/config.go
+++ b/simulation/config/config.go
@@ -31,6 +31,7 @@ type NetworkType string
const (
NetworkTypeTCP NetworkType = "tcp"
NetworkTypeTCPLocal NetworkType = "tcp-local"
+ NetworkTypeFake NetworkType = "fake"
)
// Consensus settings.
diff --git a/simulation/marshaller.go b/simulation/marshaller.go
new file mode 100644
index 0000000..88e2f6a
--- /dev/null
+++ b/simulation/marshaller.go
@@ -0,0 +1,106 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package simulation
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+// jsonMarshaller implements test.Marshaller to marshal simulation related
+// messages.
+type jsonMarshaller struct{}
+
+// Unmarshal implements Unmarshal method of test.Marshaller interface.
+func (m *jsonMarshaller) Unmarshal(
+ msgType string, payload []byte) (msg interface{}, err error) {
+
+ switch msgType {
+ case "blocklist":
+ var blocks BlockList
+ if err = json.Unmarshal(payload, &blocks); err != nil {
+ break
+ }
+ msg = &blocks
+ case "message":
+ var m message
+ if err = json.Unmarshal(payload, &m); err != nil {
+ break
+ }
+ msg = &m
+ case "info-status":
+ var status infoStatus
+ if err = json.Unmarshal(payload, &status); err != nil {
+ break
+ }
+ msg = status
+ case "block":
+ block := &types.Block{}
+ if err = json.Unmarshal(payload, block); err != nil {
+ break
+ }
+ msg = block
+ case "notary-ack":
+ nAck := &types.NotaryAck{}
+ if err = json.Unmarshal(payload, nAck); err != nil {
+ break
+ }
+ msg = nAck
+ case "vote":
+ vote := &types.Vote{}
+ if err = json.Unmarshal(payload, vote); err != nil {
+ break
+ }
+ msg = vote
+ default:
+ err = fmt.Errorf("unrecognized message type: %v", msgType)
+ }
+ if err != nil {
+ return
+ }
+ return
+}
+
+// Marshal implements Marshal method of test.Marshaller interface.
+func (m *jsonMarshaller) Marshal(msg interface{}) (
+ msgType string, payload []byte, err error) {
+
+ switch msg.(type) {
+ case *BlockList:
+ msgType = "blocklist"
+ case *message:
+ msgType = "message"
+ case infoStatus:
+ msgType = "info-status"
+ case *types.Block:
+ msgType = "block"
+ case *types.NotaryAck:
+ msgType = "notary-ack"
+ case *types.Vote:
+ msgType = "vote"
+ default:
+ err = fmt.Errorf("unknwon message type: %v", msg)
+ }
+ if err != nil {
+ return
+ }
+ payload, err = json.Marshal(msg)
+ return
+}
diff --git a/simulation/network-model.go b/simulation/network-model.go
deleted file mode 100644
index 01704e7..0000000
--- a/simulation/network-model.go
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2018 The dexon-consensus-core Authors
-// This file is part of the dexon-consensus-core library.
-//
-// The dexon-consensus-core library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus-core library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus-core library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package simulation
-
-import (
- "math/rand"
- "time"
-)
-
-// Model is the interface for define a given network environment.
-type Model interface {
- // LossRate returns the message lost ratio between [0, 1)
- LossRate() float64
-
- // Delay returns the send delay of the message. This function is called each
- // time before the message is sent, so one can return different number each
- // time.
- Delay() time.Duration
-}
-
-// LosslessNetwork is a lossless network model.
-type LosslessNetwork struct {
-}
-
-// LossRate returns lossrate for the model.
-func (l *LosslessNetwork) LossRate() float64 {
- return 0.0
-}
-
-// Delay returns the send delay of a given message.
-func (l *LosslessNetwork) Delay() time.Duration {
- return time.Duration(0)
-}
-
-// FixedLostNoDelayModel is a network with no delay and a fixed lost
-// ratio.
-type FixedLostNoDelayModel struct {
- LossRateValue float64
-}
-
-// LossRate returns lossrate for the model.
-func (f *FixedLostNoDelayModel) LossRate() float64 {
- return f.LossRateValue
-}
-
-// Delay returns the send delay of a given message.
-func (f *FixedLostNoDelayModel) Delay() time.Duration {
- return time.Duration(0)
-}
-
-// NormalNetwork is a model where it's delay is a normal distribution.
-type NormalNetwork struct {
- Sigma float64
- Mean float64
- LossRateValue float64
-}
-
-// LossRate returns lossrate for the model.
-func (n *NormalNetwork) LossRate() float64 {
- return n.LossRateValue
-}
-
-// Delay returns the send delay of a given message.
-func (n *NormalNetwork) Delay() time.Duration {
- delay := rand.NormFloat64()*n.Sigma + n.Mean
- if delay < 0 {
- delay = n.Sigma / 2
- }
- return time.Duration(delay) * time.Millisecond
-}
diff --git a/simulation/network-model_test.go b/simulation/network-model_test.go
deleted file mode 100644
index a9de462..0000000
--- a/simulation/network-model_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2018 The dexon-consensus-core Authors
-// This file is part of the dexon-consensus-core library.
-//
-// The dexon-consensus-core library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus-core library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus-core library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package simulation
-
-import (
- "testing"
- "time"
-
- "github.com/stretchr/testify/suite"
-)
-
-type NetworkModelsTestSuite struct {
- suite.Suite
-}
-
-func (n *NetworkModelsTestSuite) SetupTest() {
-}
-
-func (n *NetworkModelsTestSuite) TearDownTest() {
-}
-
-// TestNormalNetwork make sure the Delay() or NormalNetwork does not
-// exceeds 200ms.
-func (n *NetworkModelsTestSuite) TestNormalNetwork() {
- m := NormalNetwork{}
- for i := 0; i < 1000; i++ {
- n.Require().True(m.Delay() < 200*time.Millisecond)
- }
-}
-
-func TestNetworkModels(t *testing.T) {
- suite.Run(t, new(NetworkModelsTestSuite))
-}
diff --git a/simulation/network.go b/simulation/network.go
index 791790c..90f3aec 100644
--- a/simulation/network.go
+++ b/simulation/network.go
@@ -18,12 +18,17 @@
package simulation
import (
+ "context"
"encoding/json"
+ "fmt"
+ "net"
+ "strconv"
"time"
"github.com/dexon-foundation/dexon-consensus-core/common"
- "github.com/dexon-foundation/dexon-consensus-core/core"
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/dexon-foundation/dexon-consensus-core/simulation/config"
)
type messageType string
@@ -33,8 +38,8 @@ const (
blockTimestamp messageType = "blockTimestamps"
)
-// Message is a struct for peer sending message to server.
-type Message struct {
+// message is a struct for peer sending message to server.
+type message struct {
Type messageType `json:"type"`
Payload json.RawMessage `json:"payload"`
}
@@ -49,7 +54,7 @@ const (
// TimestampMessage is a struct for peer sending consensus timestamp information
// to server.
-type TimestampMessage struct {
+type timestampMessage struct {
BlockHash common.Hash `json:"hash"`
Event timestampEvent `json:"event"`
Timestamp time.Time `json:"timestamp"`
@@ -63,30 +68,155 @@ const (
statusShutdown infoStatus = "shutdown"
)
-// InfoMessage is a struct used by peerServer's /info.
-type InfoMessage struct {
+// infoMessage is a struct used by peerServer's /info.
+type infoMessage struct {
Status infoStatus `json:"status"`
Peers map[types.ValidatorID]string `json:"peers"`
}
-// Endpoint is the interface for a client network endpoint.
-type Endpoint interface {
- GetID() types.ValidatorID
+// network implements core.Network interface and other methods for simulation
+// based on test.TransportClient.
+type network struct {
+ cfg config.Networking
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans test.TransportClient
+ fromTransport <-chan *test.TransportEnvelope
+ toConsensus chan interface{}
+ toValidator chan interface{}
}
-// Network is the interface for network related functions.
-type Network interface {
- PeerServerNetwork
- core.Network
- Start()
- NumPeers() int
- Join(endpoint Endpoint)
- Endpoints() types.ValidatorIDs
+// newNetwork sets up networking for a validator, providing an
+// implementation of core.Network based on test.TransportClient.
+func newNetwork(vID types.ValidatorID, cfg config.Networking) (n *network) {
+ // Construct latency model.
+ latency := &test.NormalLatencyModel{
+ Mean: cfg.Mean,
+ Sigma: cfg.Sigma,
+ }
+ // Construct basic network instance.
+ n = &network{
+ cfg: cfg,
+ toValidator: make(chan interface{}, 1000),
+ toConsensus: make(chan interface{}, 1000),
+ }
+ n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+ // Construct transport layer.
+ switch cfg.Type {
+ case config.NetworkTypeTCPLocal:
+ n.trans = test.NewTCPTransportClient(
+ vID, latency, &jsonMarshaller{}, true)
+ case config.NetworkTypeTCP:
+ n.trans = test.NewTCPTransportClient(
+ vID, latency, &jsonMarshaller{}, false)
+ case config.NetworkTypeFake:
+ n.trans = test.NewFakeTransportClient(vID, latency)
+ default:
+ panic(fmt.Errorf("unknown network type: %v", cfg.Type))
+ }
+ return
}
-// PeerServerNetwork is the interface for peerServer network related functions
-type PeerServerNetwork interface {
- DeliverBlocks(blocks BlockList)
- NotifyServer(msg Message)
- GetServerInfo() InfoMessage
+// BroadcastVote implements core.Network interface.
+func (n *network) BroadcastVote(vote *types.Vote) {
+ if err := n.trans.Broadcast(vote); err != nil {
+ panic(err)
+ }
+}
+
+// BroadcastBlock implements core.Network interface.
+func (n *network) BroadcastBlock(block *types.Block) {
+ if err := n.trans.Broadcast(block); err != nil {
+ panic(err)
+ }
+}
+
+// BroadcastNotaryAck implements core.Network interface.
+func (n *network) BroadcastNotaryAck(notaryAck *types.NotaryAck) {
+ if err := n.trans.Broadcast(notaryAck); err != nil {
+ panic(err)
+ }
+}
+
+// ReceiveChan implements core.Network interface.
+func (n *network) ReceiveChan() <-chan interface{} {
+ return n.toConsensus
+}
+
+// receiveChanForValidator returns a channel for validators' specific
+// messages.
+func (n *network) receiveChanForValidator() <-chan interface{} {
+ return n.toValidator
+}
+
+// setup transport layer.
+func (n *network) setup(serverEndpoint interface{}) (err error) {
+ // Join the p2p network.
+ switch n.cfg.Type {
+ case config.NetworkTypeTCP, config.NetworkTypeTCPLocal:
+ addr := net.JoinHostPort(n.cfg.PeerServer, strconv.Itoa(peerPort))
+ n.fromTransport, err = n.trans.Join(addr)
+ case config.NetworkTypeFake:
+ n.fromTransport, err = n.trans.Join(serverEndpoint)
+ default:
+ err = fmt.Errorf("unknown network type: %v", n.cfg.Type)
+ }
+ if err != nil {
+ return
+ }
+ return
+}
+
+// run the main loop.
+func (n *network) run() {
+ // The dispatcher declaration:
+ // to consensus or validator, that's the question.
+ disp := func(e *test.TransportEnvelope) {
+ switch e.Msg.(type) {
+ case *types.Block, *types.Vote, *types.NotaryAck:
+ n.toConsensus <- e.Msg
+ default:
+ n.toValidator <- e.Msg
+ }
+ }
+MainLoop:
+ for {
+ select {
+ case <-n.ctx.Done():
+ break MainLoop
+ default:
+ }
+ select {
+ case <-n.ctx.Done():
+ break MainLoop
+ case e, ok := <-n.fromTransport:
+ if !ok {
+ break MainLoop
+ }
+ disp(e)
+ }
+ }
+}
+
+// Close stops the network.
+func (n *network) Close() (err error) {
+ n.ctxCancel()
+ close(n.toConsensus)
+ n.toConsensus = nil
+ close(n.toValidator)
+ n.toValidator = nil
+ if err = n.trans.Close(); err != nil {
+ return
+ }
+ return
+}
+
+// report exports 'Report' method of test.TransportClient.
+func (n *network) report(msg interface{}) error {
+ return n.trans.Report(msg)
+}
+
+// peers exports 'Peers' method of test.Transport.
+func (n *network) peers() map[types.ValidatorID]struct{} {
+ return n.trans.Peers()
}
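
A sketch of how a validator is expected to drive this module end to end. The helper is hypothetical and assumed to live inside package simulation; the nil endpoint argument only applies to the tcp/tcp-local branches of setup, while fake mode needs the endpoint returned by PeerServer.Setup.

// demoNetworkLifecycle wires newNetwork/setup/run/Close together.
func demoNetworkLifecycle(vID types.ValidatorID, cfg config.Networking) {
    n := newNetwork(vID, cfg)
    if err := n.setup(nil); err != nil { // fake mode needs the server endpoint instead
        panic(err)
    }
    go n.run() // dispatches envelopes to the consensus/validator channels
    // core.Consensus consumes n.ReceiveChan(); the validator loop consumes
    // n.receiveChanForValidator(). When both are done:
    if err := n.Close(); err != nil {
        panic(err)
    }
}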
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 6567639..5b43be4 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -21,275 +21,163 @@ import (
"context"
"encoding/json"
"fmt"
- "io/ioutil"
"log"
- "net"
- "net/http"
+ "reflect"
"sync"
- "time"
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon-consensus-core/simulation/config"
)
-// PeerServer is the main object for maintaining peer list.
+// PeerServer is the main object to collect results and monitor simulation.
type PeerServer struct {
- peers map[types.ValidatorID]string
- peersMu sync.Mutex
+ peers map[types.ValidatorID]struct{}
+ msgChannel chan *test.TransportEnvelope
+ trans test.TransportServer
peerTotalOrder PeerTotalOrder
peerTotalOrderMu sync.Mutex
verifiedLen uint64
+ cfg *config.Config
+ ctx context.Context
+ ctxCancel context.CancelFunc
}
-// NewPeerServer returns a new peer server.
+// NewPeerServer returns a new PeerServer instance.
func NewPeerServer() *PeerServer {
+ ctx, cancel := context.WithCancel(context.Background())
return &PeerServer{
- peers: make(map[types.ValidatorID]string),
+ peers: make(map[types.ValidatorID]struct{}),
peerTotalOrder: make(PeerTotalOrder),
+ ctx: ctx,
+ ctxCancel: cancel,
}
}
// isValidator checks if vID is in p.peers. If peer server restarts but
// validators are not, it will cause panic if validators send message.
func (p *PeerServer) isValidator(vID types.ValidatorID) bool {
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
_, exist := p.peers[vID]
return exist
}
-// Run starts the peer server.
-func (p *PeerServer) Run(configPath string) {
- cfg, err := config.Read(configPath)
- if err != nil {
- panic(err)
- }
-
- resetHandler := func(w http.ResponseWriter, r *http.Request) {
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
-
- p.peers = make(map[types.ValidatorID]string)
- log.Printf("Peer server has been reset.")
- }
-
- joinHandler := func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
-
- idString := r.Header.Get("ID")
- portString := r.Header.Get("PORT")
-
- id := types.ValidatorID{}
- id.UnmarshalText([]byte(idString))
-
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
-
- host, _, _ := net.SplitHostPort(r.RemoteAddr)
- p.peers[id] = net.JoinHostPort(host, portString)
- p.peerTotalOrder[id] = NewTotalOrderResult(id)
- log.Printf("Peer %s joined from %s", id, p.peers[id])
-
- if len(p.peers) == cfg.Validator.Num {
- log.Println("All peers connected.")
- }
- w.WriteHeader(http.StatusOK)
- }
-
- peersHandler := func(w http.ResponseWriter, r *http.Request) {
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
- defer r.Body.Close()
-
- if len(p.peers) != cfg.Validator.Num {
- w.WriteHeader(http.StatusNotFound)
- return
- }
-
- jsonText, err := json.Marshal(p.peers)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
+// handleBlockList is the handler for messages with BlockList as payload.
+func (p *PeerServer) handleBlockList(id types.ValidatorID, blocks *BlockList) {
+ p.peerTotalOrderMu.Lock()
+ defer p.peerTotalOrderMu.Unlock()
- w.WriteHeader(http.StatusOK)
- w.Header().Set("Content-Type", "application/json")
- w.Write(jsonText)
+ readyForVerify := p.peerTotalOrder[id].PushBlocks(*blocks)
+ if !readyForVerify {
+ return
}
-
- infoHandler := func(w http.ResponseWriter, r *http.Request) {
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
- defer r.Body.Close()
-
- msg := InfoMessage{
- Status: statusNormal,
- Peers: p.peers,
- }
-
- if len(p.peers) < cfg.Validator.Num {
- msg.Status = statusInit
- }
-
- // Determine msg.status.
- if p.verifiedLen >= cfg.Validator.MaxBlock {
- msg.Status = statusShutdown
- }
-
- jsonText, err := json.Marshal(msg)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
-
- w.WriteHeader(http.StatusOK)
- w.Header().Set("Content-Type", "application/json")
- w.Write(jsonText)
- }
-
- deliveryHandler := func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
-
- idString := r.Header.Get("ID")
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
- m := BlockList{}
- if err := json.Unmarshal(body, &m); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
- id := types.ValidatorID{}
- if err := id.UnmarshalText([]byte(idString)); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
- if !p.isValidator(id) {
- w.WriteHeader(http.StatusForbidden)
- return
- }
-
- w.WriteHeader(http.StatusOK)
-
+ // Verify the total order result.
+ go func(id types.ValidatorID) {
p.peerTotalOrderMu.Lock()
defer p.peerTotalOrderMu.Unlock()
- readyForVerify := p.peerTotalOrder[id].PushBlocks(m)
- if !readyForVerify {
- return
+ var correct bool
+ var length int
+ p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder)
+ if !correct {
+ log.Printf("The result of Total Ordering Algorithm has error.\n")
}
-
- // Verify the total order result.
- go func(id types.ValidatorID) {
- p.peerTotalOrderMu.Lock()
- defer p.peerTotalOrderMu.Unlock()
-
- var correct bool
- var length int
- p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder)
- if !correct {
- log.Printf("The result of Total Ordering Algorithm has error.\n")
+ p.verifiedLen += uint64(length)
+ if p.verifiedLen >= p.cfg.Validator.MaxBlock {
+ if err := p.trans.Broadcast(statusShutdown); err != nil {
+ panic(err)
}
- p.verifiedLen += uint64(length)
- }(id)
- }
-
- stopServer := make(chan struct{})
-
- messageHandler := func(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
-
- idString := r.Header.Get("ID")
- id := types.ValidatorID{}
- id.UnmarshalText([]byte(idString))
-
- if !p.isValidator(id) {
- w.WriteHeader(http.StatusForbidden)
- return
}
+ }(id)
+}
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
+// handleMessage is the handler for messages with Message as payload.
+func (p *PeerServer) handleMessage(id types.ValidatorID, m *message) {
+ switch m.Type {
+ case shutdownAck:
+ delete(p.peers, id)
+ log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
+ if len(p.peers) == 0 {
+ p.ctxCancel()
+ }
+ case blockTimestamp:
+ msgs := []timestampMessage{}
+ if err := json.Unmarshal(m.Payload, &msgs); err != nil {
+ panic(err)
+ }
+ for _, msg := range msgs {
+ if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok {
+ panic(fmt.Errorf("unable to push timestamp: %v", m))
+ }
}
+ default:
+ panic(fmt.Errorf("unknown simulation message type: %v", m))
+ }
+}
- m := Message{}
- if err := json.Unmarshal(body, &m); err != nil {
- w.WriteHeader(http.StatusBadRequest)
+func (p *PeerServer) mainLoop() {
+ for {
+ select {
+ case <-p.ctx.Done():
return
+ default:
}
-
- switch m.Type {
- case shutdownAck:
- func() {
- p.peersMu.Lock()
- defer p.peersMu.Unlock()
-
- delete(p.peers, id)
- log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
- if len(p.peers) == 0 {
- stopServer <- struct{}{}
- }
- }()
- break
- case blockTimestamp:
- msgs := []TimestampMessage{}
- if err := json.Unmarshal(m.Payload, &msgs); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
+ select {
+ case <-p.ctx.Done():
+ return
+ case e := <-p.msgChannel:
+ if !p.isValidator(e.From) {
+ break
}
- for _, msg := range msgs {
- if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
+ // Handle messages based on their type.
+ switch val := e.Msg.(type) {
+ case *BlockList:
+ p.handleBlockList(e.From, val)
+ case *message:
+ p.handleMessage(e.From, val)
+ default:
+ panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg)))
}
- break
- default:
- w.WriteHeader(http.StatusBadRequest)
- return
}
- w.WriteHeader(http.StatusOK)
}
+}
- http.HandleFunc("/reset", resetHandler)
- http.HandleFunc("/join", joinHandler)
- http.HandleFunc("/peers", peersHandler)
- http.HandleFunc("/info", infoHandler)
- http.HandleFunc("/delivery", deliveryHandler)
- http.HandleFunc("/message", messageHandler)
-
- addr := fmt.Sprintf("0.0.0.0:%d", peerPort)
- log.Printf("Peer server started at %s", addr)
-
- server := &http.Server{Addr: addr}
-
- go func() {
- <-stopServer
-
- LogStatus(p.peerTotalOrder)
-
- log.Printf("Shutting down peerServer.\n")
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := server.Shutdown(ctx); err != nil {
- log.Printf("Error shutting down peerServer: %v\n", err)
- }
- }()
-
- if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
- log.Fatalf("Error starting server %v\n", err)
+// Setup prepares the simulation.
+func (p *PeerServer) Setup(
+ cfg *config.Config) (serverEndpoint interface{}, err error) {
+ // Setup transport layer.
+ switch cfg.Networking.Type {
+ case "tcp", "tcp-local":
+ p.trans = test.NewTCPTransportServer(&jsonMarshaller{}, peerPort)
+ case "fake":
+ p.trans = test.NewFakeTransportServer()
+ default:
+ panic(fmt.Errorf("unknown network type: %v", cfg.Networking.Type))
+ }
+ p.msgChannel, err = p.trans.Host()
+ if err != nil {
+ return
}
+ p.cfg = cfg
+ serverEndpoint = p.msgChannel
+ return
+}
- // Do not exit when we are in TCP node, since k8s will restart the pod and
- // cause confusions.
- if cfg.Networking.Type == config.NetworkTypeTCP {
- select {}
+// Run the simulation.
+func (p *PeerServer) Run() {
+ if err := p.trans.WaitForPeers(p.cfg.Validator.Num); err != nil {
+ panic(err)
+ }
+ // Cache peers' info.
+ p.peers = p.trans.Peers()
+ // Initialize total order result cache.
+ for id := range p.peers {
+ p.peerTotalOrder[id] = NewTotalOrderResult(id)
+ }
+ // Block to handle incoming messages.
+ p.mainLoop()
+ // The simulation is done, clean up.
+ LogStatus(p.peerTotalOrder)
+ if err := p.trans.Close(); err != nil {
+ log.Printf("Error shutting down peerServer: %v\n", err)
}
}
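
The server-side counterpart, sketched the same way Run in simulation.go (next diff) drives it; the helper is hypothetical and cfg is a placeholder *config.Config.

// demoPeerServer shows the Setup/Run flow of the new PeerServer.
func demoPeerServer(cfg *config.Config) {
    server := NewPeerServer()
    serverEndpoint, err := server.Setup(cfg) // hosts the transport server
    if err != nil {
        panic(err)
    }
    _ = serverEndpoint // handed to validators when cfg.Networking.Type is "fake"
    server.Run()       // blocks until every validator reports shutdownAck
}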
diff --git a/simulation/simulation.go b/simulation/simulation.go
index 978107a..74a758d 100644
--- a/simulation/simulation.go
+++ b/simulation/simulation.go
@@ -18,7 +18,6 @@
package simulation
import (
- "fmt"
"sync"
"github.com/dexon-foundation/dexon-consensus-core/crypto/eth"
@@ -26,63 +25,51 @@ import (
)
// Run starts the simulation.
-func Run(configPath string, legacy bool) {
- cfg, err := config.Read(configPath)
- if err != nil {
- panic(err)
- }
-
- networkType := cfg.Networking.Type
-
+func Run(cfg *config.Config, legacy bool) {
var (
- vs []*Validator
- networkModel = &NormalNetwork{
- Sigma: cfg.Networking.Sigma,
- Mean: cfg.Networking.Mean,
- LossRateValue: cfg.Networking.LossRateValue,
- }
+ networkType = cfg.Networking.Type
+ server *PeerServer
+ wg sync.WaitGroup
+ err error
)
- if networkType == config.NetworkTypeTCPLocal {
- lock := sync.Mutex{}
- wg := sync.WaitGroup{}
- for i := 0; i < cfg.Validator.Num; i++ {
- prv, err := eth.NewPrivateKey()
- if err != nil {
- panic(err)
- }
- wg.Add(1)
- go func() {
- network := NewTCPNetwork(true, cfg.Networking.PeerServer, networkModel)
- network.Start()
- lock.Lock()
- defer lock.Unlock()
- vs = append(vs, NewValidator(prv, eth.SigToPub, cfg.Validator, network))
- wg.Done()
- }()
- }
- wg.Wait()
-
- for i := 0; i < cfg.Validator.Num; i++ {
- fmt.Printf("Validator %d: %s\n", i, vs[i].ID)
- go vs[i].Run(legacy)
- }
- } else if networkType == config.NetworkTypeTCP {
+ // init is a function to init a validator.
+ init := func(serverEndpoint interface{}) {
prv, err := eth.NewPrivateKey()
if err != nil {
panic(err)
}
- network := NewTCPNetwork(false, cfg.Networking.PeerServer, networkModel)
- network.Start()
- v := NewValidator(prv, eth.SigToPub, cfg.Validator, network)
- go v.Run(legacy)
- vs = append(vs, v)
+ v := newValidator(prv, eth.SigToPub, *cfg)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ v.run(serverEndpoint, legacy)
+ }()
}
- for _, v := range vs {
- v.Wait()
- fmt.Printf("Validator %s is shutdown\n", v.GetID())
+ switch networkType {
+ case config.NetworkTypeTCP:
+ // Initialize a simulation on multiple remote peers.
+ // The peer-server would be initialized with another command.
+ init(nil)
+ case config.NetworkTypeTCPLocal, config.NetworkTypeFake:
+ // Initialize a local simulation with a peer server.
+ var serverEndpoint interface{}
+ server = NewPeerServer()
+ if serverEndpoint, err = server.Setup(cfg); err != nil {
+ panic(err)
+ }
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ server.Run()
+ }()
+ // Initialize all validators.
+ for i := 0; i < cfg.Validator.Num; i++ {
+ init(serverEndpoint)
+ }
}
+ wg.Wait()
// Do not exit when we are in TCP node, since k8s will restart the pod and
// cause confusions.
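
With the signature change above, callers now read the config themselves and hand it to Run. A hedged sketch of a possible call site follows; it assumes config.Read (seen in the removed code) is still how a *config.Config is produced and returns a pointer, and the file path is illustrative.

package main

import (
    "github.com/dexon-foundation/dexon-consensus-core/simulation"
    "github.com/dexon-foundation/dexon-consensus-core/simulation/config"
)

func main() {
    cfg, err := config.Read("simulation.toml") // illustrative path
    if err != nil {
        panic(err)
    }
    simulation.Run(cfg, false /* legacy */)
}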
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
deleted file mode 100644
index 468baff..0000000
--- a/simulation/tcp-network.go
+++ /dev/null
@@ -1,457 +0,0 @@
-// Copyright 2018 The dexon-consensus-core Authors
-// This file is part of the dexon-consensus-core library.
-//
-// The dexon-consensus-core library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus-core library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus-core library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package simulation
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "net"
- "net/http"
- "os"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/dexon-foundation/dexon-consensus-core/core/types"
-)
-
-const retries = 5
-
-// TCPNetwork implements the Network interface.
-type TCPNetwork struct {
- local bool
- port int
- endpoint Endpoint
- client *http.Client
-
- peerServer string
- endpointMutex sync.RWMutex
- endpoints map[types.ValidatorID]string
- recieveChan chan interface{}
- model Model
-}
-
-// NewTCPNetwork returns pointer to a new Network instance.
-func NewTCPNetwork(local bool, peerServer string, model Model) *TCPNetwork {
- pServer := peerServer
- if local {
- pServer = "127.0.0.1"
- }
- // Force connection reuse.
- tr := &http.Transport{
- MaxIdleConnsPerHost: 1024,
- TLSHandshakeTimeout: 0 * time.Second,
- }
- client := &http.Client{
- Transport: tr,
- Timeout: 5 * time.Second,
- }
- return &TCPNetwork{
- local: local,
- peerServer: pServer,
- client: client,
- endpoints: make(map[types.ValidatorID]string),
- recieveChan: make(chan interface{}, msgBufferSize),
- model: model,
- }
-}
-
-// Start starts the http server for accepting message.
-func (n *TCPNetwork) Start() {
- listenSuccess := make(chan struct{})
- go func() {
- for {
- ctx, cancel := context.WithTimeout(context.Background(),
- 50*time.Millisecond)
- defer cancel()
- go func() {
- <-ctx.Done()
- if ctx.Err() != context.Canceled {
- listenSuccess <- struct{}{}
- }
- }()
- port := 1024 + rand.Int()%1024
- if !n.local {
- port = peerPort
- }
- addr := net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
- server := &http.Server{
- Addr: addr,
- Handler: n,
- }
-
- n.port = port
- if err := server.ListenAndServe(); err != nil {
- cancel()
- if err == http.ErrServerClosed {
- break
- }
- if !n.local {
- panic(err)
- }
- // In local-tcp, retry with other port.
- operr, ok := err.(*net.OpError)
- if !ok {
- panic(err)
- }
- oserr, ok := operr.Err.(*os.SyscallError)
- if !ok {
- panic(operr)
- }
- errno, ok := oserr.Err.(syscall.Errno)
- if !ok {
- panic(oserr)
- }
- if errno != syscall.EADDRINUSE {
- panic(errno)
- }
- }
- }
- }()
- <-listenSuccess
- fmt.Printf("Validator started at 0.0.0.0:%d\n", n.port)
-}
-
-// NumPeers returns the number of peers in the network.
-func (n *TCPNetwork) NumPeers() int {
- n.endpointMutex.Lock()
- defer n.endpointMutex.Unlock()
-
- return len(n.endpoints)
-}
-
-// ServerHTTP implements the http.Handler interface.
-func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
- m := struct {
- Type string `json:"type"`
- Payload json.RawMessage `json:"payload"`
- }{}
- if err := json.Unmarshal(body, &m); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
- switch m.Type {
- case "block":
- block := &types.Block{}
- if err := json.Unmarshal(m.Payload, block); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- n.recieveChan <- block
- case "vote":
- vote := &types.Vote{}
- if err := json.Unmarshal(m.Payload, vote); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- n.recieveChan <- vote
- case "notaryAck":
- ack := &types.NotaryAck{}
- if err := json.Unmarshal(m.Payload, ack); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- n.recieveChan <- ack
- default:
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-}
-
-// Join allow a client to join the network. It reutnrs a interface{} channel for
-// the client to recieve information.
-func (n *TCPNetwork) Join(endpoint Endpoint) {
- n.endpointMutex.Lock()
- defer n.endpointMutex.Unlock()
-
- n.endpoint = endpoint
-
- joinURL := fmt.Sprintf("http://%s:%d/join", n.peerServer, peerPort)
- peersURL := fmt.Sprintf("http://%s:%d/peers", n.peerServer, peerPort)
-
- // Join the peer list.
- for {
- time.Sleep(time.Second)
-
- req, err := http.NewRequest(http.MethodGet, joinURL, nil)
- if err != nil {
- continue
- }
- req.Header.Add("ID", endpoint.GetID().String())
- req.Header.Add("PORT", fmt.Sprintf("%d", n.port))
-
- resp, err := n.client.Do(req)
- if err == nil {
- defer resp.Body.Close()
- io.Copy(ioutil.Discard, resp.Body)
- }
- if err == nil && resp.StatusCode == http.StatusOK {
- break
- }
- }
-
- var peerList map[types.ValidatorID]string
-
- // Wait for the server to collect all validators and return a list.
- for {
- time.Sleep(time.Second)
-
- req, err := http.NewRequest(http.MethodGet, peersURL, nil)
- if err != nil {
- fmt.Println(err)
- continue
- }
- resp, err := n.client.Do(req)
- if err != nil || resp.StatusCode != http.StatusOK {
- continue
- }
-
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
-
- if err := json.Unmarshal(body, &peerList); err != nil {
- fmt.Printf("error: %v", err)
- continue
- }
- break
- }
-
- for key, val := range peerList {
- n.endpoints[key] = val
- }
-}
-
-// ReceiveChan return the receive channel.
-func (n *TCPNetwork) ReceiveChan() <-chan interface{} {
- return n.recieveChan
-}
-
-// Send sends a msg to another client.
-func (n *TCPNetwork) Send(destID types.ValidatorID, messageJSON []byte) {
- clientAddr, exists := n.endpoints[destID]
- if !exists {
- return
- }
-
- msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
- go func() {
- time.Sleep(n.model.Delay())
- for i := 0; i < retries; i++ {
- req, err := http.NewRequest(
- http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
- if err != nil {
- continue
- }
- req.Header.Add("ID", n.endpoint.GetID().String())
-
- resp, err := n.client.Do(req)
- if err == nil {
- defer resp.Body.Close()
- io.Copy(ioutil.Discard, resp.Body)
- }
- if err == nil && resp.StatusCode == http.StatusOK {
- runtime.Goexit()
- }
-
- fmt.Printf("failed to submit message: %s\n", err)
- time.Sleep(1 * time.Second)
- }
- fmt.Printf("failed to send message: %v\n", string(messageJSON))
- }()
-}
-
-func (n *TCPNetwork) marshalMessage(msg interface{}) (messageJSON []byte) {
- message := struct {
- Type string `json:"type"`
- Payload interface{} `json:"payload"`
- }{}
-
- switch v := msg.(type) {
- case *types.Block:
- message.Type = "block"
- message.Payload = v
- case *types.NotaryAck:
- message.Type = "notaryAck"
- message.Payload = v
- case *types.Vote:
- message.Type = "vote"
- message.Payload = v
- default:
- fmt.Println("error: invalid message type")
- return
- }
-
- messageJSON, err := json.Marshal(message)
- if err != nil {
- fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message)
- return
- }
- return
-}
-
-// BroadcastBlock broadcast blocks into the network.
-func (n *TCPNetwork) BroadcastBlock(block *types.Block) {
- payload := n.marshalMessage(block)
- for endpoint := range n.endpoints {
- if endpoint == block.ProposerID {
- continue
- }
- n.Send(endpoint, payload)
- }
-}
-
-// BroadcastNotaryAck broadcast notaryAck into the network.
-func (n *TCPNetwork) BroadcastNotaryAck(notaryAck *types.NotaryAck) {
- payload := n.marshalMessage(notaryAck)
- for endpoint := range n.endpoints {
- if endpoint == notaryAck.ProposerID {
- continue
- }
- n.Send(endpoint, payload)
- }
-}
-
-// BroadcastVote broadcast vote into the network.
-func (n *TCPNetwork) BroadcastVote(vote *types.Vote) {
- payload := n.marshalMessage(vote)
- for endpoint := range n.endpoints {
- if endpoint == vote.ProposerID {
- continue
- }
- n.Send(endpoint, payload)
- }
-}
-
-// DeliverBlocks sends blocks to peerServer.
-func (n *TCPNetwork) DeliverBlocks(blocks BlockList) {
- messageJSON, err := json.Marshal(blocks)
- if err != nil {
- fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, blocks)
- return
- }
-
- msgURL := fmt.Sprintf("http://%s:%d/delivery", n.peerServer, peerPort)
-
- go func() {
- for i := 0; i < retries; i++ {
- req, err := http.NewRequest(
- http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
- if err != nil {
- continue
- }
- req.Header.Add("ID", n.endpoint.GetID().String())
-
- resp, err := n.client.Do(req)
- if err == nil {
- defer resp.Body.Close()
- io.Copy(ioutil.Discard, resp.Body)
- }
-
- if err == nil && resp.StatusCode == http.StatusOK {
- runtime.Goexit()
- }
- time.Sleep(1 * time.Second)
- }
- fmt.Printf("failed to send message: %v\n", blocks)
- }()
-}
-
-// NotifyServer sends message to peerServer
-func (n *TCPNetwork) NotifyServer(msg Message) {
- messageJSON, err := json.Marshal(msg)
- if err != nil {
- fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, msg)
- return
- }
-
- msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort)
-
- for i := 0; i < retries; i++ {
- req, err := http.NewRequest(
- http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
- if err != nil {
- continue
- }
- req.Header.Add("ID", n.endpoint.GetID().String())
-
- resp, err := n.client.Do(req)
- if err == nil {
- defer resp.Body.Close()
- io.Copy(ioutil.Discard, resp.Body)
- }
- if err == nil && resp.StatusCode == http.StatusOK {
- return
- }
- time.Sleep(1 * time.Second)
- }
- fmt.Printf("failed to send message: %v\n", msg)
-
- return
-}
-
-// GetServerInfo retrieve the info message from peerServer.
-func (n *TCPNetwork) GetServerInfo() InfoMessage {
- infoMsg := InfoMessage{}
- msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort)
-
- req, err := http.NewRequest(
- http.MethodGet, msgURL, nil)
- if err != nil {
- fmt.Printf("error: %v\n", err)
- }
-
- resp, err := n.client.Do(req)
- if err != nil {
- fmt.Printf("error: %v\n", err)
- return infoMsg
- }
- if resp.StatusCode != http.StatusOK {
- fmt.Printf("error: %v\n", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
-
- if err := json.Unmarshal(body, &infoMsg); err != nil {
- fmt.Printf("error: %v", err)
- }
- return infoMsg
-}
-
-// Endpoints returns all validatorIDs.
-func (n *TCPNetwork) Endpoints() types.ValidatorIDs {
- vIDs := make(types.ValidatorIDs, 0, len(n.endpoints))
- for vID := range n.endpoints {
- vIDs = append(vIDs, vID)
- }
- return vIDs
-}
diff --git a/simulation/validator.go b/simulation/validator.go
index 21b9db6..46d42d3 100644
--- a/simulation/validator.go
+++ b/simulation/validator.go
@@ -30,15 +30,14 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/simulation/config"
)
-// Validator represents a validator in DexCon.
-type Validator struct {
- network Network
- app *simApp
- gov *simGovernance
- db blockdb.BlockDatabase
+// validator represents a validator in DexCon.
+type validator struct {
+ app *simApp
+ gov *simGovernance
+ db blockdb.BlockDatabase
config config.Validator
- msgChannel <-chan interface{}
+ netModule *network
isFinished chan struct{}
ID types.ValidatorID
@@ -49,46 +48,50 @@ type Validator struct {
compactionChain *core.BlockChain
}
-// NewValidator returns a new empty validator.
-func NewValidator(
+// newValidator returns a new empty validator.
+func newValidator(
prvKey crypto.PrivateKey,
sigToPub core.SigToPubFn,
- config config.Validator,
- network Network) *Validator {
+ config config.Config) *validator {
id := types.NewValidatorID(prvKey.PublicKey())
-
+ netModule := newNetwork(id, config.Networking)
db, err := blockdb.NewMemBackedBlockDB(
id.String() + ".blockdb")
if err != nil {
panic(err)
}
- gov := newSimGovernance(config.Num, config.Consensus)
- return &Validator{
+ gov := newSimGovernance(config.Validator.Num, config.Validator.Consensus)
+ return &validator{
ID: id,
prvKey: prvKey,
sigToPub: sigToPub,
- config: config,
- network: network,
- app: newSimApp(id, network),
+ config: config.Validator,
+ app: newSimApp(id, netModule),
gov: gov,
db: db,
+ netModule: netModule,
isFinished: make(chan struct{}),
}
}
// GetID returns the ID of validator.
-func (v *Validator) GetID() types.ValidatorID {
+func (v *validator) GetID() types.ValidatorID {
return v.ID
}
-// Run starts the validator.
-func (v *Validator) Run(legacy bool) {
- v.network.Join(v)
- v.msgChannel = v.network.ReceiveChan()
-
- hashes := make(common.Hashes, 0, v.network.NumPeers())
- for _, vID := range v.network.Endpoints() {
+// run starts the validator.
+func (v *validator) run(serverEndpoint interface{}, legacy bool) {
+ // Run network.
+ if err := v.netModule.setup(serverEndpoint); err != nil {
+ panic(err)
+ }
+ msgChannel := v.netModule.receiveChanForValidator()
+ peers := v.netModule.peers()
+ go v.netModule.run()
+ // Run consensus.
+ hashes := make(common.Hashes, 0, len(peers))
+ for vID := range peers {
v.gov.addValidator(vID)
hashes = append(hashes, vID.Hash)
}
@@ -101,15 +104,16 @@ func (v *Validator) Run(legacy bool) {
}
if legacy {
v.consensus = core.NewConsensus(
- v.app, v.gov, v.db, v.network,
+ v.app, v.gov, v.db, v.netModule,
time.NewTicker(
- time.Duration(v.config.Legacy.ProposeIntervalMean)*time.Millisecond),
+ time.Duration(
+ v.config.Legacy.ProposeIntervalMean)*time.Millisecond),
v.prvKey, v.sigToPub)
go v.consensus.RunLegacy()
} else {
v.consensus = core.NewConsensus(
- v.app, v.gov, v.db, v.network,
+ v.app, v.gov, v.db, v.netModule,
time.NewTicker(
time.Duration(v.config.Lambda)*time.Millisecond),
v.prvKey, v.sigToPub)
@@ -117,36 +121,28 @@ func (v *Validator) Run(legacy bool) {
go v.consensus.Run()
}
- isShutdown := make(chan struct{})
-
- go v.CheckServerInfo(isShutdown)
-
// Blocks forever.
- <-isShutdown
+MainLoop:
+ for {
+ msg := <-msgChannel
+ switch val := msg.(type) {
+ case infoStatus:
+ if val == statusShutdown {
+ break MainLoop
+ }
+ default:
+ panic(fmt.Errorf("unexpected message from server: %v", val))
+ }
+ }
+ // Cleanup.
v.consensus.Stop()
if err := v.db.Close(); err != nil {
fmt.Println(err)
}
- v.network.NotifyServer(Message{
+ v.netModule.report(&message{
Type: shutdownAck,
})
- v.isFinished <- struct{}{}
-}
-
-// Wait for the validator to stop (if peerServer told it to).
-func (v *Validator) Wait() {
- <-v.isFinished
-}
-
-// CheckServerInfo will check the info from the peerServer and update
-// validator's status if needed.
-func (v *Validator) CheckServerInfo(isShutdown chan struct{}) {
- for {
- infoMsg := v.network.GetServerInfo()
- if infoMsg.Status == statusShutdown {
- isShutdown <- struct{}{}
- break
- }
- time.Sleep(250 * time.Millisecond)
- }
+ // TODO(mission): once we have a way to know if consensus is stopped, stop
+ // the network module.
+ return
}
diff --git a/simulation/verification.go b/simulation/verification.go
index 2e90b8b..0ae7450 100644
--- a/simulation/verification.go
+++ b/simulation/verification.go
@@ -99,7 +99,7 @@ func (totalOrder *TotalOrderResult) PushBlocks(blocks BlockList) (ready bool) {
}
// PushTimestamp log the information in the msg.
-func (totalOrder *TotalOrderResult) PushTimestamp(msg TimestampMessage) bool {
+func (totalOrder *TotalOrderResult) PushTimestamp(msg timestampMessage) bool {
pushLatency := func(latency *[]time.Duration, t1, t2 time.Time) {
*latency = append(*latency, t2.Sub(t1))
}