-rw-r--r--  cmd/dexcon-simulation-with-scheduler/main.go |   6
-rw-r--r--  core/test/fake-transport.go                  |  28
-rw-r--r--  core/test/interface.go                       |   3
-rw-r--r--  core/test/network.go                         | 201
-rw-r--r--  core/test/network_test.go                    |  28
-rw-r--r--  core/test/tcp-transport.go                   |  69
-rw-r--r--  core/test/transport_test.go                  |  29
-rw-r--r--  core/test/utils.go                           |  12
-rw-r--r--  integration_test/consensus_test.go           |  11
-rw-r--r--  simulation/config/config.go                  |  27
-rw-r--r--  simulation/node.go                           |  23
-rw-r--r--  simulation/peer-server.go                    |   9
-rw-r--r--  test_config/test-config-change.toml          |   5
-rw-r--r--  test_config/test.toml                        |   5
14 files changed, 244 insertions(+), 212 deletions(-)
diff --git a/cmd/dexcon-simulation-with-scheduler/main.go b/cmd/dexcon-simulation-with-scheduler/main.go
index 1ce9c99..8d59825 100644
--- a/cmd/dexcon-simulation-with-scheduler/main.go
+++ b/cmd/dexcon-simulation-with-scheduler/main.go
@@ -69,8 +69,8 @@ func main() {
}
// Setup latencies, nodes.
networkLatency := &test.NormalLatencyModel{
- Sigma: cfg.Networking.Sigma,
- Mean: cfg.Networking.Mean,
+ Sigma: cfg.Networking.Direct.Sigma,
+ Mean: cfg.Networking.Direct.Mean,
}
proposingLatency := &test.NormalLatencyModel{
Sigma: cfg.Node.Legacy.ProposeIntervalSigma,
@@ -85,7 +85,7 @@ func main() {
gov, err := test.NewGovernance(
test.NewState(
pubKeys,
- time.Duration(cfg.Networking.Mean)*time.Millisecond,
+ time.Duration(cfg.Networking.Direct.Mean)*time.Millisecond,
&common.NullLogger{},
true,
), core.ConfigRoundShift)
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go
index 388d0aa..056c3d5 100644
--- a/core/test/fake-transport.go
+++ b/core/test/fake-transport.go
@@ -39,7 +39,6 @@ type FakeTransport struct {
recvChannel chan *TransportEnvelope
serverChannel chan<- *TransportEnvelope
peers map[types.NodeID]fakePeerRecord
- latency LatencyModel
}
// NewFakeTransportServer constructs FakeTransport instance for peer server.
@@ -51,31 +50,24 @@ func NewFakeTransportServer() TransportServer {
}
// NewFakeTransportClient constructs FakeTransport instance for peer.
-func NewFakeTransportClient(
- pubKey crypto.PublicKey, latency LatencyModel) TransportClient {
-
+func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient {
return &FakeTransport{
peerType: TransportPeer,
recvChannel: make(chan *TransportEnvelope, 1000),
nID: types.NewNodeID(pubKey),
pubKey: pubKey,
- latency: latency,
}
}
// Send implements Transport.Send method.
func (t *FakeTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
-
rec, exists := t.peers[endpoint]
if !exists {
err = fmt.Errorf("the endpoint does not exists: %v", endpoint)
return
}
go func(ch chan<- *TransportEnvelope) {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
ch <- &TransportEnvelope{
PeerType: t.peerType,
From: t.nID,
@@ -98,12 +90,16 @@ func (t *FakeTransport) Report(msg interface{}) (err error) {
}
// Broadcast implements Transport.Broadcast method.
-func (t *FakeTransport) Broadcast(msg interface{}) (err error) {
- for k := range t.peers {
- if k == t.nID {
+func (t *FakeTransport) Broadcast(endpoints map[types.NodeID]struct{},
+ latency LatencyModel, msg interface{}) (err error) {
+ for ID := range endpoints {
+ if ID == t.nID {
continue
}
- t.Send(k, msg)
+ go func(nID types.NodeID) {
+ time.Sleep(latency.Delay())
+ t.Send(nID, msg)
+ }(ID)
}
return
}
@@ -177,7 +173,11 @@ func (t *FakeTransport) WaitForPeers(numPeers uint32) (err error) {
}
}
// The collected peer channels are shared for all peers.
- if err = t.Broadcast(t.peers); err != nil {
+ peers := make(map[types.NodeID]struct{})
+ for ID := range t.peers {
+ peers[ID] = struct{}{}
+ }
+ if err = t.Broadcast(peers, &FixedLatencyModel{}, t.peers); err != nil {
return
}
return
diff --git a/core/test/interface.go b/core/test/interface.go
index 1388dc1..d9578de 100644
--- a/core/test/interface.go
+++ b/core/test/interface.go
@@ -91,7 +91,8 @@ type TransportClient interface {
// Transport defines the interface for basic transportation capabilities.
type Transport interface {
// Broadcast a message to all peers in network.
- Broadcast(msg interface{}) error
+ Broadcast(endpoints map[types.NodeID]struct{}, latency LatencyModel,
+ msg interface{}) error
// Send one message to a peer.
Send(endpoint types.NodeID, msg interface{}) error
// Close would cleanup allocated resources.
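Note: a minimal sketch of how a caller drives the revised Transport.Broadcast, assuming a ready transport client trans, two public keys pubKeyA/pubKeyB, and a message msg (all names illustrative, not part of this change):

    // Recipients are chosen by the caller; the latency model is applied
    // per endpoint inside each Transport implementation.
    endpoints := map[types.NodeID]struct{}{
        types.NewNodeID(pubKeyA): {},
        types.NewNodeID(pubKeyB): {},
    }
    // FixedLatencyModel.Latency is in milliseconds, matching its use in
    // the transport tests below.
    if err := trans.Broadcast(
        endpoints, &FixedLatencyModel{Latency: 300}, msg); err != nil {
        panic(err)
    }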
diff --git a/core/test/network.go b/core/test/network.go
index a79898e..066d36c 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -22,7 +22,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "log"
"net"
"strconv"
"sync"
@@ -52,9 +51,12 @@ const (
// NetworkConfig is the configuration for Network module.
type NetworkConfig struct {
- Type NetworkType
- PeerServer string
- PeerPort int
+ Type NetworkType
+ PeerServer string
+ PeerPort int
+ DirectLatency LatencyModel
+ GossipLatency LatencyModel
+ Marshaller Marshaller
}
// PullRequest is a generic request to pull everything (ex. vote, block...).
@@ -151,7 +153,6 @@ type Network struct {
unreceivedBlocks map[common.Hash]chan<- common.Hash
unreceivedRandomnessLock sync.RWMutex
unreceivedRandomness map[common.Hash]chan<- common.Hash
- latencyModel LatencyModel
cache *utils.NodeSetCache
notarySetCachesLock sync.Mutex
notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
@@ -161,8 +162,8 @@ type Network struct {
// NewNetwork setup network stuffs for nodes, which provides an
// implementation of core.Network based on TransportClient.
-func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
- marshaller Marshaller, config NetworkConfig) (n *Network) {
+func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
+ n *Network) {
// Construct basic network instance.
n = &Network{
ID: types.NewNodeID(pubKey),
@@ -175,9 +176,10 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
- latencyModel: latency,
- notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
- dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
+ peers: make(map[types.NodeID]struct{}),
+ notarySetCaches: make(
+ map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -185,11 +187,11 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
// Construct transport layer.
switch config.Type {
case NetworkTypeTCPLocal:
- n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true)
+ n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
case NetworkTypeTCP:
- n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false)
+ n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
case NetworkTypeFake:
- n.trans = NewFakeTransportClient(pubKey, latency)
+ n.trans = NewFakeTransportClient(pubKey)
default:
panic(fmt.Errorf("unknown network type: %v", config.Type))
}
@@ -213,8 +215,11 @@ func (n *Network) PullRandomness(hashes common.Hashes) {
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
- n.broadcastToSet(
- n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote)
+ if err := n.trans.Broadcast(
+ n.getNotarySet(vote.Position.Round, vote.Position.ChainID),
+ n.config.DirectLatency, vote); err != nil {
+ panic(err)
+ }
n.addVoteToCache(vote)
}
@@ -222,28 +227,33 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
- n.broadcastToSet(
- n.getNotarySet(block.Position.Round, block.Position.ChainID), block)
+ notarySet := n.getNotarySet(block.Position.Round, block.Position.ChainID)
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, block); err != nil {
+ panic(err)
+ }
+ if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
+ n.config.GossipLatency, block); err != nil {
+ panic(err)
+ }
n.addBlockToCache(block)
}
// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
- randRequest *types.AgreementResult) {
- n.sentAgreementLock.Lock()
- defer n.sentAgreementLock.Unlock()
- if _, exist := n.sentAgreement[randRequest.BlockHash]; exist {
+ result *types.AgreementResult) {
+ if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
- if len(n.sentAgreement) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentAgreement {
- delete(n.sentAgreement, k)
- break
- }
+ // Send to DKG set first.
+ dkgSet := n.getDKGSet(result.Position.Round)
+ if err := n.trans.Broadcast(
+ dkgSet, n.config.DirectLatency, result); err != nil {
+ panic(err)
}
- n.sentAgreement[randRequest.BlockHash] = struct{}{}
- if err := n.trans.Broadcast(randRequest); err != nil {
+ // Gossip to other nodes.
+ if err := n.trans.Broadcast(getComplementSet(n.peers, dkgSet),
+ n.config.GossipLatency, result); err != nil {
panic(err)
}
}
@@ -251,20 +261,19 @@ func (n *Network) BroadcastAgreementResult(
// BroadcastRandomnessResult implements core.Network interface.
func (n *Network) BroadcastRandomnessResult(
randResult *types.BlockRandomnessResult) {
- n.sentRandomnessLock.Lock()
- defer n.sentRandomnessLock.Unlock()
- if _, exist := n.sentRandomness[randResult.BlockHash]; exist {
+ if !n.markRandomnessResultAsSent(randResult.BlockHash) {
return
}
- if len(n.sentRandomness) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentRandomness {
- delete(n.sentRandomness, k)
- break
- }
+ // Send to notary set first.
+ notarySet := n.getNotarySet(
+ randResult.Position.Round, randResult.Position.ChainID)
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, randResult); err != nil {
+ panic(err)
}
- n.sentRandomness[randResult.BlockHash] = struct{}{}
- if err := n.trans.Broadcast(randResult); err != nil {
+ // Gossip to other nodes.
+ if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
+ n.config.GossipLatency, randResult); err != nil {
panic(err)
}
n.addRandomnessToCache(randResult)
@@ -273,21 +282,25 @@ func (n *Network) BroadcastRandomnessResult(
// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
- if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil {
- panic(err)
- }
+ n.send(types.NewNodeID(recv), prvShare)
}
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
- n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
+ if err := n.trans.Broadcast(n.getDKGSet(prvShare.Round),
+ n.config.DirectLatency, prvShare); err != nil {
+ panic(err)
+ }
}
// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
- n.broadcastToSet(n.getDKGSet(psig.Round), psig)
+ if err := n.trans.Broadcast(
+ n.getDKGSet(psig.Round), n.config.DirectLatency, psig); err != nil {
+ panic(err)
+ }
}
// ReceiveChan implements core.Network interface.
@@ -312,7 +325,6 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
return
}
peerKeys := n.trans.Peers()
- n.peers = make(map[types.NodeID]struct{})
for _, k := range peerKeys {
n.peers[types.NewNodeID(k)] = struct{}{}
}
@@ -374,9 +386,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
break All
default:
}
- if err := n.trans.Send(req.Requester, b); err != nil {
- log.Println("unable to send block", req.Requester, err)
- }
+ n.send(req.Requester, b)
}
}()
case "vote":
@@ -386,9 +396,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
defer n.voteCacheLock.Unlock()
if votes, exists := n.voteCache[pos]; exists {
for _, v := range votes {
- if err := n.trans.Send(req.Requester, v); err != nil {
- log.Println("unable to send vote", req.Requester, err)
- }
+ n.send(req.Requester, v)
}
}
}()
@@ -408,9 +416,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
break All
default:
}
- if err := n.trans.Send(req.Requester, r); err != nil {
- log.Println("unable to send randomness", req.Requester, err)
- }
+ n.send(req.Requester, r)
}
}()
default:
@@ -457,24 +463,16 @@ func (n *Network) Report(msg interface{}) error {
return n.trans.Report(msg)
}
+// Broadcast a message to all peers.
+func (n *Network) Broadcast(msg interface{}) error {
+ return n.trans.Broadcast(n.peers, &FixedLatencyModel{}, msg)
+}
+
// Peers exports 'Peers' method of Transport.
func (n *Network) Peers() []crypto.PublicKey {
return n.trans.Peers()
}
-// Broadcast exports 'Broadcast' method of Transport, and would panic when
-// error.
-func (n *Network) Broadcast(msg interface{}) {
- if err := n.trans.Broadcast(msg); err != nil {
- panic(err)
- }
-}
-
-// Send exports 'Send' method of Transport.
-func (n *Network) Send(nodeID types.NodeID, msg interface{}) error {
- return n.trans.Send(nodeID, msg)
-}
-
// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
@@ -521,14 +519,11 @@ Loop:
if nID == n.ID {
continue
}
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
- case <-time.After(2 * n.latencyModel.Delay()):
+ case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
@@ -561,10 +556,7 @@ func (n *Network) pullVotesAsync(pos types.Position) {
// Randomly select one peer from notary set and send a pull request.
sentCount := 0
for nID := range notarySet {
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
sentCount++
if sentCount >= maxPullingPeerCount {
break
@@ -598,14 +590,11 @@ Loop:
if nID == n.ID {
continue
}
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
- case <-time.After(2 * n.latencyModel.Delay()):
+ case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
@@ -673,6 +662,40 @@ func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
n.randomnessCache[rand.BlockHash] = rand
}
+func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool {
+ n.sentAgreementLock.Lock()
+ defer n.sentAgreementLock.Unlock()
+ if _, exist := n.sentAgreement[blockHash]; exist {
+ return false
+ }
+ if len(n.sentAgreement) > 1000 {
+ // Randomly drop one entry.
+ for k := range n.sentAgreement {
+ delete(n.sentAgreement, k)
+ break
+ }
+ }
+ n.sentAgreement[blockHash] = struct{}{}
+ return true
+}
+
+func (n *Network) markRandomnessResultAsSent(blockHash common.Hash) bool {
+ n.sentRandomnessLock.Lock()
+ defer n.sentRandomnessLock.Unlock()
+ if _, exist := n.sentRandomness[blockHash]; exist {
+ return false
+ }
+ if len(n.sentRandomness) > 1000 {
+ // Randomly drop one entry.
+ for k := range n.sentRandomness {
+ delete(n.sentRandomness, k)
+ break
+ }
+ }
+ n.sentRandomness[blockHash] = struct{}{}
+ return true
+}
+
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v
@@ -735,15 +758,11 @@ func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
return set
}
-// broadcastToSet broadcast a message to a set of nodes.
-func (n *Network) broadcastToSet(
- set map[types.NodeID]struct{}, msg interface{}) {
- for nID := range set {
- if nID == n.ID {
- continue
- }
- if err := n.trans.Send(nID, msg); err != nil {
+func (n *Network) send(endpoint types.NodeID, msg interface{}) {
+ go func() {
+ time.Sleep(n.config.DirectLatency.Delay())
+ if err := n.trans.Send(endpoint, msg); err != nil {
panic(err)
}
- }
+ }()
}
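The block, agreement-result, and randomness broadcasts above all follow the same two-step pattern: deliver directly to the responsible set, then gossip the same message to every other known peer. A condensed sketch of that pattern (the helper name broadcastWithGossip is illustrative only, not part of this change):

    // Direct delivery to the target set, then gossip to the remaining
    // peers with the slower gossip latency.
    func (n *Network) broadcastWithGossip(
        set map[types.NodeID]struct{}, msg interface{}) {
        if err := n.trans.Broadcast(set, n.config.DirectLatency, msg); err != nil {
            panic(err)
        }
        if err := n.trans.Broadcast(getComplementSet(n.peers, set),
            n.config.GossipLatency, msg); err != nil {
            panic(err)
        }
    }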
diff --git a/core/test/network_test.go b/core/test/network_test.go
index e05dec5..d040a16 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -48,11 +48,11 @@ func (s *NetworkTestSuite) setupNetworks(
// Setup several network modules.
networks := make(map[types.NodeID]*Network)
for _, key := range pubKeys {
- n := NewNetwork(
- key,
- &FixedLatencyModel{},
- NewDefaultMarshaller(nil),
- NetworkConfig{Type: NetworkTypeFake})
+ n := NewNetwork(key, NetworkConfig{
+ Type: NetworkTypeFake,
+ DirectLatency: &FixedLatencyModel{},
+ GossipLatency: &FixedLatencyModel{},
+ Marshaller: NewDefaultMarshaller(nil)})
networks[n.ID] = n
wg.Add(1)
go func() {
@@ -297,21 +297,13 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
// Try broadcasting with datum from round 0, and make sure only node belongs
// to that set receiving the message.
nerd.BroadcastVote(&types.Vote{})
- req.IsType(<-notaryNode.ReceiveChan(), &types.Vote{})
- nerd.BroadcastBlock(&types.Block{})
- req.IsType(<-notaryNode.ReceiveChan(), &types.Block{})
+ req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan())
nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{})
- req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PrivateShare{})
+ req.IsType(&typesDKG.PrivateShare{}, <-dkgNode.ReceiveChan())
nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{})
- req.IsType(<-dkgNode.ReceiveChan(), &typesDKG.PartialSignature{})
- // There should be no remaining message in each node.
- for _, n := range networks {
- select {
- case <-n.ReceiveChan():
- req.False(true)
- default:
- }
- }
+ req.IsType(&typesDKG.PartialSignature{}, <-dkgNode.ReceiveChan())
+ nerd.BroadcastBlock(&types.Block{})
+ req.IsType(&types.Block{}, <-notaryNode.ReceiveChan())
}
func TestNetwork(t *testing.T) {
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index b16bbcb..e04ba53 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -109,20 +109,14 @@ type TCPTransport struct {
recvChannel chan *TransportEnvelope
ctx context.Context
cancel context.CancelFunc
- latency LatencyModel
marshaller Marshaller
throughputRecords []ThroughputRecord
throughputLock sync.Mutex
}
// NewTCPTransport constructs an TCPTransport instance.
-func NewTCPTransport(
- peerType TransportPeerType,
- pubKey crypto.PublicKey,
- latency LatencyModel,
- marshaller Marshaller,
- localPort int) *TCPTransport {
-
+func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey,
+ marshaller Marshaller, localPort int) *TCPTransport {
ctx, cancel := context.WithCancel(context.Background())
return &TCPTransport{
peerType: peerType,
@@ -133,7 +127,6 @@ func NewTCPTransport(
ctx: ctx,
cancel: cancel,
localPort: localPort,
- latency: latency,
marshaller: marshaller,
throughputRecords: []ThroughputRecord{},
}
@@ -202,6 +195,14 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) (
return
}
+func (t *TCPTransport) send(
+ endpoint types.NodeID, msg interface{}, payload []byte) {
+ t.peersLock.RLock()
+ defer t.peersLock.RUnlock()
+ t.handleThroughputData(msg, payload)
+ t.peers[endpoint].sendChannel <- payload
+}
+
// Send implements Transport.Send method.
func (t *TCPTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
@@ -210,38 +211,25 @@ func (t *TCPTransport) Send(
if err != nil {
return
}
- go func() {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
- t.peersLock.RLock()
- defer t.peersLock.RUnlock()
- t.handleThroughputData(msg, payload)
- t.peers[endpoint].sendChannel <- payload
- }()
+ go t.send(endpoint, msg, payload)
return
}
// Broadcast implements Transport.Broadcast method.
-func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
+func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{},
+ latency LatencyModel, msg interface{}) (err error) {
payload, err := t.marshalMessage(msg)
if err != nil {
return
}
- t.peersLock.RLock()
- defer t.peersLock.RUnlock()
-
- for nID, rec := range t.peers {
+ for nID := range endpoints {
if nID == t.nID {
continue
}
- go func(ch chan<- []byte) {
- if t.latency != nil {
- time.Sleep(t.latency.Delay())
- }
- t.handleThroughputData(msg, payload)
- ch <- payload
- }(rec.sendChannel)
+ go func(ID types.NodeID) {
+ time.Sleep(latency.Delay())
+ t.send(ID, msg, payload)
+ }(nID)
}
return
}
@@ -585,14 +573,12 @@ type TCPTransportClient struct {
// NewTCPTransportClient constructs a TCPTransportClient instance.
func NewTCPTransportClient(
pubKey crypto.PublicKey,
- latency LatencyModel,
marshaller Marshaller,
local bool) *TCPTransportClient {
return &TCPTransportClient{
- TCPTransport: *NewTCPTransport(
- TransportPeer, pubKey, latency, marshaller, 8080),
- local: local,
+ TCPTransport: *NewTCPTransport(TransportPeer, pubKey, marshaller, 8080),
+ local: local,
}
}
@@ -776,11 +762,7 @@ func NewTCPTransportServer(
// NOTE: the assumption here is the node ID of peers
// won't be zero.
TCPTransport: *NewTCPTransport(
- TransportPeerServer,
- prvKey.PublicKey(),
- nil,
- marshaller,
- serverPort),
+ TransportPeerServer, prvKey.PublicKey(), marshaller, serverPort),
}
}
@@ -828,7 +810,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
if err = t.buildConnectionsToPeers(); err != nil {
return
}
- if err = t.Broadcast(peersInfo); err != nil {
+ peers := make(map[types.NodeID]struct{})
+ for ID := range t.peers {
+ peers[ID] = struct{}{}
+ }
+ if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil {
return
}
// Wait for peers to send 'ready' report.
@@ -851,7 +837,8 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
}
}
// Ack all peers ready to go.
- if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil {
+ if err = t.Broadcast(peers, &FixedLatencyModel{},
+ &tcpMessage{Type: "all-ready"}); err != nil {
return
}
return
diff --git a/core/test/transport_test.go b/core/test/transport_test.go
index d5c4260..8305ee2 100644
--- a/core/test/transport_test.go
+++ b/core/test/transport_test.go
@@ -87,13 +87,11 @@ type TransportTestSuite struct {
}
func (s *TransportTestSuite) baseTest(
- server *testPeerServer,
- peers map[types.NodeID]*testPeer,
- delay time.Duration) {
-
+ server *testPeerServer, peers map[types.NodeID]*testPeer, delay float64) {
var (
- req = s.Require()
- wg sync.WaitGroup
+ req = s.Require()
+ delayDuration = time.Duration(delay) * time.Millisecond
+ wg sync.WaitGroup
)
// For each peers, do following stuffs:
@@ -150,6 +148,10 @@ func (s *TransportTestSuite) baseTest(
}
wg.Add(len(peers) + 1)
go handleServer(server)
+ peersAsMap := make(map[types.NodeID]struct{})
+ for nID := range peers {
+ peersAsMap[nID] = struct{}{}
+ }
for nID, peer := range peers {
go handlePeer(peer)
// Broadcast a block.
@@ -158,7 +160,8 @@ func (s *TransportTestSuite) baseTest(
Hash: common.NewRandomHash(),
}
peer.myBlockSentTime = time.Now()
- peer.trans.Broadcast(peer.myBlock)
+ peer.trans.Broadcast(
+ peersAsMap, &FixedLatencyModel{Latency: delay}, peer.myBlock)
// Report a block to server.
peer.expectedEchoHash = common.NewRandomHash()
peer.trans.Report(&types.Block{
@@ -187,7 +190,7 @@ func (s *TransportTestSuite) baseTest(
continue
}
req.True(otherPeer.blocksReceiveTime[peer.myBlock.Hash].Sub(
- peer.myBlockSentTime) >= delay)
+ peer.myBlockSentTime) >= delayDuration)
}
}
}
@@ -200,7 +203,6 @@ func (s *TransportTestSuite) TestFake() {
prvKeys = GenerateRandomPrivateKeys(peerCount)
err error
wg sync.WaitGroup
- latency = &FixedLatencyModel{Latency: 300}
server = &testPeerServer{trans: NewFakeTransportServer()}
)
// Setup PeerServer
@@ -212,7 +214,7 @@ func (s *TransportTestSuite) TestFake() {
nID := types.NewNodeID(key.PublicKey())
peer := &testPeer{
nID: nID,
- trans: NewFakeTransportClient(key.PublicKey(), latency),
+ trans: NewFakeTransportClient(key.PublicKey()),
}
peers[nID] = peer
go func() {
@@ -226,7 +228,7 @@ func (s *TransportTestSuite) TestFake() {
server.trans.WaitForPeers(uint32(peerCount))
// Make sure all clients are ready.
wg.Wait()
- s.baseTest(server, peers, 300*time.Millisecond)
+ s.baseTest(server, peers, 300)
req.Nil(server.trans.Close())
for _, peer := range peers {
req.Nil(peer.trans.Close())
@@ -242,7 +244,6 @@ func (s *TransportTestSuite) TestTCPLocal() {
prvKeys = GenerateRandomPrivateKeys(peerCount)
err error
wg sync.WaitGroup
- latency = &FixedLatencyModel{Latency: 300}
serverPort = 8080
serverAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(serverPort))
server = &testPeerServer{
@@ -258,7 +259,7 @@ func (s *TransportTestSuite) TestTCPLocal() {
peer := &testPeer{
nID: nID,
trans: NewTCPTransportClient(
- prvKey.PublicKey(), latency, &testMarshaller{}, true),
+ prvKey.PublicKey(), &testMarshaller{}, true),
}
peers[nID] = peer
go func() {
@@ -274,7 +275,7 @@ func (s *TransportTestSuite) TestTCPLocal() {
// Make sure all clients are ready.
wg.Wait()
- s.baseTest(server, peers, 300*time.Millisecond)
+ s.baseTest(server, peers, 300)
req.Nil(server.trans.Close())
for _, peer := range peers {
req.Nil(peer.trans.Close())
diff --git a/core/test/utils.go b/core/test/utils.go
index 8a14ebf..d85395b 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -244,3 +244,15 @@ func LaunchDummyReceiver(
}()
return dummyCancel
}
+
+func getComplementSet(
+ all, set map[types.NodeID]struct{}) map[types.NodeID]struct{} {
+ complement := make(map[types.NodeID]struct{})
+ for nID := range all {
+ if _, exists := set[nID]; exists {
+ continue
+ }
+ complement[nID] = struct{}{}
+ }
+ return complement
+}
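A small usage sketch for the helper above, assuming a, b, and c are types.NodeID values (illustrative only); the gossip paths in network.go call it as getComplementSet(n.peers, notarySet):

    all := map[types.NodeID]struct{}{a: {}, b: {}, c: {}}
    targets := getComplementSet(all, map[types.NodeID]struct{}{b: {}})
    // targets now contains a and c; b is excluded.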
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 5c3f181..fa62bda 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -68,11 +68,12 @@ func (s *ConsensusTestSuite) setupNodes(
dbInst, err := db.NewMemBackedDB()
s.Require().NoError(err)
// Prepare essential modules: app, gov, db.
- networkModule := test.NewNetwork(
- k.PublicKey(),
- &test.FixedLatencyModel{},
- test.NewDefaultMarshaller(nil),
- test.NetworkConfig{Type: test.NetworkTypeFake})
+ networkModule := test.NewNetwork(k.PublicKey(), test.NetworkConfig{
+ Type: test.NetworkTypeFake,
+ DirectLatency: &test.FixedLatencyModel{},
+ GossipLatency: &test.FixedLatencyModel{},
+ Marshaller: test.NewDefaultMarshaller(nil)},
+ )
gov := seedGov.Clone()
gov.SwitchToRemoteMode(networkModule)
gov.NotifyRoundHeight(0, 0)
diff --git a/simulation/config/config.go b/simulation/config/config.go
index 797145c..d22f4f1 100644
--- a/simulation/config/config.go
+++ b/simulation/config/config.go
@@ -56,14 +56,18 @@ type Node struct {
Changes []Change
}
+// LatencyModel for the simulation.
+type LatencyModel struct {
+ Mean float64
+ Sigma float64
+}
+
// Networking config.
type Networking struct {
Type test.NetworkType
PeerServer string
-
- Mean float64
- Sigma float64
- LossRateValue float64
+ Direct LatencyModel
+ Gossip LatencyModel
}
// Scheduler Settings.
@@ -129,11 +133,16 @@ func GenerateDefault(path string) error {
MaxBlock: math.MaxUint64,
},
Networking: Networking{
- Type: test.NetworkTypeTCPLocal,
- PeerServer: "127.0.0.1",
- Mean: 100,
- Sigma: 10,
- LossRateValue: 0,
+ Type: test.NetworkTypeTCPLocal,
+ PeerServer: "127.0.0.1",
+ Direct: LatencyModel{
+ Mean: 100,
+ Sigma: 10,
+ },
+ Gossip: LatencyModel{
+ Mean: 300,
+ Sigma: 25,
+ },
},
Scheduler: Scheduler{
WorkerNum: 2,
diff --git a/simulation/node.go b/simulation/node.go
index dc4a725..026db66 100644
--- a/simulation/node.go
+++ b/simulation/node.go
@@ -70,18 +70,19 @@ type node struct {
func newNode(prvKey crypto.PrivateKey, logger common.Logger,
cfg config.Config) *node {
pubKey := prvKey.PublicKey()
- netModule := test.NewNetwork(
- pubKey,
- &test.NormalLatencyModel{
- Mean: cfg.Networking.Mean,
- Sigma: cfg.Networking.Sigma,
+ netModule := test.NewNetwork(pubKey, test.NetworkConfig{
+ Type: cfg.Networking.Type,
+ PeerServer: cfg.Networking.PeerServer,
+ PeerPort: peerPort,
+ DirectLatency: &test.NormalLatencyModel{
+ Mean: cfg.Networking.Direct.Mean,
+ Sigma: cfg.Networking.Direct.Sigma,
},
- test.NewDefaultMarshaller(&jsonMarshaller{}),
- test.NetworkConfig{
- Type: cfg.Networking.Type,
- PeerServer: cfg.Networking.PeerServer,
- PeerPort: peerPort,
- })
+ GossipLatency: &test.NormalLatencyModel{
+ Mean: cfg.Networking.Gossip.Mean,
+ Sigma: cfg.Networking.Gossip.Sigma,
+ },
+ Marshaller: test.NewDefaultMarshaller(&jsonMarshaller{})})
id := types.NewNodeID(pubKey)
dbInst, err := db.NewMemBackedDB(id.String() + ".db")
if err != nil {
diff --git a/simulation/peer-server.go b/simulation/peer-server.go
index 14a825a..69ed029 100644
--- a/simulation/peer-server.go
+++ b/simulation/peer-server.go
@@ -90,7 +90,8 @@ func (p *PeerServer) handleBlockList(id types.NodeID, blocks *BlockList) {
}
p.verifiedLen += uint64(length)
if p.verifiedLen >= p.cfg.Node.MaxBlock {
- if err := p.trans.Broadcast(ntfShutdown); err != nil {
+ if err := p.trans.Broadcast(
+ p.peers, &test.FixedLatencyModel{}, ntfShutdown); err != nil {
panic(err)
}
}
@@ -199,7 +200,8 @@ func (p *PeerServer) Run() {
}
// Cache peers' info.
for _, pubKey := range p.trans.Peers() {
- p.peers[types.NewNodeID(pubKey)] = struct{}{}
+ nID := types.NewNodeID(pubKey)
+ p.peers[nID] = struct{}{}
}
// Pick a master node to execute pending config changes.
for nID := range p.peers {
@@ -225,7 +227,8 @@ func (p *PeerServer) Run() {
break
}
}
- if err := p.trans.Broadcast(ntfReady); err != nil {
+ if err := p.trans.Broadcast(
+ p.peers, &test.FixedLatencyModel{}, ntfReady); err != nil {
panic(err)
}
log.Println("Simulation is ready to go with", len(p.peers), "nodes")
diff --git a/test_config/test-config-change.toml b/test_config/test-config-change.toml
index 5950b30..396e627 100644
--- a/test_config/test-config-change.toml
+++ b/test_config/test-config-change.toml
@@ -38,9 +38,12 @@ propose_interval_sigma = 5e+01
[networking]
type = "fake"
peer_server = "127.0.0.1"
+[networking.direct]
mean = 1e+01
sigma = 1e+01
-loss_rate_value = 0e+00
+[networking.gossip]
+mean = 3e+01
+sigma = 3e+01
[scheduler]
worker_num = 2
diff --git a/test_config/test.toml b/test_config/test.toml
index 0261d8d..8c6e342 100644
--- a/test_config/test.toml
+++ b/test_config/test.toml
@@ -23,9 +23,12 @@ propose_interval_sigma = 5e+01
[networking]
type = "fake"
peer_server = "127.0.0.1"
+[networking.direct]
mean = 1e+01
sigma = 1e+01
-loss_rate_value = 0e+00
+[networking.gossip]
+mean = 3e+01
+sigma = 3e+01
[scheduler]
worker_num = 2