diff options
Diffstat (limited to 'core/test')
-rw-r--r-- | core/test/fake-transport.go | 34 | ||||
-rw-r--r-- | core/test/governance.go | 29 | ||||
-rw-r--r-- | core/test/interface.go | 5 | ||||
-rw-r--r-- | core/test/tcp-transport.go | 101 | ||||
-rw-r--r-- | core/test/transport_test.go | 22 | ||||
-rw-r--r-- | core/test/utils.go | 14 |
6 files changed, 134 insertions, 71 deletions
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index 2f1686e..e4bc252 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -21,17 +21,24 @@ import ( "fmt" "time" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +type fakePeerRecord struct { + sendChannel chan<- *TransportEnvelope + pubKey crypto.PublicKey +} + // FakeTransport implement TransportServer and TransportClient interface // by using golang channel. type FakeTransport struct { peerType TransportPeerType nID types.NodeID + pubKey crypto.PublicKey recvChannel chan *TransportEnvelope serverChannel chan<- *TransportEnvelope - peers map[types.NodeID]chan<- *TransportEnvelope + peers map[types.NodeID]fakePeerRecord latency LatencyModel } @@ -45,12 +52,13 @@ func NewFakeTransportServer() TransportServer { // NewFakeTransportClient constructs FakeTransport instance for peer. func NewFakeTransportClient( - nID types.NodeID, latency LatencyModel) TransportClient { + pubKey crypto.PublicKey, latency LatencyModel) TransportClient { return &FakeTransport{ peerType: TransportPeer, recvChannel: make(chan *TransportEnvelope, 1000), - nID: nID, + nID: types.NewNodeID(pubKey), + pubKey: pubKey, latency: latency, } } @@ -59,7 +67,7 @@ func NewFakeTransportClient( func (t *FakeTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { - ch, exists := t.peers[endpoint] + rec, exists := t.peers[endpoint] if !exists { err = fmt.Errorf("the endpoint does not exists: %v", endpoint) return @@ -73,7 +81,7 @@ func (t *FakeTransport) Send( From: t.nID, Msg: msg, } - }(ch) + }(rec.sendChannel) return } @@ -107,10 +115,9 @@ func (t *FakeTransport) Close() (err error) { } // Peers implements Transport.Peers method. -func (t *FakeTransport) Peers() (peers map[types.NodeID]struct{}) { - peers = make(map[types.NodeID]struct{}) - for nID := range t.peers { - peers[nID] = struct{}{} +func (t *FakeTransport) Peers() (peers []crypto.PublicKey) { + for _, rec := range t.peers { + peers = append(peers, rec.pubKey) } return } @@ -135,7 +142,7 @@ func (t *FakeTransport) Join( continue } if t.peers, ok = - envelope.Msg.(map[types.NodeID]chan<- *TransportEnvelope); !ok { + envelope.Msg.(map[types.NodeID]fakePeerRecord); !ok { envelopes = append(envelopes, envelope) continue @@ -155,13 +162,16 @@ func (t *FakeTransport) Host() (chan *TransportEnvelope, error) { // WaitForPeers implements TransportServer.WaitForPeers method. func (t *FakeTransport) WaitForPeers(numPeers int) (err error) { - t.peers = make(map[types.NodeID]chan<- *TransportEnvelope) + t.peers = make(map[types.NodeID]fakePeerRecord) for { envelope := <-t.recvChannel // Panic here if some peer send other stuffs before // receiving peer lists. newPeer := envelope.Msg.(*FakeTransport) - t.peers[envelope.From] = newPeer.recvChannel + t.peers[envelope.From] = fakePeerRecord{ + sendChannel: newPeer.recvChannel, + pubKey: newPeer.pubKey, + } if len(t.peers) == numPeers { break } diff --git a/core/test/governance.go b/core/test/governance.go index bc5f58a..c666ffc 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -37,7 +37,6 @@ var ( type Governance struct { lambdaBA time.Duration lambdaDKG time.Duration - nodeSet map[types.NodeID]struct{} privateKeys map[types.NodeID]crypto.PrivateKey tsig map[uint64]crypto.Signature DKGComplaint map[uint64][]*types.DKGComplaint @@ -51,7 +50,6 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( g = &Governance{ lambdaBA: lambda, lambdaDKG: lambda * 10, - nodeSet: make(map[types.NodeID]struct{}), privateKeys: make(map[types.NodeID]crypto.PrivateKey), tsig: make(map[uint64]crypto.Signature), DKGComplaint: make(map[uint64][]*types.DKGComplaint), @@ -63,7 +61,6 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( return nil, err } nID := types.NewNodeID(prv.PublicKey()) - g.nodeSet[nID] = struct{}{} g.privateKeys[nID] = prv } return @@ -71,21 +68,19 @@ func NewGovernance(nodeCount int, lambda time.Duration) ( // GetNodeSet implements Governance interface to return current // notary set. -func (g *Governance) GetNodeSet(round uint64) ( - ret map[types.NodeID]struct{}) { - // Return a cloned map. - ret = make(map[types.NodeID]struct{}) - for k := range g.nodeSet { - ret[k] = struct{}{} +func (g *Governance) GetNodeSet(_ uint64) ( + ret []crypto.PublicKey) { + for _, key := range g.privateKeys { + ret = append(ret, key.PublicKey()) } return } // GetConfiguration returns the configuration at a given block height. -func (g *Governance) GetConfiguration(round uint64) *types.Config { +func (g *Governance) GetConfiguration(_ uint64) *types.Config { return &types.Config{ NumShards: 1, - NumChains: uint32(len(g.nodeSet)), + NumChains: uint32(len(g.privateKeys)), LambdaBA: g.lambdaBA, LambdaDKG: g.lambdaDKG, K: 0, @@ -98,15 +93,11 @@ func (g *Governance) GetCRS(round uint64) []byte { return []byte("__ DEXON") } -// GetPrivateKey return the private key for that node, this function +// GetPrivateKeys return the private key for that node, this function // is a test utility and not a general Governance interface. -func (g *Governance) GetPrivateKey( - nID types.NodeID) (key crypto.PrivateKey, err error) { - - key, exists := g.privateKeys[nID] - if !exists { - err = ErrPrivateKeyNotExists - return +func (g *Governance) GetPrivateKeys() (keys []crypto.PrivateKey) { + for _, k := range g.privateKeys { + keys = append(keys, k) } return } diff --git a/core/test/interface.go b/core/test/interface.go index a422ee7..ad8304e 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -19,6 +19,7 @@ package test import ( "github.com/dexon-foundation/dexon-consensus-core/core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) @@ -96,10 +97,10 @@ type Transport interface { // Close would cleanup allocated resources. Close() error - // Peers return IDs of all connected nodes in p2p favor. + // Peers return public keys of all connected nodes in p2p favor. // This method should be accessed after ether 'Join' or 'WaitForPeers' // returned. - Peers() map[types.NodeID]struct{} + Peers() []crypto.PublicKey } // Marshaller defines an interface to convert between interface{} and []byte. diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index 0f9bd73..6e3ddfb 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -19,6 +19,7 @@ package test import ( "context" + "encoding/base64" "encoding/binary" "encoding/json" "fmt" @@ -28,13 +29,22 @@ import ( "net" "os" "strconv" + "strings" "sync" "syscall" "time" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) +type tcpPeerRecord struct { + conn string + sendChannel chan<- []byte + pubKey crypto.PublicKey +} + // tcpMessage is the general message between peers and server. type tcpMessage struct { NodeID types.NodeID `json:"nid"` @@ -42,13 +52,33 @@ type tcpMessage struct { Info string `json:"conn"` } +// buildPeerInfo is a tricky way to combine connection string and +// base64 encoded byte slice for public key into a single string, +// separated by ';'. +func buildPeerInfo(pubKey crypto.PublicKey, conn string) string { + return conn + ";" + base64.StdEncoding.EncodeToString(pubKey.Bytes()) +} + +// parsePeerInfo parse connection string and base64 encoded public key built +// via buildPeerInfo. +func parsePeerInfo(info string) (key crypto.PublicKey, conn string) { + tokens := strings.Split(info, ";") + conn = tokens[0] + data, err := base64.StdEncoding.DecodeString(tokens[1]) + if err != nil { + panic(err) + } + key = ecdsa.NewPublicKeyFromByteSlice(data) + return +} + // TCPTransport implements Transport interface via TCP connection. type TCPTransport struct { peerType TransportPeerType nID types.NodeID + pubKey crypto.PublicKey localPort int - peersInfo map[types.NodeID]string - peers map[types.NodeID]chan<- []byte + peers map[types.NodeID]*tcpPeerRecord peersLock sync.RWMutex recvChannel chan *TransportEnvelope ctx context.Context @@ -60,7 +90,7 @@ type TCPTransport struct { // NewTCPTransport constructs an TCPTransport instance. func NewTCPTransport( peerType TransportPeerType, - nID types.NodeID, + pubKey crypto.PublicKey, latency LatencyModel, marshaller Marshaller, localPort int) *TCPTransport { @@ -68,9 +98,9 @@ func NewTCPTransport( ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ peerType: peerType, - nID: nID, - peersInfo: make(map[types.NodeID]string), - peers: make(map[types.NodeID]chan<- []byte), + nID: types.NewNodeID(pubKey), + pubKey: pubKey, + peers: make(map[types.NodeID]*tcpPeerRecord), recvChannel: make(chan *TransportEnvelope, 1000), ctx: ctx, cancel: cancel, @@ -96,7 +126,7 @@ func (t *TCPTransport) Send( t.peersLock.RLock() defer t.peersLock.RUnlock() - t.peers[endpoint] <- payload + t.peers[endpoint].sendChannel <- payload }() return } @@ -110,7 +140,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) { t.peersLock.RLock() defer t.peersLock.RUnlock() - for nID, ch := range t.peers { + for nID, rec := range t.peers { if nID == t.nID { continue } @@ -119,7 +149,7 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) { time.Sleep(t.latency.Delay()) } ch <- payload - }(ch) + }(rec.sendChannel) } return } @@ -131,7 +161,7 @@ func (t *TCPTransport) Close() (err error) { // Reset peers. t.peersLock.Lock() defer t.peersLock.Unlock() - t.peers = make(map[types.NodeID]chan<- []byte) + t.peers = make(map[types.NodeID]*tcpPeerRecord) // Tell our user that this channel is closed. close(t.recvChannel) t.recvChannel = nil @@ -139,10 +169,9 @@ func (t *TCPTransport) Close() (err error) { } // Peers implements Transport.Peers method. -func (t *TCPTransport) Peers() (peers map[types.NodeID]struct{}) { - peers = make(map[types.NodeID]struct{}) - for nID := range t.peersInfo { - peers[nID] = struct{}{} +func (t *TCPTransport) Peers() (peers []crypto.PublicKey) { + for _, rec := range t.peers { + peers = append(peers, rec.pubKey) } return } @@ -376,7 +405,7 @@ func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) { // we only utilize the write part for simplicity. func (t *TCPTransport) buildConnectionsToPeers() (err error) { var wg sync.WaitGroup - for nID, addr := range t.peersInfo { + for nID, rec := range t.peers { if nID == t.nID { continue } @@ -394,8 +423,8 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) { t.peersLock.Lock() defer t.peersLock.Unlock() - t.peers[nID] = t.connWriter(conn) - }(nID, addr) + t.peers[nID].sendChannel = t.connWriter(conn) + }(nID, rec.conn) } wg.Wait() return @@ -410,14 +439,15 @@ type TCPTransportClient struct { // NewTCPTransportClient constructs a TCPTransportClient instance. func NewTCPTransportClient( - nID types.NodeID, + pubKey crypto.PublicKey, latency LatencyModel, marshaller Marshaller, local bool) *TCPTransportClient { return &TCPTransportClient{ - TCPTransport: *NewTCPTransport(TransportPeer, nID, latency, marshaller, 8080), - local: local, + TCPTransport: *NewTCPTransport( + TransportPeer, pubKey, latency, marshaller, 8080), + local: local, } } @@ -436,7 +466,6 @@ func (t *TCPTransportClient) Report(msg interface{}) (err error) { // Join implements TransportClient.Join method. func (t *TCPTransportClient) Join( serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) { - // Initiate a TCP server. // TODO(mission): config initial listening port. var ( @@ -475,7 +504,6 @@ func (t *TCPTransportClient) Join( t.localPort = 1024 + rand.Int()%1024 } go t.listenerRoutine(ln.(*net.TCPListener)) - serverConn, err := net.Dial("tcp", serverEndpoint.(string)) if err != nil { return @@ -492,17 +520,26 @@ func (t *TCPTransportClient) Join( conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort)) } if err = t.Report(&tcpMessage{ - Type: "conn", NodeID: t.nID, - Info: conn, + Type: "conn", + Info: buildPeerInfo(t.pubKey, conn), }); err != nil { return } // Wait for peers list sent by server. e := <-t.recvChannel - if t.peersInfo, ok = e.Msg.(map[types.NodeID]string); !ok { + peersInfo, ok := e.Msg.(map[types.NodeID]string) + if !ok { panic(fmt.Errorf("expect peer list, not %v", e)) } + // Setup peers information. + for nID, info := range peersInfo { + pubKey, conn := parsePeerInfo(info) + t.peers[nID] = &tcpPeerRecord{ + conn: conn, + pubKey: pubKey, + } + } // Setup connections to other peers. if err = t.buildConnectionsToPeers(); err != nil { return @@ -551,7 +588,7 @@ func NewTCPTransportServer( // won't be zero. TCPTransport: *NewTCPTransport( TransportPeerServer, - types.NodeID{}, + ecdsa.NewPublicKeyFromByteSlice(nil), nil, marshaller, serverPort), @@ -576,6 +613,7 @@ func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) { func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { // Collect peers info. Packets other than peer info is // unexpected. + peersInfo := make(map[types.NodeID]string) for { // Wait for connection info reported by peers. e := <-t.recvChannel @@ -586,9 +624,14 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { if msg.Type != "conn" { panic(fmt.Errorf("expect connection report, not %v", e)) } - t.peersInfo[msg.NodeID] = msg.Info + pubKey, conn := parsePeerInfo(msg.Info) + t.peers[msg.NodeID] = &tcpPeerRecord{ + conn: conn, + pubKey: pubKey, + } + peersInfo[msg.NodeID] = msg.Info // Check if we already collect enought peers. - if len(t.peersInfo) == numPeers { + if len(peersInfo) == numPeers { break } } @@ -596,7 +639,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { if err = t.buildConnectionsToPeers(); err != nil { return } - if err = t.Broadcast(t.peersInfo); err != nil { + if err = t.Broadcast(peersInfo); err != nil { return } // Wait for peers to send 'ready' report. diff --git a/core/test/transport_test.go b/core/test/transport_test.go index 4d71fc8..056cd39 100644 --- a/core/test/transport_test.go +++ b/core/test/transport_test.go @@ -197,7 +197,7 @@ func (s *TransportTestSuite) TestFake() { peerCount = 13 req = s.Require() peers = make(map[types.NodeID]*testPeer) - nIDs = GenerateRandomNodeIDs(peerCount) + prvKeys = GenerateRandomPrivateKeys(peerCount) err error wg sync.WaitGroup latency = &FixedLatencyModel{Latency: 300} @@ -207,11 +207,12 @@ func (s *TransportTestSuite) TestFake() { server.recv, err = server.trans.Host() req.Nil(err) // Setup Peers - wg.Add(len(nIDs)) - for _, nID := range nIDs { + wg.Add(len(prvKeys)) + for _, key := range prvKeys { + nID := types.NewNodeID(key.PublicKey()) peer := &testPeer{ nID: nID, - trans: NewFakeTransportClient(nID, latency), + trans: NewFakeTransportClient(key.PublicKey(), latency), } peers[nID] = peer go func() { @@ -233,11 +234,12 @@ func (s *TransportTestSuite) TestFake() { } func (s *TransportTestSuite) TestTCPLocal() { + var ( peerCount = 25 req = s.Require() peers = make(map[types.NodeID]*testPeer) - nIDs = GenerateRandomNodeIDs(peerCount) + prvKeys = GenerateRandomPrivateKeys(peerCount) err error wg sync.WaitGroup latency = &FixedLatencyModel{Latency: 300} @@ -250,11 +252,13 @@ func (s *TransportTestSuite) TestTCPLocal() { server.recv, err = server.trans.Host() req.Nil(err) // Setup Peers - wg.Add(len(nIDs)) - for _, nID := range nIDs { + wg.Add(len(prvKeys)) + for _, prvKey := range prvKeys { + nID := types.NewNodeID(prvKey.PublicKey()) peer := &testPeer{ - nID: nID, - trans: NewTCPTransportClient(nID, latency, &testMarshaller{}, true), + nID: nID, + trans: NewTCPTransportClient( + prvKey.PublicKey(), latency, &testMarshaller{}, true), } peers[nID] = peer go func() { diff --git a/core/test/utils.go b/core/test/utils.go index 887ef14..2fc21ce 100644 --- a/core/test/utils.go +++ b/core/test/utils.go @@ -24,6 +24,8 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa" "github.com/dexon-foundation/dexon-consensus-core/core/types" ) @@ -43,6 +45,18 @@ func GenerateRandomNodeIDs(nodeCount int) (nIDs types.NodeIDs) { return } +// GenerateRandomPrivateKeys generate a set of private keys. +func GenerateRandomPrivateKeys(nodeCount int) (prvKeys []crypto.PrivateKey) { + for i := 0; i < nodeCount; i++ { + prvKey, err := ecdsa.NewPrivateKey() + if err != nil { + panic(err) + } + prvKeys = append(prvKeys, prvKey) + } + return +} + // CalcLatencyStatistics calculates average and deviation from a slice // of latencies. func CalcLatencyStatistics(latencies []time.Duration) (avg, dev time.Duration) { |