diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-09-10 16:11:10 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-10 16:11:10 +0800 |
commit | 2439f49063d8498eadf26d4fa1220c5eac8412a8 (patch) | |
tree | 1142ad5a5e4393315f956324191ddb7e03b804c3 | |
parent | 2b5c97e53e9734dda971456ff483bf2b34f0f021 (diff) | |
download | dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.tar dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.tar.gz dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.tar.bz2 dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.tar.lz dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.tar.xz dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.tar.zst dexon-consensus-2439f49063d8498eadf26d4fa1220c5eac8412a8.zip |
test: add transport layer (#97)
The purpose of transport layer is to abstract the way to send messages and setup connections between peers in a p2p network. The peer discovery is simulated by a hosted server: every peer sends its address to a known server. Once collecting enough peers, respond the whole peers lists to all peers.
Changes:
- Add test.Trasnport interface
- Add test.Transport implementation by golang channel.
- Add test.transport implementation by TCP connection.
- Move LatencyModel to core/test package
- Add Marshaller interface
-rw-r--r-- | cmd/dexcon-simulation-with-scheduler/main.go | 4 | ||||
-rw-r--r-- | core/test/fake-transport.go | 174 | ||||
-rw-r--r-- | core/test/interface.go | 64 | ||||
-rw-r--r-- | core/test/latency.go (renamed from integration_test/latency.go) | 2 | ||||
-rw-r--r-- | core/test/tcp-transport.go | 568 | ||||
-rw-r--r-- | core/test/transport_test.go | 282 | ||||
-rw-r--r-- | integration_test/non-byzantine_test.go | 4 | ||||
-rw-r--r-- | integration_test/stats_test.go | 4 | ||||
-rw-r--r-- | integration_test/utils.go | 2 | ||||
-rw-r--r-- | integration_test/validator.go | 8 |
10 files changed, 1100 insertions, 12 deletions
diff --git a/cmd/dexcon-simulation-with-scheduler/main.go b/cmd/dexcon-simulation-with-scheduler/main.go index d1d815a..0684a0d 100644 --- a/cmd/dexcon-simulation-with-scheduler/main.go +++ b/cmd/dexcon-simulation-with-scheduler/main.go @@ -44,11 +44,11 @@ func main() { log.Fatal("unable to read config: ", err) } // Setup latencies, validators. - networkLatency := &integration.NormalLatencyModel{ + networkLatency := &test.NormalLatencyModel{ Sigma: cfg.Networking.Sigma, Mean: cfg.Networking.Mean, } - proposingLatency := &integration.NormalLatencyModel{ + proposingLatency := &test.NormalLatencyModel{ Sigma: cfg.Validator.Legacy.ProposeIntervalSigma, Mean: cfg.Validator.Legacy.ProposeIntervalMean, } diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go new file mode 100644 index 0000000..2615bd4 --- /dev/null +++ b/core/test/fake-transport.go @@ -0,0 +1,174 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package test + +import ( + "fmt" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// FakeTransport implement TransportServer and TransportClient interface +// by using golang channel. +type FakeTransport struct { + peerType TransportPeerType + vID types.ValidatorID + recvChannel chan *TransportEnvelope + serverChannel chan<- *TransportEnvelope + peers map[types.ValidatorID]chan<- *TransportEnvelope + latency LatencyModel +} + +// NewFakeTransportServer constructs FakeTransport instance for peer server. +func NewFakeTransportServer() TransportServer { + return &FakeTransport{ + peerType: TransportPeerServer, + recvChannel: make(chan *TransportEnvelope, 1000), + } +} + +// NewFakeTransportClient constructs FakeTransport instance for peer. +func NewFakeTransportClient( + vID types.ValidatorID, latency LatencyModel) TransportClient { + + return &FakeTransport{ + peerType: TransportPeer, + recvChannel: make(chan *TransportEnvelope, 1000), + vID: vID, + latency: latency, + } +} + +// Send implements Transport.Send method. +func (t *FakeTransport) Send( + endpoint types.ValidatorID, msg interface{}) (err error) { + + ch, exists := t.peers[endpoint] + if !exists { + err = fmt.Errorf("the endpoint does not exists: %v", endpoint) + return + } + go func(ch chan<- *TransportEnvelope) { + if t.latency != nil { + time.Sleep(t.latency.Delay()) + } + ch <- &TransportEnvelope{ + PeerType: t.peerType, + From: t.vID, + Msg: msg, + } + }(ch) + return +} + +// Report implements Transport.Report method. +func (t *FakeTransport) Report(msg interface{}) (err error) { + go func() { + t.serverChannel <- &TransportEnvelope{ + PeerType: TransportPeer, + From: t.vID, + Msg: msg, + } + }() + return +} + +// Broadcast implements Transport.Broadcast method. +func (t *FakeTransport) Broadcast(msg interface{}) (err error) { + for k := range t.peers { + if k == t.vID { + continue + } + t.Send(k, msg) + } + return +} + +// Close implements Transport.Close method. +func (t *FakeTransport) Close() (err error) { + close(t.recvChannel) + return +} + +// Peers implements Transport.Peers method. +func (t *FakeTransport) Peers() (peers map[types.ValidatorID]struct{}) { + peers = make(map[types.ValidatorID]struct{}) + for vID := range t.peers { + peers[vID] = struct{}{} + } + return +} + +// Join implements TransportClient.Join method. +func (t *FakeTransport) Join( + serverEndpoint interface{}) (<-chan *TransportEnvelope, error) { + + var ( + envelopes = []*TransportEnvelope{} + ok bool + ) + if t.serverChannel, ok = serverEndpoint.(chan *TransportEnvelope); !ok { + return nil, fmt.Errorf("accept channel of *TransportEnvelope when join") + } + t.Report(t) + // Wait for peers info. + for { + envelope := <-t.recvChannel + if envelope.PeerType != TransportPeerServer { + envelopes = append(envelopes, envelope) + continue + } + if t.peers, ok = + envelope.Msg.(map[types.ValidatorID]chan<- *TransportEnvelope); !ok { + + envelopes = append(envelopes, envelope) + continue + } + for _, envelope := range envelopes { + t.recvChannel <- envelope + } + break + } + return t.recvChannel, nil +} + +// Host implements TransportServer.Host method. +func (t *FakeTransport) Host() (chan *TransportEnvelope, error) { + return t.recvChannel, nil +} + +// WaitForPeers implements TransportServer.WaitForPeers method. +func (t *FakeTransport) WaitForPeers(numPeers int) (err error) { + t.peers = make(map[types.ValidatorID]chan<- *TransportEnvelope) + for { + envelope := <-t.recvChannel + // Panic here if some peer send other stuffs before + // receiving peer lists. + newPeer := envelope.Msg.(*FakeTransport) + t.peers[envelope.From] = newPeer.recvChannel + if len(t.peers) == numPeers { + break + } + } + // The collected peer channels are shared for all peers. + if err = t.Broadcast(t.peers); err != nil { + return + } + return +} diff --git a/core/test/interface.go b/core/test/interface.go index 0e963fd..9932262 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -46,3 +46,67 @@ type EventHandler interface { // Handle the event belongs to this handler, and return derivated events. Handle(*Event) []*Event } + +// TransportPeerType defines the type of peer, either 'peer' or 'server'. +type TransportPeerType string + +const ( + // TransportPeerServer is the type of peer server. + TransportPeerServer TransportPeerType = "server" + // TransportPeer is the type of peer. + TransportPeer TransportPeerType = "peer" +) + +// TransportEnvelope define the payload format of a message when transporting. +type TransportEnvelope struct { + // PeerType defines the type of source peer, could be either "peer" or + // "server". + PeerType TransportPeerType + // From defines the validatorID of the source peer. + From types.ValidatorID + // Msg is the actual payload of this message. + Msg interface{} +} + +// TransportServer defines the peer server in the network. +type TransportServer interface { + Transport + // Host the server, consider it's a setup procedure. The + // returned channel could be used after 'WaitForPeers' returns. + Host() (chan *TransportEnvelope, error) + // WaitForPeers waits for all peers to join the network. + WaitForPeers(numPeers int) error +} + +// TransportClient defines those peers in the network. +type TransportClient interface { + Transport + // Report a message to the peer server. + Report(msg interface{}) error + // Join the network, should block until joined. + Join(serverEndpoint interface{}) (<-chan *TransportEnvelope, error) +} + +// Transport defines the interface for basic transportation capabilities. +type Transport interface { + // Broadcast a message to all peers in network. + Broadcast(msg interface{}) error + // Send one message to a peer. + Send(endpoint types.ValidatorID, msg interface{}) error + // Close would cleanup allocated resources. + Close() error + + // Peers return IDs of all connected validators in p2p favor. + // This method should be accessed after ether 'Join' or 'WaitForPeers' + // returned. + Peers() map[types.ValidatorID]struct{} +} + +// Marshaller defines an interface to convert between interface{} and []byte. +type Marshaller interface { + // Unmarshal converts a []byte back to interface{} based on the type + // of message. + Unmarshal(msgType string, payload []byte) (msg interface{}, err error) + // Marshal converts a message to byte string + Marshal(msg interface{}) (msgType string, payload []byte, err error) +} diff --git a/integration_test/latency.go b/core/test/latency.go index 8f06084..0fe9277 100644 --- a/integration_test/latency.go +++ b/core/test/latency.go @@ -15,7 +15,7 @@ // along with the dexon-consensus-core library. If not, see // <http://www.gnu.org/licenses/>. -package integration +package test import ( "math/rand" diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go new file mode 100644 index 0000000..673a5a1 --- /dev/null +++ b/core/test/tcp-transport.go @@ -0,0 +1,568 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package test + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "math" + "math/rand" + "net" + "os" + "strconv" + "sync" + "syscall" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// peerInfo describe connection info of one peer in TCP transport. +type peerInfo struct { + ValidatorID types.ValidatorID `json:"vid"` + Conn string `json:"conn"` +} + +// TCPTransport implements Transport interface via TCP connection. +type TCPTransport struct { + peerType TransportPeerType + vID types.ValidatorID + localPort int + peersInfo map[types.ValidatorID]string + peers map[types.ValidatorID]chan<- []byte + peersLock sync.RWMutex + recvChannel chan *TransportEnvelope + ctx context.Context + cancel context.CancelFunc + latency LatencyModel + marshaller Marshaller +} + +// NewTCPTransport constructs an TCPTransport instance. +func NewTCPTransport( + peerType TransportPeerType, + vID types.ValidatorID, + latency LatencyModel, + marshaller Marshaller, + localPort int) *TCPTransport { + + ctx, cancel := context.WithCancel(context.Background()) + return &TCPTransport{ + peerType: peerType, + vID: vID, + peersInfo: make(map[types.ValidatorID]string), + peers: make(map[types.ValidatorID]chan<- []byte), + recvChannel: make(chan *TransportEnvelope, 1000), + ctx: ctx, + cancel: cancel, + localPort: localPort, + latency: latency, + marshaller: marshaller, + } +} + +// Send implements Transport.Send method. +func (t *TCPTransport) Send( + endpoint types.ValidatorID, msg interface{}) (err error) { + + payload, err := t.marshalMessage(msg) + if err != nil { + return + } + go func() { + if t.latency != nil { + time.Sleep(t.latency.Delay()) + } + + t.peersLock.RLock() + defer t.peersLock.RUnlock() + + t.peers[endpoint] <- payload + }() + return +} + +// Broadcast implements Transport.Broadcast method. +func (t *TCPTransport) Broadcast(msg interface{}) (err error) { + payload, err := t.marshalMessage(msg) + if err != nil { + return + } + t.peersLock.RLock() + defer t.peersLock.RUnlock() + + for vID, ch := range t.peers { + if vID == t.vID { + continue + } + go func(ch chan<- []byte) { + if t.latency != nil { + time.Sleep(t.latency.Delay()) + } + ch <- payload + }(ch) + } + return +} + +// Close implements Transport.Close method. +func (t *TCPTransport) Close() (err error) { + // Tell all routines raised by us to die. + t.cancel() + // Reset peers. + t.peersLock.Lock() + defer t.peersLock.Unlock() + t.peers = make(map[types.ValidatorID]chan<- []byte) + // Tell our user that this channel is closed. + close(t.recvChannel) + t.recvChannel = nil + return +} + +// Peers implements Transport.Peers method. +func (t *TCPTransport) Peers() (peers map[types.ValidatorID]struct{}) { + peers = make(map[types.ValidatorID]struct{}) + for vID := range t.peersInfo { + peers[vID] = struct{}{} + } + return +} + +func (t *TCPTransport) marshalMessage( + msg interface{}) (payload []byte, err error) { + + msgCarrier := struct { + PeerType TransportPeerType `json:"peer_type"` + From types.ValidatorID `json:"from"` + Type string `json:"type"` + Payload interface{} `json:"payload"` + }{ + PeerType: t.peerType, + From: t.vID, + Payload: msg, + } + switch msg.(type) { + case map[types.ValidatorID]string: + msgCarrier.Type = "peerlist" + case *peerInfo: + msgCarrier.Type = "peer" + default: + if t.marshaller == nil { + err = fmt.Errorf("unknown msg type: %v", msg) + break + } + // Delegate to user defined marshaller. + var buff []byte + msgCarrier.Type, buff, err = t.marshaller.Marshal(msg) + if err != nil { + break + } + msgCarrier.Payload = json.RawMessage(buff) + } + if err != nil { + return + } + payload, err = json.Marshal(msgCarrier) + return +} + +func (t *TCPTransport) unmarshalMessage( + payload []byte) ( + peerType TransportPeerType, + from types.ValidatorID, + msg interface{}, + err error) { + + msgCarrier := struct { + PeerType TransportPeerType `json:"peer_type"` + From types.ValidatorID `json:"from"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` + }{} + if err = json.Unmarshal(payload, &msgCarrier); err != nil { + return + } + peerType = msgCarrier.PeerType + from = msgCarrier.From + switch msgCarrier.Type { + case "peerlist": + var peers map[types.ValidatorID]string + if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil { + return + } + msg = peers + case "peer": + peer := &peerInfo{} + if err = json.Unmarshal(msgCarrier.Payload, peer); err != nil { + return + } + msg = peer + default: + if t.marshaller == nil { + err = fmt.Errorf("unknown msg type: %v", msgCarrier.Type) + break + } + msg, err = t.marshaller.Unmarshal(msgCarrier.Type, msgCarrier.Payload) + } + return +} + +// connReader is a reader routine to read from a TCP connection. +func (t *TCPTransport) connReader(conn net.Conn) { + defer func() { + if err := conn.Close(); err != nil { + panic(err) + } + }() + + var ( + msgLengthInByte [4]byte + msgLength uint32 + err error + payload = make([]byte, 4096) + ) + + checkErr := func(err error) (toBreak bool) { + if err == io.EOF { + toBreak = true + return + } + // Check if timeout. + nErr, ok := err.(*net.OpError) + if !ok { + panic(err) + } + if !nErr.Timeout() { + panic(err) + } + return + } +Loop: + for { + select { + case <-t.ctx.Done(): + break Loop + default: + } + // Add timeout when reading to check if shutdown. + if err := conn.SetReadDeadline( + time.Now().Add(2 * time.Second)); err != nil { + + panic(err) + } + // Read message length. + if _, err = io.ReadFull(conn, msgLengthInByte[:]); err != nil { + if checkErr(err) { + break + } + continue + } + msgLength = binary.LittleEndian.Uint32(msgLengthInByte[:]) + // Resize buffer + if msgLength > uint32(len(payload)) { + payload = make([]byte, msgLength) + } + buff := payload[:msgLength] + // Read the message in bytes. + if _, err = io.ReadFull(conn, buff); err != nil { + if checkErr(err) { + break + } + continue + } + peerType, from, msg, err := t.unmarshalMessage(buff) + if err != nil { + panic(err) + } + t.recvChannel <- &TransportEnvelope{ + PeerType: peerType, + From: from, + Msg: msg, + } + } +} + +// connWriter is a writer routine to write to TCP connection. +func (t *TCPTransport) connWriter(conn net.Conn) chan<- []byte { + ch := make(chan []byte, 1000) + go func() { + defer func() { + close(ch) + if err := conn.Close(); err != nil { + panic(err) + } + }() + for { + select { + case <-t.ctx.Done(): + return + default: + } + select { + case <-t.ctx.Done(): + return + case msg := <-ch: + // Send message length in uint32. + var msgLength [4]byte + if len(msg) > math.MaxUint32 { + panic(fmt.Errorf("message size overflow")) + } + binary.LittleEndian.PutUint32(msgLength[:], uint32(len(msg))) + if _, err := conn.Write(msgLength[:]); err != nil { + panic(err) + } + // Send the payload. + if _, err := conn.Write(msg); err != nil { + panic(err) + } + } + } + }() + return ch +} + +// listenerRoutine is a routine to accept incoming request for TCP connection. +func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) { + defer func() { + if err := listener.Close(); err != nil { + panic(err) + } + }() + for { + select { + case <-t.ctx.Done(): + return + default: + } + + listener.SetDeadline(time.Now().Add(5 * time.Second)) + conn, err := listener.Accept() + if err != nil { + // Check if timeout error. + nErr, ok := err.(*net.OpError) + if !ok { + panic(err) + } + if !nErr.Timeout() { + panic(err) + } + continue + } + go t.connReader(conn) + } +} + +// buildConnectionToPeers constructs TCP connections to each peer. +// Although TCP connection could be used for both read/write operation, +// we only utilize the write part for simplicity. +func (t *TCPTransport) buildConnectionsToPeers() (err error) { + var wg sync.WaitGroup + for vID, addr := range t.peersInfo { + if vID == t.vID { + continue + } + wg.Add(1) + go func(vID types.ValidatorID, addr string) { + defer wg.Done() + + conn, localErr := net.Dial("tcp", addr) + if localErr != nil { + // Propagate this error to outside, at least one error + // could be returned to caller. + err = localErr + return + } + t.peersLock.Lock() + defer t.peersLock.Unlock() + + t.peers[vID] = t.connWriter(conn) + }(vID, addr) + } + wg.Wait() + return +} + +// TCPTransportClient implement TransportClient base on TCP connection. +type TCPTransportClient struct { + TCPTransport + local bool + serverWriteChannel chan<- []byte +} + +// NewTCPTransportClient constructs a TCPTransportClient instance. +func NewTCPTransportClient( + vID types.ValidatorID, + latency LatencyModel, + marshaller Marshaller, + local bool) *TCPTransportClient { + + return &TCPTransportClient{ + TCPTransport: *NewTCPTransport(TransportPeer, vID, latency, marshaller, 8080), + local: local, + } +} + +// Report implements TransportClient.Report method. +func (t *TCPTransportClient) Report(msg interface{}) (err error) { + payload, err := t.marshalMessage(msg) + if err != nil { + return + } + go func() { + t.serverWriteChannel <- payload + }() + return +} + +// Join implements TransportClient.Join method. +func (t *TCPTransportClient) Join( + serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) { + + // Initiate a TCP server. + // TODO(mission): config initial listening port. + var ( + ln net.Listener + envelopes = []*TransportEnvelope{} + ok bool + addr string + ) + for { + addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort)) + ln, err = net.Listen("tcp", addr) + if err == nil { + break + } + if !t.local { + return + } + // In local-tcp, retry with other port when the address is in use. + operr, ok := err.(*net.OpError) + if !ok { + panic(err) + } + oserr, ok := operr.Err.(*os.SyscallError) + if !ok { + panic(operr) + } + errno, ok := oserr.Err.(syscall.Errno) + if !ok { + panic(oserr) + } + if errno != syscall.EADDRINUSE { + panic(errno) + } + // The port is used, generate another port randomly. + t.localPort = 1024 + rand.Int()%1024 + } + go t.listenerRoutine(ln.(*net.TCPListener)) + + serverConn, err := net.Dial("tcp", serverEndpoint.(string)) + if err != nil { + return + } + t.serverWriteChannel = t.connWriter(serverConn) + err = t.Report(&peerInfo{ + ValidatorID: t.vID, + Conn: addr, + }) + // Wait for peers list sent by server. + for { + e := <-t.recvChannel + if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok { + envelopes = append(envelopes, e) + continue + } + // Replay those messages sent before peer list. + if len(envelopes) > cap(t.recvChannel)-len(t.recvChannel) { + panic(fmt.Errorf("unable to replay pending messages")) + } + for _, e := range envelopes { + t.recvChannel <- e + } + break + } + t.buildConnectionsToPeers() + ch = t.recvChannel + return +} + +// TCPTransportServer implements TransportServer via TCP connections. +type TCPTransportServer struct { + TCPTransport +} + +// NewTCPTransportServer constructs TCPTransportServer instance. +func NewTCPTransportServer( + marshaller Marshaller, + serverPort int) *TCPTransportServer { + + return &TCPTransportServer{ + // NOTE: the assumption here is the validator ID of peers + // won't be zero. + TCPTransport: *NewTCPTransport( + TransportPeerServer, + types.ValidatorID{}, + nil, + marshaller, + serverPort), + } +} + +// Host implements TransportServer.Host method. +func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) { + // The port of peer server should be known to other peers, + // if we can listen on the pre-defiend part, we don't have to + // retry with other random ports. + ln, err := net.Listen( + "tcp", net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))) + if err != nil { + return nil, err + } + go t.listenerRoutine(ln.(*net.TCPListener)) + return t.recvChannel, nil +} + +// WaitForPeers implements TransportServer.WaitForPeers method. +func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { + // Collect peers info. Packets other than peer info is + // unexpected. + for { + select { + case <-t.ctx.Done(): + err = fmt.Errorf("cancel waiting") + return + case e := <-t.recvChannel: + peer, ok := e.Msg.(*peerInfo) + if !ok { + panic(fmt.Errorf("expect peerInfo, not %v", e)) + } + t.peersInfo[peer.ValidatorID] = peer.Conn + } + // Check if we already collect enought peers. + if len(t.peersInfo) == numPeers { + break + } + } + // Send collected peers back to them. + t.buildConnectionsToPeers() + t.Broadcast(t.peersInfo) + return +} diff --git a/core/test/transport_test.go b/core/test/transport_test.go new file mode 100644 index 0000000..9bfc12b --- /dev/null +++ b/core/test/transport_test.go @@ -0,0 +1,282 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package test + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "sync" + "testing" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/stretchr/testify/suite" +) + +type testPeer struct { + vID types.ValidatorID + trans TransportClient + recv <-chan *TransportEnvelope + expectedEchoHash common.Hash + echoBlock *types.Block + myBlock *types.Block + myBlockSentTime time.Time + blocks map[types.ValidatorID]*types.Block + blocksReceiveTime map[common.Hash]time.Time +} + +type testPeerServer struct { + trans TransportServer + recv chan *TransportEnvelope + peerBlocks map[types.ValidatorID]*types.Block +} + +type testMarshaller struct{} + +func (m *testMarshaller) Unmarshal( + msgType string, payload []byte) (msg interface{}, err error) { + + switch msgType { + case "block": + block := &types.Block{} + if err = json.Unmarshal(payload, block); err != nil { + return + } + msg = block + default: + err = fmt.Errorf("unknown message type: %v", msgType) + } + return +} + +func (m *testMarshaller) Marshal( + msg interface{}) (msgType string, payload []byte, err error) { + + switch msg.(type) { + case *types.Block: + if payload, err = json.Marshal(msg); err != nil { + return + } + msgType = "block" + default: + err = fmt.Errorf("unknown message type: %v", msg) + } + return +} + +type TransportTestSuite struct { + suite.Suite +} + +func (s *TransportTestSuite) baseTest( + server *testPeerServer, + peers map[types.ValidatorID]*testPeer, + delay time.Duration) { + + var ( + req = s.Require() + wg sync.WaitGroup + ) + + // For each peers, do following stuffs: + // - broadcast 1 block. + // - report one random block to server, along with its validator ID. + // Server would echo the random block back to the peer. + handleServer := func(server *testPeerServer) { + defer wg.Done() + server.peerBlocks = make(map[types.ValidatorID]*types.Block) + for { + select { + case e := <-server.recv: + req.Equal(e.PeerType, TransportPeer) + switch v := e.Msg.(type) { + case *types.Block: + req.Equal(v.ProposerID, e.From) + server.peerBlocks[v.ProposerID] = v + // Echo the block back + server.trans.Send(v.ProposerID, v) + } + } + // Upon receiving blocks from all peers, stop. + if len(server.peerBlocks) == len(peers) { + return + } + } + } + handlePeer := func(peer *testPeer) { + defer wg.Done() + peer.blocks = make(map[types.ValidatorID]*types.Block) + peer.blocksReceiveTime = make(map[common.Hash]time.Time) + for { + select { + case e := <-peer.recv: + switch v := e.Msg.(type) { + case *types.Block: + if v.ProposerID == peer.vID { + req.Equal(e.PeerType, TransportPeerServer) + peer.echoBlock = v + } else { + req.Equal(e.PeerType, TransportPeer) + req.Equal(e.From, v.ProposerID) + peer.blocks[v.ProposerID] = v + peer.blocksReceiveTime[v.Hash] = time.Now() + } + } + } + // Upon receiving blocks from all other peers, and echoed from + // server, stop. + if peer.echoBlock != nil && len(peer.blocks) == len(peers)-1 { + return + } + } + } + wg.Add(len(peers) + 1) + go handleServer(server) + for vID, peer := range peers { + go handlePeer(peer) + // Broadcast a block. + peer.myBlock = &types.Block{ + ProposerID: vID, + Hash: common.NewRandomHash(), + } + peer.myBlockSentTime = time.Now() + peer.trans.Broadcast(peer.myBlock) + // Report a block to server. + peer.expectedEchoHash = common.NewRandomHash() + peer.trans.Report(&types.Block{ + ProposerID: vID, + Hash: peer.expectedEchoHash, + }) + } + wg.Wait() + // Make sure each sent block is received. + for vID, peer := range peers { + req.NotNil(peer.echoBlock) + req.Equal(peer.echoBlock.Hash, peer.expectedEchoHash) + for otherVID, otherPeer := range peers { + if vID == otherVID { + continue + } + req.Equal( + peer.myBlock.Hash, + otherPeer.blocks[peer.vID].Hash) + } + } + // Make sure the latency is expected. + for vID, peer := range peers { + for otherVID, otherPeer := range peers { + if otherVID == vID { + continue + } + req.True(otherPeer.blocksReceiveTime[peer.myBlock.Hash].Sub( + peer.myBlockSentTime) >= delay) + } + } +} + +func (s *TransportTestSuite) TestFake() { + var ( + peerCount = 13 + req = s.Require() + peers = make(map[types.ValidatorID]*testPeer) + vIDs = GenerateRandomValidatorIDs(peerCount) + err error + wg sync.WaitGroup + latency = &FixedLatencyModel{Latency: 300} + server = &testPeerServer{trans: NewFakeTransportServer()} + ) + // Setup PeerServer + server.recv, err = server.trans.Host() + req.Nil(err) + // Setup Peers + wg.Add(len(vIDs)) + for _, vID := range vIDs { + peer := &testPeer{ + vID: vID, + trans: NewFakeTransportClient(vID, latency), + } + peers[vID] = peer + go func() { + defer wg.Done() + recv, err := peer.trans.Join(server.recv) + req.Nil(err) + peer.recv = recv + }() + } + // Block here until we collect enough peers. + server.trans.WaitForPeers(peerCount) + // Make sure all clients are ready. + wg.Wait() + s.baseTest(server, peers, 300*time.Millisecond) + req.Nil(server.trans.Close()) + for _, peer := range peers { + req.Nil(peer.trans.Close()) + } +} + +func (s *TransportTestSuite) TestTCPLocal() { + var ( + peerCount = 25 + req = s.Require() + peers = make(map[types.ValidatorID]*testPeer) + vIDs = GenerateRandomValidatorIDs(peerCount) + err error + wg sync.WaitGroup + latency = &FixedLatencyModel{Latency: 300} + serverPort = 8080 + serverAddr = net.JoinHostPort("0.0.0.0", strconv.Itoa(serverPort)) + server = &testPeerServer{ + trans: NewTCPTransportServer(&testMarshaller{}, serverPort)} + ) + // Setup PeerServer + server.recv, err = server.trans.Host() + req.Nil(err) + // Setup Peers + wg.Add(len(vIDs)) + for _, vID := range vIDs { + peer := &testPeer{ + vID: vID, + trans: NewTCPTransportClient(vID, latency, &testMarshaller{}, true), + } + peers[vID] = peer + go func() { + defer wg.Done() + + recv, err := peer.trans.Join(serverAddr) + req.Nil(err) + peer.recv = recv + }() + } + // Block here until we collect enough peers. + server.trans.WaitForPeers(peerCount) + // Make sure all clients are ready. + wg.Wait() + + s.baseTest(server, peers, 300*time.Millisecond) + req.Nil(server.trans.Close()) + for _, peer := range peers { + req.Nil(peer.trans.Close()) + } +} + +func TestTransport(t *testing.T) { + suite.Run(t, new(TransportTestSuite)) +} diff --git a/integration_test/non-byzantine_test.go b/integration_test/non-byzantine_test.go index 827d2ad..afda9b4 100644 --- a/integration_test/non-byzantine_test.go +++ b/integration_test/non-byzantine_test.go @@ -33,11 +33,11 @@ type NonByzantineTestSuite struct { func (s *NonByzantineTestSuite) TestNonByzantine() { var ( - networkLatency = &NormalLatencyModel{ + networkLatency = &test.NormalLatencyModel{ Sigma: 20, Mean: 250, } - proposingLatency = &NormalLatencyModel{ + proposingLatency = &test.NormalLatencyModel{ Sigma: 30, Mean: 500, } diff --git a/integration_test/stats_test.go b/integration_test/stats_test.go index e0be126..8816e8c 100644 --- a/integration_test/stats_test.go +++ b/integration_test/stats_test.go @@ -16,8 +16,8 @@ func (s *EventStatsTestSuite) TestCalculate() { // Setup a test with fixed latency in proposing and network, // and make sure the calculated statistics is expected. var ( - networkLatency = &FixedLatencyModel{Latency: 100} - proposingLatency = &FixedLatencyModel{Latency: 300} + networkLatency = &test.FixedLatencyModel{Latency: 100} + proposingLatency = &test.FixedLatencyModel{Latency: 300} req = s.Require() ) diff --git a/integration_test/utils.go b/integration_test/utils.go index c1eafb7..7371223 100644 --- a/integration_test/utils.go +++ b/integration_test/utils.go @@ -10,7 +10,7 @@ import ( // PrepareValidators setups validators for testing. func PrepareValidators( validatorCount int, - networkLatency, proposingLatency LatencyModel) ( + networkLatency, proposingLatency test.LatencyModel) ( apps map[types.ValidatorID]*test.App, dbs map[types.ValidatorID]blockdb.BlockDatabase, validators map[types.ValidatorID]*Validator, diff --git a/integration_test/validator.go b/integration_test/validator.go index bfe517f..5909b46 100644 --- a/integration_test/validator.go +++ b/integration_test/validator.go @@ -68,8 +68,8 @@ type Validator struct { chainID uint32 cons *core.Consensus gov core.Governance - networkLatency LatencyModel - proposingLatency LatencyModel + networkLatency test.LatencyModel + proposingLatency test.LatencyModel } // NewValidator constructs an instance of Validator. @@ -79,8 +79,8 @@ func NewValidator( db blockdb.BlockDatabase, privateKey crypto.PrivateKey, vID types.ValidatorID, - networkLatency LatencyModel, - proposingLatency LatencyModel) *Validator { + networkLatency test.LatencyModel, + proposingLatency test.LatencyModel) *Validator { hashes := make(common.Hashes, 0) for vID := range gov.GetValidatorSet() { |