aboutsummaryrefslogtreecommitdiffstats
path: root/core/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/test')
-rw-r--r--core/test/fake-transport.go5
-rw-r--r--core/test/interface.go2
-rw-r--r--core/test/network.go33
-rw-r--r--core/test/network_test.go27
-rw-r--r--core/test/tcp-transport.go9
5 files changed, 60 insertions, 16 deletions
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go
index ed9871b..cecac54 100644
--- a/core/test/fake-transport.go
+++ b/core/test/fake-transport.go
@@ -65,6 +65,11 @@ func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient {
}
}
+// Disconnect implements Transport.Disconnect method.
+func (t *FakeTransport) Disconnect(endpoint types.NodeID) {
+ delete(t.peers, endpoint)
+}
+
// Send implements Transport.Send method.
func (t *FakeTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
diff --git a/core/test/interface.go b/core/test/interface.go
index 0712ad5..58a3ced 100644
--- a/core/test/interface.go
+++ b/core/test/interface.go
@@ -94,6 +94,8 @@ type Transport interface {
// This method should be accessed after ether 'Join' or 'WaitForPeers'
// returned.
Peers() []crypto.PublicKey
+
+ Disconnect(endpoint types.NodeID)
}
// Marshaller defines an interface to convert between interface{} and []byte.
diff --git a/core/test/network.go b/core/test/network.go
index f32c27f..c0ec255 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -179,8 +179,9 @@ type Network struct {
trans *censorClient
dMoment time.Time
fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
+ toConsensus chan types.Msg
toNode chan interface{}
+ badPeerChan chan interface{}
sentAgreementLock sync.Mutex
sentAgreement map[common.Hash]struct{}
blockCacheLock sync.RWMutex
@@ -208,8 +209,9 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
n = &Network{
ID: types.NewNodeID(pubKey),
config: config,
- toConsensus: make(chan interface{}, 1000),
+ toConsensus: make(chan types.Msg, 1000),
toNode: make(chan interface{}, 1000),
+ badPeerChan: make(chan interface{}, 1000),
sentAgreement: make(map[common.Hash]struct{}),
blockCache: make(map[common.Hash]*types.Block, maxBlockCache),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
@@ -348,7 +350,7 @@ func (n *Network) BroadcastDKGPartialSignature(
}
// ReceiveChan implements core.Network interface.
-func (n *Network) ReceiveChan() <-chan interface{} {
+func (n *Network) ReceiveChan() <-chan types.Msg {
return n.toConsensus
}
@@ -396,14 +398,23 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
}
delete(n.unreceivedBlocks, v.Hash)
}()
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case *types.Vote:
// Add this vote to cache.
n.addVoteToCache(v)
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case *types.AgreementResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case packedStateChanges:
if n.stateModule == nil {
panic(errors.New(
@@ -466,6 +477,11 @@ Loop:
default:
}
select {
+ case peer := <-n.badPeerChan:
+ if peer == nil {
+ continue Loop
+ }
+ n.trans.Disconnect(peer.(types.NodeID))
case <-n.ctx.Done():
break Loop
case e, ok := <-n.fromTransport:
@@ -535,6 +551,11 @@ func (n *Network) PurgeNodeSetCache(round uint64) {
n.cache.Purge(round)
}
+// ReportBadPeerChan reports that a peer is sending bad message.
+func (n *Network) ReportBadPeerChan() chan<- interface{} {
+ return n.badPeerChan
+}
+
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Setup notification channels for each block hash.
notYetReceived := make(map[common.Hash]struct{})
diff --git a/core/test/network_test.go b/core/test/network_test.go
index 7a5ad16..27d25e6 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -147,7 +147,7 @@ func (s *NetworkTestSuite) TestPullBlocks() {
for {
select {
case v := <-master.ReceiveChan():
- b, ok := v.(*types.Block)
+ b, ok := v.Payload.(*types.Block)
if !ok {
break
}
@@ -223,7 +223,7 @@ func (s *NetworkTestSuite) TestPullVotes() {
defer cancelFunc()
select {
case v := <-master.ReceiveChan():
- vv, ok := v.(*types.Vote)
+ vv, ok := v.Payload.(*types.Vote)
if !ok {
break
}
@@ -283,13 +283,17 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
// Try broadcasting with datum from round 0, and make sure only node belongs
// to that set receiving the message.
nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{Position: pos}})
- req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan())
+ msg := <-notaryNode.ReceiveChan()
+ req.IsType(&types.Vote{}, msg.Payload)
nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{Round: pos.Round})
- req.IsType(&typesDKG.PrivateShare{}, <-notaryNode.ReceiveChan())
+ msg = <-notaryNode.ReceiveChan()
+ req.IsType(&typesDKG.PrivateShare{}, msg.Payload)
nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{Round: pos.Round})
- req.IsType(&typesDKG.PartialSignature{}, <-notaryNode.ReceiveChan())
+ msg = <-notaryNode.ReceiveChan()
+ req.IsType(&typesDKG.PartialSignature{}, msg.Payload)
nerd.BroadcastBlock(&types.Block{Position: pos})
- req.IsType(&types.Block{}, <-notaryNode.ReceiveChan())
+ msg = <-notaryNode.ReceiveChan()
+ req.IsType(&types.Block{}, msg.Payload)
}
type testVoteCensor struct{}
@@ -309,7 +313,7 @@ func (s *NetworkTestSuite) TestCensor() {
_, pubKeys, err := NewKeys(peerCount)
req.NoError(err)
networks := s.setupNetworks(pubKeys)
- receiveChans := make(map[types.NodeID]<-chan interface{}, peerCount)
+ receiveChans := make(map[types.NodeID]<-chan types.Msg, peerCount)
for nID, node := range networks {
receiveChans[nID] = node.ReceiveChan()
}
@@ -330,7 +334,8 @@ func (s *NetworkTestSuite) TestCensor() {
req.Equal(0, len(receiveChan))
} else {
req.Equal(1, len(receiveChan))
- req.IsType(&types.Vote{}, <-receiveChan)
+ msg := <-receiveChan
+ req.IsType(&types.Vote{}, msg.Payload)
}
}
@@ -351,7 +356,8 @@ func (s *NetworkTestSuite) TestCensor() {
req.Equal(0, len(receiveChan))
} else {
req.Equal(1, len(receiveChan))
- req.IsType(&types.Vote{}, <-receiveChan)
+ msg := <-receiveChan
+ req.IsType(&types.Vote{}, msg.Payload)
}
}
censorNode.BroadcastVote(vote)
@@ -361,7 +367,8 @@ func (s *NetworkTestSuite) TestCensor() {
req.Equal(0, len(receiveChan))
} else {
req.Equal(1, len(receiveChan))
- req.IsType(&types.Vote{}, <-receiveChan)
+ msg := <-receiveChan
+ req.IsType(&types.Vote{}, msg.Payload)
}
}
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index bbc4d56..fdc47d5 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -205,6 +205,11 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) (
return
}
+// Disconnect implements Transport.Disconnect method.
+func (t *TCPTransport) Disconnect(endpoint types.NodeID) {
+ delete(t.peers, endpoint)
+}
+
func (t *TCPTransport) send(
endpoint types.NodeID, msg interface{}, payload []byte) {
t.peersLock.RLock()
@@ -217,6 +222,10 @@ func (t *TCPTransport) send(
func (t *TCPTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
+ if _, exist := t.peers[endpoint]; !exist {
+ return fmt.Errorf("the endpoint does not exists: %v", endpoint)
+ }
+
payload, err := t.marshalMessage(msg)
if err != nil {
return