From 24c0cecbbf7ba84754ccc02d37c9540ce317976c Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 12 Apr 2019 11:35:37 +0800 Subject: core: add report bad peer interface to network (#559) --- core/consensus.go | 42 ++++++++++++++++++++++---------- core/consensus_test.go | 58 +++++++++++++++++++++++++++++++++++---------- core/interfaces.go | 5 +++- core/syncer/consensus.go | 10 ++++---- core/test/fake-transport.go | 5 ++++ core/test/interface.go | 2 ++ core/test/network.go | 33 +++++++++++++++++++++----- core/test/network_test.go | 27 +++++++++++++-------- core/test/tcp-transport.go | 9 +++++++ core/types/message.go | 24 +++++++++++++++++++ core/utils/utils.go | 2 +- core/utils/utils_test.go | 16 +++++++------ 12 files changed, 178 insertions(+), 55 deletions(-) create mode 100644 core/types/message.go diff --git a/core/consensus.go b/core/consensus.go index 49f2db1..ec15bf3 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -520,7 +520,7 @@ type Consensus struct { roundEvent *utils.RoundEvent logger common.Logger resetDeliveryGuardTicker chan struct{} - msgChan chan interface{} + msgChan chan types.Msg priorityMsgChan chan interface{} waitGroup sync.WaitGroup processBlockChan chan *types.Block @@ -528,7 +528,7 @@ type Consensus struct { // Context of Dummy receiver during switching from syncer. dummyCancel context.CancelFunc dummyFinished <-chan struct{} - dummyMsgBuffer []interface{} + dummyMsgBuffer []types.Msg } // NewConsensus construct an Consensus instance. @@ -577,7 +577,7 @@ func NewConsensusFromSyncer( networkModule Network, prv crypto.PrivateKey, confirmedBlocks []*types.Block, - cachedMessages []interface{}, + cachedMessages []types.Msg, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. con := newConsensusForRound(initBlock, dMoment, app, gov, db, @@ -585,7 +585,7 @@ func NewConsensusFromSyncer( // Launch a dummy receiver before we start receiving from network module. con.dummyMsgBuffer = cachedMessages con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( - con.ctx, networkModule.ReceiveChan(), func(msg interface{}) { + con.ctx, networkModule.ReceiveChan(), func(msg types.Msg) { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) // Dump all BA-confirmed blocks to the consensus instance, make sure these @@ -690,7 +690,7 @@ func newConsensusForRound( event: common.NewEvent(), logger: logger, resetDeliveryGuardTicker: make(chan struct{}), - msgChan: make(chan interface{}, 1024), + msgChan: make(chan types.Msg, 1024), priorityMsgChan: make(chan interface{}, 1024), processBlockChan: make(chan *types.Block, 1024), } @@ -1234,14 +1234,15 @@ MessageLoop: return default: } - var msg interface{} + var msg, peer interface{} select { case msg = <-con.priorityMsgChan: default: } if msg == nil { select { - case msg = <-con.msgChan: + case message := <-con.msgChan: + msg, peer = message.Payload, message.PeerID case msg = <-con.priorityMsgChan: case <-con.ctx.Done(): return @@ -1260,42 +1261,53 @@ MessageLoop: if val.IsEmpty() { hash, err := utils.HashBlock(val) if err != nil { - con.logger.Error("error verifying empty block hash", + con.logger.Error("Error verifying empty block hash", "block", val, "error, err") + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if hash != val.Hash { - con.logger.Error("incorrect confirmed empty block hash", + con.logger.Error("Incorrect confirmed empty block hash", "block", val, "hash", hash) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if _, err := con.bcModule.proposeBlock( val.Position, time.Time{}, true); err != nil { - con.logger.Error("error adding empty block", + con.logger.Error("Error adding empty block", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } } else { + if !val.IsFinalized() { + con.logger.Warn("Ignore not finalized block", + "block", val) + continue MessageLoop + } ok, err := con.bcModule.verifyRandomness( val.Hash, val.Position.Round, val.Randomness) if err != nil { - con.logger.Error("error verifying confirmed block randomness", + con.logger.Error("Error verifying confirmed block randomness", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if !ok { - con.logger.Error("incorrect confirmed block randomness", + con.logger.Error("Incorrect confirmed block randomness", "block", val) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if err := utils.VerifyBlockSignature(val); err != nil { con.logger.Error("VerifyBlockSignature failed", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } } @@ -1314,12 +1326,14 @@ MessageLoop: con.logger.Error("Failed to process finalized block", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer } } case *types.Vote: @@ -1327,23 +1341,27 @@ MessageLoop: con.logger.Error("Failed to process vote", "vote", val, "error", err) + con.network.ReportBadPeerChan() <- peer } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { con.logger.Error("Failed to process agreement result", "result", val, "error", err) + con.network.ReportBadPeerChan() <- peer } case *typesDKG.PrivateShare: if err := con.cfgModule.processPrivateShare(val); err != nil { con.logger.Error("Failed to process private share", "error", err) + con.network.ReportBadPeerChan() <- peer } case *typesDKG.PartialSignature: if err := con.cfgModule.processPartialSignature(val); err != nil { con.logger.Error("Failed to process partial signature", "error", err) + con.network.ReportBadPeerChan() <- peer } } } diff --git a/core/consensus_test.go b/core/consensus_test.go index f677d00..5f61028 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -18,6 +18,7 @@ package core import ( + "context" "encoding/json" "testing" "time" @@ -70,7 +71,7 @@ func (n *network) BroadcastAgreementResult( // SendDKGPrivateShare sends PrivateShare to a DKG participant. func (n *network) SendDKGPrivateShare( recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) { - n.conn.send(types.NewNodeID(recv), prvShare) + n.conn.send(n.nID, types.NewNodeID(recv), prvShare) } // BroadcastDKGPrivateShare broadcasts PrivateShare to all DKG participants. @@ -87,8 +88,13 @@ func (n *network) BroadcastDKGPartialSignature( } // ReceiveChan returns a channel to receive messages from DEXON network. -func (n *network) ReceiveChan() <-chan interface{} { - return make(chan interface{}) +func (n *network) ReceiveChan() <-chan types.Msg { + return make(chan types.Msg) +} + +// ReportBadPeer reports that a peer is sending bad message. +func (n *network) ReportBadPeerChan() chan<- interface{} { + return n.conn.s.sink } func (nc *networkConnection) broadcast(from types.NodeID, msg interface{}) { @@ -96,11 +102,11 @@ func (nc *networkConnection) broadcast(from types.NodeID, msg interface{}) { if nID == from { continue } - nc.send(nID, msg) + nc.send(from, nID, msg) } } -func (nc *networkConnection) send(to types.NodeID, msg interface{}) { +func (nc *networkConnection) send(from, to types.NodeID, msg interface{}) { ch, exist := nc.cons[to] if !exist { return @@ -122,12 +128,15 @@ func (nc *networkConnection) send(to types.NodeID, msg interface{}) { } msgCopy = valCopy } - ch <- msgCopy + ch <- types.Msg{ + PeerID: from, + Payload: msgCopy, + } } type networkConnection struct { s *ConsensusTestSuite - cons map[types.NodeID]chan interface{} + cons map[types.NodeID]chan types.Msg } func (nc *networkConnection) newNetwork(nID types.NodeID) *network { @@ -138,14 +147,19 @@ func (nc *networkConnection) newNetwork(nID types.NodeID) *network { } func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) { - ch := make(chan interface{}, 1000) + ch := make(chan types.Msg, 1000) go func() { for { - msg := <-ch + var msg types.Msg + select { + case msg = <-ch: + case <-nc.s.ctx.Done(): + return + } var err error // Testify package does not support concurrent call. // Use panic() to detact error. - switch val := msg.(type) { + switch val := msg.Payload.(type) { case *types.Block: err = con.preProcessBlock(val) case *types.Vote: @@ -167,13 +181,31 @@ func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) { type ConsensusTestSuite struct { suite.Suite - conn *networkConnection + ctx context.Context + ctxCancel context.CancelFunc + conn *networkConnection + sink chan interface{} +} + +func (s *ConsensusTestSuite) SetupTest() { + s.ctx, s.ctxCancel = context.WithCancel(context.Background()) + s.sink = make(chan interface{}, 1000) + go func() { + select { + case <-s.sink: + case <-s.ctx.Done(): + return + } + }() +} +func (s *ConsensusTestSuite) TearDownTest() { + s.ctxCancel() } func (s *ConsensusTestSuite) newNetworkConnection() *networkConnection { return &networkConnection{ s: s, - cons: make(map[types.NodeID]chan interface{}), + cons: make(map[types.NodeID]chan types.Msg), } } @@ -384,7 +416,7 @@ func (s *ConsensusTestSuite) TestInitialHeightEventTriggered() { network, prvKey, []*types.Block(nil), - []interface{}(nil), + []types.Msg{}, &common.NullLogger{}, ) s.Require().NoError(err) diff --git a/core/interfaces.go b/core/interfaces.go index f76ee19..c16c624 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -83,7 +83,10 @@ type Network interface { BroadcastDKGPartialSignature(psig *typesDKG.PartialSignature) // ReceiveChan returns a channel to receive messages from DEXON network. - ReceiveChan() <-chan interface{} + ReceiveChan() <-chan types.Msg + + // ReportBadPeerChan returns a channel to report bad peer. + ReportBadPeerChan() chan<- interface{} } // Governance interface specifies interface to control the governance contract. diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 7db836a..496c0f9 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -80,7 +80,7 @@ type Consensus struct { syncedSkipNext bool dummyCancel context.CancelFunc dummyFinished <-chan struct{} - dummyMsgBuffer []interface{} + dummyMsgBuffer []types.Msg initChainTipHeight uint64 } @@ -297,7 +297,7 @@ func (con *Consensus) ForceSync(lastPos types.Position, skip bool) { if con.dummyCancel == nil { con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( context.Background(), con.network.ReceiveChan(), - func(msg interface{}) { + func(msg types.Msg) { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) } @@ -448,7 +448,7 @@ func (con *Consensus) stopBuffering() { // need to launch a dummy receiver right away. con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( context.Background(), con.network.ReceiveChan(), - func(msg interface{}) { + func(msg types.Msg) { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) // Stop agreements. @@ -512,7 +512,7 @@ func (con *Consensus) startNetwork() { for { select { case val := <-con.network.ReceiveChan(): - switch v := val.(type) { + switch v := val.Payload.(type) { case *types.Block: case *types.AgreementResult: // Avoid byzantine nodes attack by broadcasting older @@ -524,7 +524,7 @@ func (con *Consensus) startNetwork() { default: continue loop } - con.agreementModule.inputChan <- val + con.agreementModule.inputChan <- val.Payload case <-con.ctx.Done(): break loop } diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go index ed9871b..cecac54 100644 --- a/core/test/fake-transport.go +++ b/core/test/fake-transport.go @@ -65,6 +65,11 @@ func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient { } } +// Disconnect implements Transport.Disconnect method. +func (t *FakeTransport) Disconnect(endpoint types.NodeID) { + delete(t.peers, endpoint) +} + // Send implements Transport.Send method. func (t *FakeTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { diff --git a/core/test/interface.go b/core/test/interface.go index 0712ad5..58a3ced 100644 --- a/core/test/interface.go +++ b/core/test/interface.go @@ -94,6 +94,8 @@ type Transport interface { // This method should be accessed after ether 'Join' or 'WaitForPeers' // returned. Peers() []crypto.PublicKey + + Disconnect(endpoint types.NodeID) } // Marshaller defines an interface to convert between interface{} and []byte. diff --git a/core/test/network.go b/core/test/network.go index f32c27f..c0ec255 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -179,8 +179,9 @@ type Network struct { trans *censorClient dMoment time.Time fromTransport <-chan *TransportEnvelope - toConsensus chan interface{} + toConsensus chan types.Msg toNode chan interface{} + badPeerChan chan interface{} sentAgreementLock sync.Mutex sentAgreement map[common.Hash]struct{} blockCacheLock sync.RWMutex @@ -208,8 +209,9 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) ( n = &Network{ ID: types.NewNodeID(pubKey), config: config, - toConsensus: make(chan interface{}, 1000), + toConsensus: make(chan types.Msg, 1000), toNode: make(chan interface{}, 1000), + badPeerChan: make(chan interface{}, 1000), sentAgreement: make(map[common.Hash]struct{}), blockCache: make(map[common.Hash]*types.Block, maxBlockCache), unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), @@ -348,7 +350,7 @@ func (n *Network) BroadcastDKGPartialSignature( } // ReceiveChan implements core.Network interface. -func (n *Network) ReceiveChan() <-chan interface{} { +func (n *Network) ReceiveChan() <-chan types.Msg { return n.toConsensus } @@ -396,14 +398,23 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) { } delete(n.unreceivedBlocks, v.Hash) }() - n.toConsensus <- v + n.toConsensus <- types.Msg{ + PeerID: e.From, + Payload: v, + } case *types.Vote: // Add this vote to cache. n.addVoteToCache(v) - n.toConsensus <- v + n.toConsensus <- types.Msg{ + PeerID: e.From, + Payload: v, + } case *types.AgreementResult, *typesDKG.PrivateShare, *typesDKG.PartialSignature: - n.toConsensus <- v + n.toConsensus <- types.Msg{ + PeerID: e.From, + Payload: v, + } case packedStateChanges: if n.stateModule == nil { panic(errors.New( @@ -466,6 +477,11 @@ Loop: default: } select { + case peer := <-n.badPeerChan: + if peer == nil { + continue Loop + } + n.trans.Disconnect(peer.(types.NodeID)) case <-n.ctx.Done(): break Loop case e, ok := <-n.fromTransport: @@ -535,6 +551,11 @@ func (n *Network) PurgeNodeSetCache(round uint64) { n.cache.Purge(round) } +// ReportBadPeerChan reports that a peer is sending bad message. +func (n *Network) ReportBadPeerChan() chan<- interface{} { + return n.badPeerChan +} + func (n *Network) pullBlocksAsync(hashes common.Hashes) { // Setup notification channels for each block hash. notYetReceived := make(map[common.Hash]struct{}) diff --git a/core/test/network_test.go b/core/test/network_test.go index 7a5ad16..27d25e6 100644 --- a/core/test/network_test.go +++ b/core/test/network_test.go @@ -147,7 +147,7 @@ func (s *NetworkTestSuite) TestPullBlocks() { for { select { case v := <-master.ReceiveChan(): - b, ok := v.(*types.Block) + b, ok := v.Payload.(*types.Block) if !ok { break } @@ -223,7 +223,7 @@ func (s *NetworkTestSuite) TestPullVotes() { defer cancelFunc() select { case v := <-master.ReceiveChan(): - vv, ok := v.(*types.Vote) + vv, ok := v.Payload.(*types.Vote) if !ok { break } @@ -283,13 +283,17 @@ func (s *NetworkTestSuite) TestBroadcastToSet() { // Try broadcasting with datum from round 0, and make sure only node belongs // to that set receiving the message. nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{Position: pos}}) - req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan()) + msg := <-notaryNode.ReceiveChan() + req.IsType(&types.Vote{}, msg.Payload) nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{Round: pos.Round}) - req.IsType(&typesDKG.PrivateShare{}, <-notaryNode.ReceiveChan()) + msg = <-notaryNode.ReceiveChan() + req.IsType(&typesDKG.PrivateShare{}, msg.Payload) nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{Round: pos.Round}) - req.IsType(&typesDKG.PartialSignature{}, <-notaryNode.ReceiveChan()) + msg = <-notaryNode.ReceiveChan() + req.IsType(&typesDKG.PartialSignature{}, msg.Payload) nerd.BroadcastBlock(&types.Block{Position: pos}) - req.IsType(&types.Block{}, <-notaryNode.ReceiveChan()) + msg = <-notaryNode.ReceiveChan() + req.IsType(&types.Block{}, msg.Payload) } type testVoteCensor struct{} @@ -309,7 +313,7 @@ func (s *NetworkTestSuite) TestCensor() { _, pubKeys, err := NewKeys(peerCount) req.NoError(err) networks := s.setupNetworks(pubKeys) - receiveChans := make(map[types.NodeID]<-chan interface{}, peerCount) + receiveChans := make(map[types.NodeID]<-chan types.Msg, peerCount) for nID, node := range networks { receiveChans[nID] = node.ReceiveChan() } @@ -330,7 +334,8 @@ func (s *NetworkTestSuite) TestCensor() { req.Equal(0, len(receiveChan)) } else { req.Equal(1, len(receiveChan)) - req.IsType(&types.Vote{}, <-receiveChan) + msg := <-receiveChan + req.IsType(&types.Vote{}, msg.Payload) } } @@ -351,7 +356,8 @@ func (s *NetworkTestSuite) TestCensor() { req.Equal(0, len(receiveChan)) } else { req.Equal(1, len(receiveChan)) - req.IsType(&types.Vote{}, <-receiveChan) + msg := <-receiveChan + req.IsType(&types.Vote{}, msg.Payload) } } censorNode.BroadcastVote(vote) @@ -361,7 +367,8 @@ func (s *NetworkTestSuite) TestCensor() { req.Equal(0, len(receiveChan)) } else { req.Equal(1, len(receiveChan)) - req.IsType(&types.Vote{}, <-receiveChan) + msg := <-receiveChan + req.IsType(&types.Vote{}, msg.Payload) } } diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index bbc4d56..fdc47d5 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -205,6 +205,11 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) ( return } +// Disconnect implements Transport.Disconnect method. +func (t *TCPTransport) Disconnect(endpoint types.NodeID) { + delete(t.peers, endpoint) +} + func (t *TCPTransport) send( endpoint types.NodeID, msg interface{}, payload []byte) { t.peersLock.RLock() @@ -217,6 +222,10 @@ func (t *TCPTransport) send( func (t *TCPTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { + if _, exist := t.peers[endpoint]; !exist { + return fmt.Errorf("the endpoint does not exists: %v", endpoint) + } + payload, err := t.marshalMessage(msg) if err != nil { return diff --git a/core/types/message.go b/core/types/message.go new file mode 100644 index 0000000..0335cfa --- /dev/null +++ b/core/types/message.go @@ -0,0 +1,24 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package types + +// Msg for the network ReceiveChan. +type Msg struct { + PeerID interface{} + Payload interface{} +} diff --git a/core/utils/utils.go b/core/utils/utils.go index 1a372c7..dc29bdf 100644 --- a/core/utils/utils.go +++ b/core/utils/utils.go @@ -105,7 +105,7 @@ func VerifyDKGComplaint( // channel of a network module. An context is required to stop the go routine // automatically. An optinal message handler could be provided. func LaunchDummyReceiver( - ctx context.Context, recv <-chan interface{}, handler func(interface{})) ( + ctx context.Context, recv <-chan types.Msg, handler func(types.Msg)) ( context.CancelFunc, <-chan struct{}) { var ( dummyCtx, dummyCancel = context.WithCancel(ctx) diff --git a/core/utils/utils_test.go b/core/utils/utils_test.go index c3afea9..c6f8543 100644 --- a/core/utils/utils_test.go +++ b/core/utils/utils_test.go @@ -121,14 +121,16 @@ func (s *UtilsTestSuite) TestDummyReceiver() { for i := 0; i < msgCount; i++ { fakeMsgs = append(fakeMsgs, i) } - launchDummySender := func(msgs []int, inputChan chan<- interface{}) { + launchDummySender := func(msgs []int, inputChan chan<- types.Msg) { finished := make(chan struct{}, 1) go func() { defer func() { finished <- struct{}{} }() for _, v := range msgs { - inputChan <- v + inputChan <- types.Msg{ + Payload: v, + } } }() select { @@ -137,17 +139,17 @@ func (s *UtilsTestSuite) TestDummyReceiver() { s.Require().FailNow("unable to deliver all messages in time") } } - checkBuffer := func(sent []int, buff []interface{}) { + checkBuffer := func(sent []int, buff []types.Msg) { s.Require().Len(buff, len(sent)) for i := range sent { - s.Require().Equal(sent[i], buff[i].(int)) + s.Require().Equal(sent[i], buff[i].Payload.(int)) } } // Basic scenario: a dummy receiver with caching enabled. - recv := make(chan interface{}) - buff := []interface{}{} + recv := make(chan types.Msg) + buff := []types.Msg{} cancel, finished := LaunchDummyReceiver( - context.Background(), recv, func(msg interface{}) { + context.Background(), recv, func(msg types.Msg) { buff = append(buff, msg) }) launchDummySender(fakeMsgs, recv) -- cgit v1.2.3