author    Mission Liao <mission.liao@dexon.org>    2018-09-10 16:11:10 +0800
committer GitHub <noreply@github.com>    2018-09-10 16:11:10 +0800
commit    2439f49063d8498eadf26d4fa1220c5eac8412a8 (patch)
tree      1142ad5a5e4393315f956324191ddb7e03b804c3 /core
parent    2b5c97e53e9734dda971456ff483bf2b34f0f021 (diff)
test: add transport layer (#97)
The purpose of the transport layer is to abstract how messages are sent and how connections are set up between peers in a p2p network. Peer discovery is simulated by a hosted server: every peer sends its address to a known server, and once enough peers have been collected, the server responds with the whole peer list to all peers (see the usage sketch below). Changes:
- Add the test.Transport interface.
- Add a test.Transport implementation based on Go channels.
- Add a test.Transport implementation based on TCP connections.
- Move LatencyModel to the core/test package.
- Add the Marshaller interface.
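A minimal sketch of the intended handshake (not part of this change, mirroring core/test/transport_test.go): the server calls Host() and WaitForPeers(), while each peer calls Join() with the server endpoint and consumes the returned channel.

    // Sketch only: one server and one peer over the fake (channel-based) transport.
    serverTrans := NewFakeTransportServer()
    serverCh, _ := serverTrans.Host() // this channel is handed to peers as the endpoint

    vID := GenerateRandomValidatorIDs(1)[0]
    peerTrans := NewFakeTransportClient(vID, &FixedLatencyModel{Latency: 300})
    go func() {
        // Join reports this peer to the server and blocks until the peer list arrives.
        recv, _ := peerTrans.Join(serverCh)
        _ = recv // messages are delivered as *TransportEnvelope on this channel
    }()
    // Blocks until the peer has reported itself, then broadcasts the peer list.
    serverTrans.WaitForPeers(1)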
Diffstat (limited to 'core')
-rw-r--r--  core/test/fake-transport.go    174
-rw-r--r--  core/test/interface.go          64
-rw-r--r--  core/test/latency.go            54
-rw-r--r--  core/test/tcp-transport.go     568
-rw-r--r--  core/test/transport_test.go    282
5 files changed, 1142 insertions, 0 deletions
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go
new file mode 100644
index 0000000..2615bd4
--- /dev/null
+++ b/core/test/fake-transport.go
@@ -0,0 +1,174 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package test
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+// FakeTransport implements the TransportServer and TransportClient
+// interfaces using Go channels.
+type FakeTransport struct {
+ peerType TransportPeerType
+ vID types.ValidatorID
+ recvChannel chan *TransportEnvelope
+ serverChannel chan<- *TransportEnvelope
+ peers map[types.ValidatorID]chan<- *TransportEnvelope
+ latency LatencyModel
+}
+
+// NewFakeTransportServer constructs a FakeTransport instance for the peer server.
+func NewFakeTransportServer() TransportServer {
+ return &FakeTransport{
+ peerType: TransportPeerServer,
+ recvChannel: make(chan *TransportEnvelope, 1000),
+ }
+}
+
+// NewFakeTransportClient constructs a FakeTransport instance for a peer.
+func NewFakeTransportClient(
+ vID types.ValidatorID, latency LatencyModel) TransportClient {
+
+ return &FakeTransport{
+ peerType: TransportPeer,
+ recvChannel: make(chan *TransportEnvelope, 1000),
+ vID: vID,
+ latency: latency,
+ }
+}
+
+// Send implements Transport.Send method.
+func (t *FakeTransport) Send(
+ endpoint types.ValidatorID, msg interface{}) (err error) {
+
+ ch, exists := t.peers[endpoint]
+ if !exists {
+ err = fmt.Errorf("the endpoint does not exists: %v", endpoint)
+ return
+ }
+ go func(ch chan<- *TransportEnvelope) {
+ if t.latency != nil {
+ time.Sleep(t.latency.Delay())
+ }
+ ch <- &TransportEnvelope{
+ PeerType: t.peerType,
+ From: t.vID,
+ Msg: msg,
+ }
+ }(ch)
+ return
+}
+
+// Report implements Transport.Report method.
+func (t *FakeTransport) Report(msg interface{}) (err error) {
+ go func() {
+ t.serverChannel <- &TransportEnvelope{
+ PeerType: TransportPeer,
+ From: t.vID,
+ Msg: msg,
+ }
+ }()
+ return
+}
+
+// Broadcast implements Transport.Broadcast method.
+func (t *FakeTransport) Broadcast(msg interface{}) (err error) {
+ for k := range t.peers {
+ if k == t.vID {
+ continue
+ }
+ t.Send(k, msg)
+ }
+ return
+}
+
+// Close implements Transport.Close method.
+func (t *FakeTransport) Close() (err error) {
+ close(t.recvChannel)
+ return
+}
+
+// Peers implements Transport.Peers method.
+func (t *FakeTransport) Peers() (peers map[types.ValidatorID]struct{}) {
+ peers = make(map[types.ValidatorID]struct{})
+ for vID := range t.peers {
+ peers[vID] = struct{}{}
+ }
+ return
+}
+
+// Join implements TransportClient.Join method.
+func (t *FakeTransport) Join(
+ serverEndpoint interface{}) (<-chan *TransportEnvelope, error) {
+
+ var (
+ envelopes = []*TransportEnvelope{}
+ ok bool
+ )
+ if t.serverChannel, ok = serverEndpoint.(chan *TransportEnvelope); !ok {
+ return nil, fmt.Errorf("accept channel of *TransportEnvelope when join")
+ }
+ t.Report(t)
+ // Wait for peers info.
+ for {
+ envelope := <-t.recvChannel
+ if envelope.PeerType != TransportPeerServer {
+ envelopes = append(envelopes, envelope)
+ continue
+ }
+ if t.peers, ok =
+ envelope.Msg.(map[types.ValidatorID]chan<- *TransportEnvelope); !ok {
+
+ envelopes = append(envelopes, envelope)
+ continue
+ }
+ for _, envelope := range envelopes {
+ t.recvChannel <- envelope
+ }
+ break
+ }
+ return t.recvChannel, nil
+}
+
+// Host implements TransportServer.Host method.
+func (t *FakeTransport) Host() (chan *TransportEnvelope, error) {
+ return t.recvChannel, nil
+}
+
+// WaitForPeers implements TransportServer.WaitForPeers method.
+func (t *FakeTransport) WaitForPeers(numPeers int) (err error) {
+ t.peers = make(map[types.ValidatorID]chan<- *TransportEnvelope)
+ for {
+ envelope := <-t.recvChannel
+ // Panic here if some peer sends other stuff before
+ // we receive the peer list.
+ newPeer := envelope.Msg.(*FakeTransport)
+ t.peers[envelope.From] = newPeer.recvChannel
+ if len(t.peers) == numPeers {
+ break
+ }
+ }
+ // The collected peer channels are shared for all peers.
+ if err = t.Broadcast(t.peers); err != nil {
+ return
+ }
+ return
+}
diff --git a/core/test/interface.go b/core/test/interface.go
index 0e963fd..9932262 100644
--- a/core/test/interface.go
+++ b/core/test/interface.go
@@ -46,3 +46,67 @@ type EventHandler interface {
// Handle the event belongs to this handler, and return derivated events.
Handle(*Event) []*Event
}
+
+// TransportPeerType defines the type of peer, either 'peer' or 'server'.
+type TransportPeerType string
+
+const (
+ // TransportPeerServer is the type of peer server.
+ TransportPeerServer TransportPeerType = "server"
+ // TransportPeer is the type of peer.
+ TransportPeer TransportPeerType = "peer"
+)
+
+// TransportEnvelope defines the payload format of a transported message.
+type TransportEnvelope struct {
+ // PeerType defines the type of source peer, could be either "peer" or
+ // "server".
+ PeerType TransportPeerType
+ // From defines the validatorID of the source peer.
+ From types.ValidatorID
+ // Msg is the actual payload of this message.
+ Msg interface{}
+}
+
+// TransportServer defines the peer server in the network.
+type TransportServer interface {
+ Transport
+ // Host the server; consider it a setup procedure. The
+ // returned channel can be used after 'WaitForPeers' returns.
+ Host() (chan *TransportEnvelope, error)
+ // WaitForPeers waits for all peers to join the network.
+ WaitForPeers(numPeers int) error
+}
+
+// TransportClient defines the interface for a peer in the network.
+type TransportClient interface {
+ Transport
+ // Report a message to the peer server.
+ Report(msg interface{}) error
+ // Join the network, should block until joined.
+ Join(serverEndpoint interface{}) (<-chan *TransportEnvelope, error)
+}
+
+// Transport defines the interface for basic transportation capabilities.
+type Transport interface {
+ // Broadcast a message to all peers in the network.
+ Broadcast(msg interface{}) error
+ // Send one message to a peer.
+ Send(endpoint types.ValidatorID, msg interface{}) error
+ // Close cleans up allocated resources.
+ Close() error
+
+ // Peers returns the IDs of all connected validators in the p2p network.
+ // This method should only be called after either 'Join' or
+ // 'WaitForPeers' returns.
+ Peers() map[types.ValidatorID]struct{}
+}
+
+// Marshaller defines an interface to convert between interface{} and []byte.
+type Marshaller interface {
+ // Unmarshal converts a []byte back to interface{} based on the type
+ // of message.
+ Unmarshal(msgType string, payload []byte) (msg interface{}, err error)
+ // Marshal converts a message to a byte string.
+ Marshal(msg interface{}) (msgType string, payload []byte, err error)
+}
diff --git a/core/test/latency.go b/core/test/latency.go
new file mode 100644
index 0000000..0fe9277
--- /dev/null
+++ b/core/test/latency.go
@@ -0,0 +1,54 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package test
+
+import (
+ "math/rand"
+ "time"
+)
+
+// LatencyModel defines an interface to randomly decide latency
+// for one operation.
+type LatencyModel interface {
+ Delay() time.Duration
+}
+
+// NormalLatencyModel returns latencies drawn from a normal distribution.
+type NormalLatencyModel struct {
+ Sigma float64
+ Mean float64
+}
+
+// Delay implements LatencyModel interface.
+func (m *NormalLatencyModel) Delay() time.Duration {
+ delay := rand.NormFloat64()*m.Sigma + m.Mean
+ if delay < 0 {
+ delay = m.Sigma / 2
+ }
+ return time.Duration(delay) * time.Millisecond
+}
+
+// FixedLatencyModel returns a fixed latency.
+type FixedLatencyModel struct {
+ Latency float64
+}
+
+// Delay implements LatencyModel interface.
+func (m *FixedLatencyModel) Delay() time.Duration {
+ return time.Duration(m.Latency) * time.Millisecond
+}
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
new file mode 100644
index 0000000..673a5a1
--- /dev/null
+++ b/core/test/tcp-transport.go
@@ -0,0 +1,568 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package test
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "io"
+ "math"
+ "math/rand"
+ "net"
+ "os"
+ "strconv"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+// peerInfo describes the connection info of one peer in the TCP transport.
+type peerInfo struct {
+ ValidatorID types.ValidatorID `json:"vid"`
+ Conn string `json:"conn"`
+}
+
+// TCPTransport implements the Transport interface via TCP connections.
+type TCPTransport struct {
+ peerType TransportPeerType
+ vID types.ValidatorID
+ localPort int
+ peersInfo map[types.ValidatorID]string
+ peers map[types.ValidatorID]chan<- []byte
+ peersLock sync.RWMutex
+ recvChannel chan *TransportEnvelope
+ ctx context.Context
+ cancel context.CancelFunc
+ latency LatencyModel
+ marshaller Marshaller
+}
+
+// NewTCPTransport constructs a TCPTransport instance.
+func NewTCPTransport(
+ peerType TransportPeerType,
+ vID types.ValidatorID,
+ latency LatencyModel,
+ marshaller Marshaller,
+ localPort int) *TCPTransport {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ return &TCPTransport{
+ peerType: peerType,
+ vID: vID,
+ peersInfo: make(map[types.ValidatorID]string),
+ peers: make(map[types.ValidatorID]chan<- []byte),
+ recvChannel: make(chan *TransportEnvelope, 1000),
+ ctx: ctx,
+ cancel: cancel,
+ localPort: localPort,
+ latency: latency,
+ marshaller: marshaller,
+ }
+}
+
+// Send implements Transport.Send method.
+func (t *TCPTransport) Send(
+ endpoint types.ValidatorID, msg interface{}) (err error) {
+
+ payload, err := t.marshalMessage(msg)
+ if err != nil {
+ return
+ }
+ go func() {
+ if t.latency != nil {
+ time.Sleep(t.latency.Delay())
+ }
+
+ t.peersLock.RLock()
+ defer t.peersLock.RUnlock()
+
+ t.peers[endpoint] <- payload
+ }()
+ return
+}
+
+// Broadcast implements Transport.Broadcast method.
+func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
+ payload, err := t.marshalMessage(msg)
+ if err != nil {
+ return
+ }
+ t.peersLock.RLock()
+ defer t.peersLock.RUnlock()
+
+ for vID, ch := range t.peers {
+ if vID == t.vID {
+ continue
+ }
+ go func(ch chan<- []byte) {
+ if t.latency != nil {
+ time.Sleep(t.latency.Delay())
+ }
+ ch <- payload
+ }(ch)
+ }
+ return
+}
+
+// Close implements Transport.Close method.
+func (t *TCPTransport) Close() (err error) {
+ // Tell all goroutines we spawned to stop.
+ t.cancel()
+ // Reset peers.
+ t.peersLock.Lock()
+ defer t.peersLock.Unlock()
+ t.peers = make(map[types.ValidatorID]chan<- []byte)
+ // Tell our user that this channel is closed.
+ close(t.recvChannel)
+ t.recvChannel = nil
+ return
+}
+
+// Peers implements Transport.Peers method.
+func (t *TCPTransport) Peers() (peers map[types.ValidatorID]struct{}) {
+ peers = make(map[types.ValidatorID]struct{})
+ for vID := range t.peersInfo {
+ peers[vID] = struct{}{}
+ }
+ return
+}
+
+func (t *TCPTransport) marshalMessage(
+ msg interface{}) (payload []byte, err error) {
+
+ msgCarrier := struct {
+ PeerType TransportPeerType `json:"peer_type"`
+ From types.ValidatorID `json:"from"`
+ Type string `json:"type"`
+ Payload interface{} `json:"payload"`
+ }{
+ PeerType: t.peerType,
+ From: t.vID,
+ Payload: msg,
+ }
+ switch msg.(type) {
+ case map[types.ValidatorID]string:
+ msgCarrier.Type = "peerlist"
+ case *peerInfo:
+ msgCarrier.Type = "peer"
+ default:
+ if t.marshaller == nil {
+ err = fmt.Errorf("unknown msg type: %v", msg)
+ break
+ }
+ // Delegate to user defined marshaller.
+ var buff []byte
+ msgCarrier.Type, buff, err = t.marshaller.Marshal(msg)
+ if err != nil {
+ break
+ }
+ msgCarrier.Payload = json.RawMessage(buff)
+ }
+ if err != nil {
+ return
+ }
+ payload, err = json.Marshal(msgCarrier)
+ return
+}
+
+func (t *TCPTransport) unmarshalMessage(
+ payload []byte) (
+ peerType TransportPeerType,
+ from types.ValidatorID,
+ msg interface{},
+ err error) {
+
+ msgCarrier := struct {
+ PeerType TransportPeerType `json:"peer_type"`
+ From types.ValidatorID `json:"from"`
+ Type string `json:"type"`
+ Payload json.RawMessage `json:"payload"`
+ }{}
+ if err = json.Unmarshal(payload, &msgCarrier); err != nil {
+ return
+ }
+ peerType = msgCarrier.PeerType
+ from = msgCarrier.From
+ switch msgCarrier.Type {
+ case "peerlist":
+ var peers map[types.ValidatorID]string
+ if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil {
+ return
+ }
+ msg = peers
+ case "peer":
+ peer := &peerInfo{}
+ if err = json.Unmarshal(msgCarrier.Payload, peer); err != nil {
+ return
+ }
+ msg = peer
+ default:
+ if t.marshaller == nil {
+ err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type)
+ break
+ }
+ msg, err = t.marshaller.Unmarshal(msgCarrier.Type, msgCarrier.Payload)
+ }
+ return
+}
+
+// connReader is a reader routine to read from a TCP connection.
+func (t *TCPTransport) connReader(conn net.Conn) {
+ defer func() {
+ if err := conn.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ var (
+ msgLengthInByte [4]byte
+ msgLength uint32
+ err error
+ payload = make([]byte, 4096)
+ )
+
+ checkErr := func(err error) (toBreak bool) {
+ if err == io.EOF {
+ toBreak = true
+ return
+ }
+ // Check if timeout.
+ nErr, ok := err.(*net.OpError)
+ if !ok {
+ panic(err)
+ }
+ if !nErr.Timeout() {
+ panic(err)
+ }
+ return
+ }
+Loop:
+ for {
+ select {
+ case <-t.ctx.Done():
+ break Loop
+ default:
+ }
+ // Add a read timeout so we can periodically check for shutdown.
+ if err := conn.SetReadDeadline(
+ time.Now().Add(2 * time.Second)); err != nil {
+
+ panic(err)
+ }
+ // Read message length.
+ if _, err = io.ReadFull(conn, msgLengthInByte[:]); err != nil {
+ if checkErr(err) {
+ break
+ }
+ continue
+ }
+ msgLength = binary.LittleEndian.Uint32(msgLengthInByte[:])
+ // Resize buffer
+ if msgLength > uint32(len(payload)) {
+ payload = make([]byte, msgLength)
+ }
+ buff := payload[:msgLength]
+ // Read the message in bytes.
+ if _, err = io.ReadFull(conn, buff); err != nil {
+ if checkErr(err) {
+ break
+ }
+ continue
+ }
+ peerType, from, msg, err := t.unmarshalMessage(buff)
+ if err != nil {
+ panic(err)
+ }
+ t.recvChannel <- &TransportEnvelope{
+ PeerType: peerType,
+ From: from,
+ Msg: msg,
+ }
+ }
+}
+
+// connWriter is a writer routine that writes to a TCP connection.
+func (t *TCPTransport) connWriter(conn net.Conn) chan<- []byte {
+ ch := make(chan []byte, 1000)
+ go func() {
+ defer func() {
+ close(ch)
+ if err := conn.Close(); err != nil {
+ panic(err)
+ }
+ }()
+ for {
+ select {
+ case <-t.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-t.ctx.Done():
+ return
+ case msg := <-ch:
+ // Send message length in uint32.
+ var msgLength [4]byte
+ if len(msg) > math.MaxUint32 {
+ panic(fmt.Errorf("message size overflow"))
+ }
+ binary.LittleEndian.PutUint32(msgLength[:], uint32(len(msg)))
+ if _, err := conn.Write(msgLength[:]); err != nil {
+ panic(err)
+ }
+ // Send the payload.
+ if _, err := conn.Write(msg); err != nil {
+ panic(err)
+ }
+ }
+ }
+ }()
+ return ch
+}
+
+// listenerRoutine is a routine to accept incoming TCP connection requests.
+func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) {
+ defer func() {
+ if err := listener.Close(); err != nil {
+ panic(err)
+ }
+ }()
+ for {
+ select {
+ case <-t.ctx.Done():
+ return
+ default:
+ }
+
+ listener.SetDeadline(time.Now().Add(5 * time.Second))
+ conn, err := listener.Accept()
+ if err != nil {
+ // Check if timeout error.
+ nErr, ok := err.(*net.OpError)
+ if !ok {
+ panic(err)
+ }
+ if !nErr.Timeout() {
+ panic(err)
+ }
+ continue
+ }
+ go t.connReader(conn)
+ }
+}
+
+// buildConnectionsToPeers constructs a TCP connection to each peer.
+// Although a TCP connection could be used for both reading and writing,
+// we only utilize the write side for simplicity.
+func (t *TCPTransport) buildConnectionsToPeers() (err error) {
+ var wg sync.WaitGroup
+ for vID, addr := range t.peersInfo {
+ if vID == t.vID {
+ continue
+ }
+ wg.Add(1)
+ go func(vID types.ValidatorID, addr string) {
+ defer wg.Done()
+
+ conn, localErr := net.Dial("tcp", addr)
+ if localErr != nil {
+ // Propagate this error outward so that at least one error
+ // is returned to the caller.
+ err = localErr
+ return
+ }
+ t.peersLock.Lock()
+ defer t.peersLock.Unlock()
+
+ t.peers[vID] = t.connWriter(conn)
+ }(vID, addr)
+ }
+ wg.Wait()
+ return
+}
+
+// TCPTransportClient implements TransportClient based on TCP connections.
+type TCPTransportClient struct {
+ TCPTransport
+ local bool
+ serverWriteChannel chan<- []byte
+}
+
+// NewTCPTransportClient constructs a TCPTransportClient instance.
+func NewTCPTransportClient(
+ vID types.ValidatorID,
+ latency LatencyModel,
+ marshaller Marshaller,
+ local bool) *TCPTransportClient {
+
+ return &TCPTransportClient{
+ TCPTransport: *NewTCPTransport(TransportPeer, vID, latency, marshaller, 8080),
+ local: local,
+ }
+}
+
+// Report implements TransportClient.Report method.
+func (t *TCPTransportClient) Report(msg interface{}) (err error) {
+ payload, err := t.marshalMessage(msg)
+ if err != nil {
+ return
+ }
+ go func() {
+ t.serverWriteChannel <- payload
+ }()
+ return
+}
+
+// Join implements TransportClient.Join method.
+func (t *TCPTransportClient) Join(
+ serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) {
+
+ // Initiate a TCP server.
+ // TODO(mission): make the initial listening port configurable.
+ var (
+ ln net.Listener
+ envelopes = []*TransportEnvelope{}
+ ok bool
+ addr string
+ )
+ for {
+ addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
+ ln, err = net.Listen("tcp", addr)
+ if err == nil {
+ break
+ }
+ if !t.local {
+ return
+ }
+ // In local mode, retry with another port when the address is in use.
+ operr, ok := err.(*net.OpError)
+ if !ok {
+ panic(err)
+ }
+ oserr, ok := operr.Err.(*os.SyscallError)
+ if !ok {
+ panic(operr)
+ }
+ errno, ok := oserr.Err.(syscall.Errno)
+ if !ok {
+ panic(oserr)
+ }
+ if errno != syscall.EADDRINUSE {
+ panic(errno)
+ }
+ // The port is in use; pick another port randomly.
+ t.localPort = 1024 + rand.Int()%1024
+ }
+ go t.listenerRoutine(ln.(*net.TCPListener))
+
+ serverConn, err := net.Dial("tcp", serverEndpoint.(string))
+ if err != nil {
+ return
+ }
+ t.serverWriteChannel = t.connWriter(serverConn)
+ err = t.Report(&peerInfo{
+ ValidatorID: t.vID,
+ Conn: addr,
+ })
+ // Wait for the peer list sent by the server.
+ for {
+ e := <-t.recvChannel
+ if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok {
+ envelopes = append(envelopes, e)
+ continue
+ }
+ // Replay messages that arrived before the peer list.
+ if len(envelopes) > cap(t.recvChannel)-len(t.recvChannel) {
+ panic(fmt.Errorf("unable to replay pending messages"))
+ }
+ for _, e := range envelopes {
+ t.recvChannel <- e
+ }
+ break
+ }
+ t.buildConnectionsToPeers()
+ ch = t.recvChannel
+ return
+}
+
+// TCPTransportServer implements TransportServer via TCP connections.
+type TCPTransportServer struct {
+ TCPTransport
+}
+
+// NewTCPTransportServer constructs a TCPTransportServer instance.
+func NewTCPTransportServer(
+ marshaller Marshaller,
+ serverPort int) *TCPTransportServer {
+
+ return &TCPTransportServer{
+ // NOTE: the assumption here is that the validator IDs of peers
+ // won't be zero-valued.
+ TCPTransport: *NewTCPTransport(
+ TransportPeerServer,
+ types.ValidatorID{},
+ nil,
+ marshaller,
+ serverPort),
+ }
+}
+
+// Host implements TransportServer.Host method.
+func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) {
+ // The port of the peer server should be known to other peers;
+ // if we can listen on the pre-defined port, we don't have to
+ // retry with other random ports.
+ ln, err := net.Listen(
+ "tcp", net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort)))
+ if err != nil {
+ return nil, err
+ }
+ go t.listenerRoutine(ln.(*net.TCPListener))
+ return t.recvChannel, nil
+}
+
+// WaitForPeers implements TransportServer.WaitForPeers method.
+func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) {
+ // Collect peer info. Packets other than peer info are
+ // unexpected.
+ for {
+ select {
+ case <-t.ctx.Done():
+ err = fmt.Errorf("cancel waiting")
+ return
+ case e := <-t.recvChannel:
+ peer, ok := e.Msg.(*peerInfo)
+ if !ok {
+ panic(fmt.Errorf("expect peerInfo, not %v", e))
+ }
+ t.peersInfo[peer.ValidatorID] = peer.Conn
+ }
+ // Check if we have already collected enough peers.
+ if len(t.peersInfo) == numPeers {
+ break
+ }
+ }
+ // Send the collected peer info back to all peers.
+ t.buildConnectionsToPeers()
+ t.Broadcast(t.peersInfo)
+ return
+}
diff --git a/core/test/transport_test.go b/core/test/transport_test.go
new file mode 100644
index 0000000..9bfc12b
--- /dev/null
+++ b/core/test/transport_test.go
@@ -0,0 +1,282 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package test
+
+import (
+ "encoding/json"
+ "fmt"
+ "net"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/stretchr/testify/suite"
+)
+
+type testPeer struct {
+ vID types.ValidatorID
+ trans TransportClient
+ recv <-chan *TransportEnvelope
+ expectedEchoHash common.Hash
+ echoBlock *types.Block
+ myBlock *types.Block
+ myBlockSentTime time.Time
+ blocks map[types.ValidatorID]*types.Block
+ blocksReceiveTime map[common.Hash]time.Time
+}
+
+type testPeerServer struct {
+ trans TransportServer
+ recv chan *TransportEnvelope
+ peerBlocks map[types.ValidatorID]*types.Block
+}
+
+type testMarshaller struct{}
+
+func (m *testMarshaller) Unmarshal(
+ msgType string, payload []byte) (msg interface{}, err error) {
+
+ switch msgType {
+ case "block":
+ block := &types.Block{}
+ if err = json.Unmarshal(payload, block); err != nil {
+ return
+ }
+ msg = block
+ default:
+ err = fmt.Errorf("unknown message type: %v", msgType)
+ }
+ return
+}
+
+func (m *testMarshaller) Marshal(
+ msg interface{}) (msgType string, payload []byte, err error) {
+
+ switch msg.(type) {
+ case *types.Block:
+ if payload, err = json.Marshal(msg); err != nil {
+ return
+ }
+ msgType = "block"
+ default:
+ err = fmt.Errorf("unknown message type: %v", msg)
+ }
+ return
+}
+
+type TransportTestSuite struct {
+ suite.Suite
+}
+
+func (s *TransportTestSuite) baseTest(
+ server *testPeerServer,
+ peers map[types.ValidatorID]*testPeer,
+ delay time.Duration) {
+
+ var (
+ req = s.Require()
+ wg sync.WaitGroup
+ )
+
+ // For each peer, do the following:
+ // - broadcast 1 block.
+ // - report one random block to the server, along with its validator ID.
+ // The server echoes the random block back to the peer.
+ handleServer := func(server *testPeerServer) {
+ defer wg.Done()
+ server.peerBlocks = make(map[types.ValidatorID]*types.Block)
+ for {
+ select {
+ case e := <-server.recv:
+ req.Equal(e.PeerType, TransportPeer)
+ switch v := e.Msg.(type) {
+ case *types.Block:
+ req.Equal(v.ProposerID, e.From)
+ server.peerBlocks[v.ProposerID] = v
+ // Echo the block back
+ server.trans.Send(v.ProposerID, v)
+ }
+ }
+ // Upon receiving blocks from all peers, stop.
+ if len(server.peerBlocks) == len(peers) {
+ return
+ }
+ }
+ }
+ handlePeer := func(peer *testPeer) {
+ defer wg.Done()
+ peer.blocks = make(map[types.ValidatorID]*types.Block)
+ peer.blocksReceiveTime = make(map[common.Hash]time.Time)
+ for {
+ select {
+ case e := <-peer.recv:
+ switch v := e.Msg.(type) {
+ case *types.Block:
+ if v.ProposerID == peer.vID {
+ req.Equal(e.PeerType, TransportPeerServer)
+ peer.echoBlock = v
+ } else {
+ req.Equal(e.PeerType, TransportPeer)
+ req.Equal(e.From, v.ProposerID)
+ peer.blocks[v.ProposerID] = v
+ peer.blocksReceiveTime[v.Hash] = time.Now()
+ }
+ }
+ }
+ // Upon receiving blocks from all other peers and the echo from
+ // the server, stop.
+ if peer.echoBlock != nil && len(peer.blocks) == len(peers)-1 {
+ return
+ }
+ }
+ }
+ wg.Add(len(peers) + 1)
+ go handleServer(server)
+ for vID, peer := range peers {
+ go handlePeer(peer)
+ // Broadcast a block.
+ peer.myBlock = &types.Block{
+ ProposerID: vID,
+ Hash: common.NewRandomHash(),
+ }
+ peer.myBlockSentTime = time.Now()
+ peer.trans.Broadcast(peer.myBlock)
+ // Report a block to server.
+ peer.expectedEchoHash = common.NewRandomHash()
+ peer.trans.Report(&types.Block{
+ ProposerID: vID,
+ Hash: peer.expectedEchoHash,
+ })
+ }
+ wg.Wait()
+ // Make sure each sent block is received.
+ for vID, peer := range peers {
+ req.NotNil(peer.echoBlock)
+ req.Equal(peer.echoBlock.Hash, peer.expectedEchoHash)
+ for otherVID, otherPeer := range peers {
+ if vID == otherVID {
+ continue
+ }
+ req.Equal(
+ peer.myBlock.Hash,
+ otherPeer.blocks[peer.vID].Hash)
+ }
+ }
+ // Make sure the latency is as expected.
+ for vID, peer := range peers {
+ for otherVID, otherPeer := range peers {
+ if otherVID == vID {
+ continue
+ }
+ req.True(otherPeer.blocksReceiveTime[peer.myBlock.Hash].Sub(
+ peer.myBlockSentTime) >= delay)
+ }
+ }
+}
+
+func (s *TransportTestSuite) TestFake() {
+ var (
+ peerCount = 13
+ req = s.Require()
+ peers = make(map[types.ValidatorID]*testPeer)
+ vIDs = GenerateRandomValidatorIDs(peerCount)
+ err error
+ wg sync.WaitGroup
+ latency = &FixedLatencyModel{Latency: 300}
+ server = &testPeerServer{trans: NewFakeTransportServer()}
+ )
+ // Setup PeerServer
+ server.recv, err = server.trans.Host()
+ req.Nil(err)
+ // Setup Peers
+ wg.Add(len(vIDs))
+ for _, vID := range vIDs {
+ peer := &testPeer{
+ vID: vID,
+ trans: NewFakeTransportClient(vID, latency),
+ }
+ peers[vID] = peer
+ go func() {
+ defer wg.Done()
+ recv, err := peer.trans.Join(server.recv)
+ req.Nil(err)
+ peer.recv = recv
+ }()
+ }
+ // Block here until we collect enough peers.
+ server.trans.WaitForPeers(peerCount)
+ // Make sure all clients are ready.
+ wg.Wait()
+ s.baseTest(server, peers, 300*time.Millisecond)
+ req.Nil(server.trans.Close())
+ for _, peer := range peers {
+ req.Nil(peer.trans.Close())
+ }
+}
+
+func (s *TransportTestSuite) TestTCPLocal() {
+ var (
+ peerCount = 25
+ req = s.Require()
+ peers = make(map[types.ValidatorID]*testPeer)
+ vIDs = GenerateRandomValidatorIDs(peerCount)
+ err error
+ wg sync.WaitGroup
+ latency = &FixedLatencyModel{Latency: 300}
+ serverPort = 8080
+ serverAddr = net.JoinHostPort("0.0.0.0", strconv.Itoa(serverPort))
+ server = &testPeerServer{
+ trans: NewTCPTransportServer(&testMarshaller{}, serverPort)}
+ )
+ // Setup PeerServer
+ server.recv, err = server.trans.Host()
+ req.Nil(err)
+ // Setup Peers
+ wg.Add(len(vIDs))
+ for _, vID := range vIDs {
+ peer := &testPeer{
+ vID: vID,
+ trans: NewTCPTransportClient(vID, latency, &testMarshaller{}, true),
+ }
+ peers[vID] = peer
+ go func() {
+ defer wg.Done()
+
+ recv, err := peer.trans.Join(serverAddr)
+ req.Nil(err)
+ peer.recv = recv
+ }()
+ }
+ // Block here until we collect enough peers.
+ server.trans.WaitForPeers(peerCount)
+ // Make sure all clients are ready.
+ wg.Wait()
+
+ s.baseTest(server, peers, 300*time.Millisecond)
+ req.Nil(server.trans.Close())
+ for _, peer := range peers {
+ req.Nil(peer.trans.Close())
+ }
+}
+
+func TestTransport(t *testing.T) {
+ suite.Run(t, new(TransportTestSuite))
+}