diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-04-12 11:35:37 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-12 11:35:37 +0800 |
commit | 24c0cecbbf7ba84754ccc02d37c9540ce317976c (patch) | |
tree | 81031ec19affa0a0b962af627301963b077ea194 /core/test | |
parent | 1107fedc61b1b042216948d9fea78e7747f7599f (diff) | |
download | dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.tar dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.tar.gz dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.tar.bz2 dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.tar.lz dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.tar.xz dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.tar.zst dexon-consensus-24c0cecbbf7ba84754ccc02d37c9540ce317976c.zip |
core: add report bad peer interface to network (#559)
Diffstat (limited to 'core/test')
-rw-r--r-- | core/test/fake-transport.go | 5 | ||||
-rw-r--r-- | core/test/interface.go | 2 | ||||
-rw-r--r-- | core/test/network.go | 33 | ||||
-rw-r--r-- | core/test/network_test.go | 27 | ||||
-rw-r--r-- | core/test/tcp-transport.go | 9 |
5 files changed, 60 insertions, 16 deletions
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index ed9871b..cecac54 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -65,6 +65,11 @@ func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient { } } +// Disconnect implements Transport.Disconnect method. +func (t *FakeTransport) Disconnect(endpoint types.NodeID) { + delete(t.peers, endpoint) +} + // Send implements Transport.Send method. func (t *FakeTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { diff --git a/core/test/interface.go b/core/test/interface.go index 0712ad5..58a3ced 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -94,6 +94,8 @@ type Transport interface { // This method should be accessed after ether 'Join' or 'WaitForPeers' // returned. Peers() []crypto.PublicKey + + Disconnect(endpoint types.NodeID) } // Marshaller defines an interface to convert between interface{} and []byte. diff --git a/core/test/network.go b/core/test/network.go index f32c27f..c0ec255 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -179,8 +179,9 @@ type Network struct { trans *censorClient dMoment time.Time fromTransport <-chan *TransportEnvelope - toConsensus chan interface{} + toConsensus chan types.Msg toNode chan interface{} + badPeerChan chan interface{} sentAgreementLock sync.Mutex sentAgreement map[common.Hash]struct{} blockCacheLock sync.RWMutex @@ -208,8 +209,9 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) ( n = &Network{ ID: types.NewNodeID(pubKey), config: config, - toConsensus: make(chan interface{}, 1000), + toConsensus: make(chan types.Msg, 1000), toNode: make(chan interface{}, 1000), + badPeerChan: make(chan interface{}, 1000), sentAgreement: make(map[common.Hash]struct{}), blockCache: make(map[common.Hash]*types.Block, maxBlockCache), unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), @@ -348,7 +350,7 @@ func (n *Network) BroadcastDKGPartialSignature( } // ReceiveChan implements core.Network interface. -func (n *Network) ReceiveChan() <-chan interface{} { +func (n *Network) ReceiveChan() <-chan types.Msg { return n.toConsensus } @@ -396,14 +398,23 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) { } delete(n.unreceivedBlocks, v.Hash) }() - n.toConsensus <- v + n.toConsensus <- types.Msg{ + PeerID: e.From, + Payload: v, + } case *types.Vote: // Add this vote to cache. n.addVoteToCache(v) - n.toConsensus <- v + n.toConsensus <- types.Msg{ + PeerID: e.From, + Payload: v, + } case *types.AgreementResult, *typesDKG.PrivateShare, *typesDKG.PartialSignature: - n.toConsensus <- v + n.toConsensus <- types.Msg{ + PeerID: e.From, + Payload: v, + } case packedStateChanges: if n.stateModule == nil { panic(errors.New( @@ -466,6 +477,11 @@ Loop: default: } select { + case peer := <-n.badPeerChan: + if peer == nil { + continue Loop + } + n.trans.Disconnect(peer.(types.NodeID)) case <-n.ctx.Done(): break Loop case e, ok := <-n.fromTransport: @@ -535,6 +551,11 @@ func (n *Network) PurgeNodeSetCache(round uint64) { n.cache.Purge(round) } +// ReportBadPeerChan reports that a peer is sending bad message. +func (n *Network) ReportBadPeerChan() chan<- interface{} { + return n.badPeerChan +} + func (n *Network) pullBlocksAsync(hashes common.Hashes) { // Setup notification channels for each block hash. notYetReceived := make(map[common.Hash]struct{}) diff --git a/core/test/network_test.go b/core/test/network_test.go index 7a5ad16..27d25e6 100644 --- a/core/test/network_test.go +++ b/core/test/network_test.go @@ -147,7 +147,7 @@ func (s *NetworkTestSuite) TestPullBlocks() { for { select { case v := <-master.ReceiveChan(): - b, ok := v.(*types.Block) + b, ok := v.Payload.(*types.Block) if !ok { break } @@ -223,7 +223,7 @@ func (s *NetworkTestSuite) TestPullVotes() { defer cancelFunc() select { case v := <-master.ReceiveChan(): - vv, ok := v.(*types.Vote) + vv, ok := v.Payload.(*types.Vote) if !ok { break } @@ -283,13 +283,17 @@ func (s *NetworkTestSuite) TestBroadcastToSet() { // Try broadcasting with datum from round 0, and make sure only node belongs // to that set receiving the message. nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{Position: pos}}) - req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan()) + msg := <-notaryNode.ReceiveChan() + req.IsType(&types.Vote{}, msg.Payload) nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{Round: pos.Round}) - req.IsType(&typesDKG.PrivateShare{}, <-notaryNode.ReceiveChan()) + msg = <-notaryNode.ReceiveChan() + req.IsType(&typesDKG.PrivateShare{}, msg.Payload) nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{Round: pos.Round}) - req.IsType(&typesDKG.PartialSignature{}, <-notaryNode.ReceiveChan()) + msg = <-notaryNode.ReceiveChan() + req.IsType(&typesDKG.PartialSignature{}, msg.Payload) nerd.BroadcastBlock(&types.Block{Position: pos}) - req.IsType(&types.Block{}, <-notaryNode.ReceiveChan()) + msg = <-notaryNode.ReceiveChan() + req.IsType(&types.Block{}, msg.Payload) } type testVoteCensor struct{} @@ -309,7 +313,7 @@ func (s *NetworkTestSuite) TestCensor() { _, pubKeys, err := NewKeys(peerCount) req.NoError(err) networks := s.setupNetworks(pubKeys) - receiveChans := make(map[types.NodeID]<-chan interface{}, peerCount) + receiveChans := make(map[types.NodeID]<-chan types.Msg, peerCount) for nID, node := range networks { receiveChans[nID] = node.ReceiveChan() } @@ -330,7 +334,8 @@ func (s *NetworkTestSuite) TestCensor() { req.Equal(0, len(receiveChan)) } else { req.Equal(1, len(receiveChan)) - req.IsType(&types.Vote{}, <-receiveChan) + msg := <-receiveChan + req.IsType(&types.Vote{}, msg.Payload) } } @@ -351,7 +356,8 @@ func (s *NetworkTestSuite) TestCensor() { req.Equal(0, len(receiveChan)) } else { req.Equal(1, len(receiveChan)) - req.IsType(&types.Vote{}, <-receiveChan) + msg := <-receiveChan + req.IsType(&types.Vote{}, msg.Payload) } } censorNode.BroadcastVote(vote) @@ -361,7 +367,8 @@ func (s *NetworkTestSuite) TestCensor() { req.Equal(0, len(receiveChan)) } else { req.Equal(1, len(receiveChan)) - req.IsType(&types.Vote{}, <-receiveChan) + msg := <-receiveChan + req.IsType(&types.Vote{}, msg.Payload) } } diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index bbc4d56..fdc47d5 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -205,6 +205,11 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) ( return } +// Disconnect implements Transport.Disconnect method. +func (t *TCPTransport) Disconnect(endpoint types.NodeID) { + delete(t.peers, endpoint) +} + func (t *TCPTransport) send( endpoint types.NodeID, msg interface{}, payload []byte) { t.peersLock.RLock() @@ -217,6 +222,10 @@ func (t *TCPTransport) send( func (t *TCPTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { + if _, exist := t.peers[endpoint]; !exist { + return fmt.Errorf("the endpoint does not exists: %v", endpoint) + } + payload, err := t.marshalMessage(msg) if err != nil { return |