aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-09-11 14:56:47 +0800
committerGitHub <noreply@github.com>2018-09-11 14:56:47 +0800
commit292ad73ec08621fa9beef5f028860131fcbf9bd9 (patch)
tree47644eaad7757cd2e8798ae7fe361fa5d6e99060 /core
parent582a491aa0bcb784ac7b65ebbfb42139945ea703 (diff)
downloaddexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar
dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.gz
dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.bz2
dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.lz
dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.xz
dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.zst
dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.zip
simulation: integrate test.Transport (#99)
- Add marshaller for simulation by encoding/json - Implement peer server based on test.TranportServer - Remove network models, they are replaced with test.LatencyModel
Diffstat (limited to 'core')
-rw-r--r--core/consensus.go1
-rw-r--r--core/test/tcp-transport.go122
-rw-r--r--core/test/utils.go25
3 files changed, 115 insertions, 33 deletions
diff --git a/core/consensus.go b/core/consensus.go
index a9ec6a5..e021024 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -346,7 +346,6 @@ func (con *Consensus) processMsg(
if err := blockProcesser(val); err != nil {
fmt.Println(err)
}
- //types.RecycleBlock(val)
case *types.NotaryAck:
if err := con.ProcessNotaryAck(val); err != nil {
fmt.Println(err)
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index 673a5a1..2afea14 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -35,10 +35,11 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
-// peerInfo describe connection info of one peer in TCP transport.
-type peerInfo struct {
+// tcpMessage is the general message between peers and server.
+type tcpMessage struct {
ValidatorID types.ValidatorID `json:"vid"`
- Conn string `json:"conn"`
+ Type string `json:"type"`
+ Info string `json:"conn"`
}
// TCPTransport implements Transport interface via TCP connection.
@@ -162,8 +163,8 @@ func (t *TCPTransport) marshalMessage(
switch msg.(type) {
case map[types.ValidatorID]string:
msgCarrier.Type = "peerlist"
- case *peerInfo:
- msgCarrier.Type = "peer"
+ case *tcpMessage:
+ msgCarrier.Type = "trans-msg"
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msg)
@@ -209,12 +210,12 @@ func (t *TCPTransport) unmarshalMessage(
return
}
msg = peers
- case "peer":
- peer := &peerInfo{}
- if err = json.Unmarshal(msgCarrier.Payload, peer); err != nil {
+ case "trans-msg":
+ m := &tcpMessage{}
+ if err = json.Unmarshal(msgCarrier.Payload, m); err != nil {
return
}
- msg = peer
+ msg = m
default:
if t.marshaller == nil {
err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type)
@@ -443,6 +444,7 @@ func (t *TCPTransportClient) Join(
envelopes = []*TransportEnvelope{}
ok bool
addr string
+ conn string
)
for {
addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
@@ -479,27 +481,57 @@ func (t *TCPTransportClient) Join(
return
}
t.serverWriteChannel = t.connWriter(serverConn)
- err = t.Report(&peerInfo{
+ if t.local {
+ conn = addr
+ } else {
+ // Find my IP.
+ var ip string
+ if ip, err = FindMyIP(); err != nil {
+ return
+ }
+ conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort))
+ }
+ if err = t.Report(&tcpMessage{
+ Type: "conn",
ValidatorID: t.vID,
- Conn: addr,
- })
+ Info: conn,
+ }); err != nil {
+ return
+ }
// Wait for peers list sent by server.
+ e := <-t.recvChannel
+ if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok {
+ panic(fmt.Errorf("expect peer list, not %v", e))
+ }
+ // Setup connections to other peers.
+ if err = t.buildConnectionsToPeers(); err != nil {
+ return
+ }
+ // Report to server that the connections to other peers are ready.
+ if err = t.Report(&tcpMessage{
+ Type: "conn-ready",
+ ValidatorID: t.vID,
+ }); err != nil {
+ return
+ }
+ // Wait for server to ack us that all peers are ready.
for {
e := <-t.recvChannel
- if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok {
+ msg, ok := e.Msg.(*tcpMessage)
+ if !ok {
envelopes = append(envelopes, e)
continue
}
- // Replay those messages sent before peer list.
- if len(envelopes) > cap(t.recvChannel)-len(t.recvChannel) {
- panic(fmt.Errorf("unable to replay pending messages"))
- }
- for _, e := range envelopes {
- t.recvChannel <- e
+ if msg.Type != "all-ready" {
+ err = fmt.Errorf("expected ready message, but %v", msg)
+ return
}
break
}
- t.buildConnectionsToPeers()
+ // Replay those messages sent before peer list and ready-ack.
+ for _, e := range envelopes {
+ t.recvChannel <- e
+ }
ch = t.recvChannel
return
}
@@ -545,24 +577,50 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) {
// Collect peers info. Packets other than peer info is
// unexpected.
for {
- select {
- case <-t.ctx.Done():
- err = fmt.Errorf("cancel waiting")
- return
- case e := <-t.recvChannel:
- peer, ok := e.Msg.(*peerInfo)
- if !ok {
- panic(fmt.Errorf("expect peerInfo, not %v", e))
- }
- t.peersInfo[peer.ValidatorID] = peer.Conn
+ // Wait for connection info reported by peers.
+ e := <-t.recvChannel
+ msg, ok := e.Msg.(*tcpMessage)
+ if !ok {
+ panic(fmt.Errorf("expect tcpMessage, not %v", e))
}
+ if msg.Type != "conn" {
+ panic(fmt.Errorf("expect connection report, not %v", e))
+ }
+ t.peersInfo[msg.ValidatorID] = msg.Info
// Check if we already collect enought peers.
if len(t.peersInfo) == numPeers {
break
}
}
// Send collected peers back to them.
- t.buildConnectionsToPeers()
- t.Broadcast(t.peersInfo)
+ if err = t.buildConnectionsToPeers(); err != nil {
+ return
+ }
+ if err = t.Broadcast(t.peersInfo); err != nil {
+ return
+ }
+ // Wait for peers to send 'ready' report.
+ readies := make(map[types.ValidatorID]struct{})
+ for {
+ e := <-t.recvChannel
+ msg, ok := e.Msg.(*tcpMessage)
+ if !ok {
+ panic(fmt.Errorf("expect tcpMessage, not %v", e))
+ }
+ if msg.Type != "conn-ready" {
+ panic(fmt.Errorf("expect connection ready, not %v", e))
+ }
+ if _, reported := readies[msg.ValidatorID]; reported {
+ panic(fmt.Errorf("already report conn-ready message: %v", e))
+ }
+ readies[msg.ValidatorID] = struct{}{}
+ if len(readies) == numPeers {
+ break
+ }
+ }
+ // Ack all peers ready to go.
+ if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil {
+ return
+ }
return
}
diff --git a/core/test/utils.go b/core/test/utils.go
index 5f92ad9..138e8a1 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -18,7 +18,9 @@
package test
import (
+ "fmt"
"math"
+ "net"
"time"
"github.com/dexon-foundation/dexon-consensus-core/common"
@@ -63,3 +65,26 @@ func CalcLatencyStatistics(latencies []time.Duration) (avg, dev time.Duration) {
dev = time.Duration(math.Sqrt(sumOfSquareDiff / float64(len(latencies)-1)))
return
}
+
+// FindMyIP returns local IP address.
+func FindMyIP() (ip string, err error) {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return
+ }
+ for _, a := range addrs {
+ ipnet, ok := a.(*net.IPNet)
+ if !ok {
+ continue
+ }
+ if ipnet.IP.IsLoopback() {
+ continue
+ }
+ if ipnet.IP.To4() != nil {
+ ip = ipnet.IP.String()
+ return
+ }
+ }
+ err = fmt.Errorf("unable to find IP")
+ return
+}