aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/consensus.go42
-rw-r--r--core/consensus_test.go58
-rw-r--r--core/interfaces.go5
-rw-r--r--core/syncer/consensus.go10
-rw-r--r--core/test/fake-transport.go5
-rw-r--r--core/test/interface.go2
-rw-r--r--core/test/network.go33
-rw-r--r--core/test/network_test.go27
-rw-r--r--core/test/tcp-transport.go9
-rw-r--r--core/types/message.go24
-rw-r--r--core/utils/utils.go2
-rw-r--r--core/utils/utils_test.go16
12 files changed, 178 insertions, 55 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 49f2db1..ec15bf3 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -520,7 +520,7 @@ type Consensus struct {
roundEvent *utils.RoundEvent
logger common.Logger
resetDeliveryGuardTicker chan struct{}
- msgChan chan interface{}
+ msgChan chan types.Msg
priorityMsgChan chan interface{}
waitGroup sync.WaitGroup
processBlockChan chan *types.Block
@@ -528,7 +528,7 @@ type Consensus struct {
// Context of Dummy receiver during switching from syncer.
dummyCancel context.CancelFunc
dummyFinished <-chan struct{}
- dummyMsgBuffer []interface{}
+ dummyMsgBuffer []types.Msg
}
// NewConsensus construct an Consensus instance.
@@ -577,7 +577,7 @@ func NewConsensusFromSyncer(
networkModule Network,
prv crypto.PrivateKey,
confirmedBlocks []*types.Block,
- cachedMessages []interface{},
+ cachedMessages []types.Msg,
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
con := newConsensusForRound(initBlock, dMoment, app, gov, db,
@@ -585,7 +585,7 @@ func NewConsensusFromSyncer(
// Launch a dummy receiver before we start receiving from network module.
con.dummyMsgBuffer = cachedMessages
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
- con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
+ con.ctx, networkModule.ReceiveChan(), func(msg types.Msg) {
con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
})
// Dump all BA-confirmed blocks to the consensus instance, make sure these
@@ -690,7 +690,7 @@ func newConsensusForRound(
event: common.NewEvent(),
logger: logger,
resetDeliveryGuardTicker: make(chan struct{}),
- msgChan: make(chan interface{}, 1024),
+ msgChan: make(chan types.Msg, 1024),
priorityMsgChan: make(chan interface{}, 1024),
processBlockChan: make(chan *types.Block, 1024),
}
@@ -1234,14 +1234,15 @@ MessageLoop:
return
default:
}
- var msg interface{}
+ var msg, peer interface{}
select {
case msg = <-con.priorityMsgChan:
default:
}
if msg == nil {
select {
- case msg = <-con.msgChan:
+ case message := <-con.msgChan:
+ msg, peer = message.Payload, message.PeerID
case msg = <-con.priorityMsgChan:
case <-con.ctx.Done():
return
@@ -1260,42 +1261,53 @@ MessageLoop:
if val.IsEmpty() {
hash, err := utils.HashBlock(val)
if err != nil {
- con.logger.Error("error verifying empty block hash",
+ con.logger.Error("Error verifying empty block hash",
"block", val,
"error, err")
+ con.network.ReportBadPeerChan() <- peer
continue MessageLoop
}
if hash != val.Hash {
- con.logger.Error("incorrect confirmed empty block hash",
+ con.logger.Error("Incorrect confirmed empty block hash",
"block", val,
"hash", hash)
+ con.network.ReportBadPeerChan() <- peer
continue MessageLoop
}
if _, err := con.bcModule.proposeBlock(
val.Position, time.Time{}, true); err != nil {
- con.logger.Error("error adding empty block",
+ con.logger.Error("Error adding empty block",
"block", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
continue MessageLoop
}
} else {
+ if !val.IsFinalized() {
+ con.logger.Warn("Ignore not finalized block",
+ "block", val)
+ continue MessageLoop
+ }
ok, err := con.bcModule.verifyRandomness(
val.Hash, val.Position.Round, val.Randomness)
if err != nil {
- con.logger.Error("error verifying confirmed block randomness",
+ con.logger.Error("Error verifying confirmed block randomness",
"block", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
continue MessageLoop
}
if !ok {
- con.logger.Error("incorrect confirmed block randomness",
+ con.logger.Error("Incorrect confirmed block randomness",
"block", val)
+ con.network.ReportBadPeerChan() <- peer
continue MessageLoop
}
if err := utils.VerifyBlockSignature(val); err != nil {
con.logger.Error("VerifyBlockSignature failed",
"block", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
continue MessageLoop
}
}
@@ -1314,12 +1326,14 @@ MessageLoop:
con.logger.Error("Failed to process finalized block",
"block", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
}
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
"block", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
}
}
case *types.Vote:
@@ -1327,23 +1341,27 @@ MessageLoop:
con.logger.Error("Failed to process vote",
"vote", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
}
case *types.AgreementResult:
if err := con.ProcessAgreementResult(val); err != nil {
con.logger.Error("Failed to process agreement result",
"result", val,
"error", err)
+ con.network.ReportBadPeerChan() <- peer
}
case *typesDKG.PrivateShare:
if err := con.cfgModule.processPrivateShare(val); err != nil {
con.logger.Error("Failed to process private share",
"error", err)
+ con.network.ReportBadPeerChan() <- peer
}
case *typesDKG.PartialSignature:
if err := con.cfgModule.processPartialSignature(val); err != nil {
con.logger.Error("Failed to process partial signature",
"error", err)
+ con.network.ReportBadPeerChan() <- peer
}
}
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index f677d00..5f61028 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -18,6 +18,7 @@
package core
import (
+ "context"
"encoding/json"
"testing"
"time"
@@ -70,7 +71,7 @@ func (n *network) BroadcastAgreementResult(
// SendDKGPrivateShare sends PrivateShare to a DKG participant.
func (n *network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
- n.conn.send(types.NewNodeID(recv), prvShare)
+ n.conn.send(n.nID, types.NewNodeID(recv), prvShare)
}
// BroadcastDKGPrivateShare broadcasts PrivateShare to all DKG participants.
@@ -87,8 +88,13 @@ func (n *network) BroadcastDKGPartialSignature(
}
// ReceiveChan returns a channel to receive messages from DEXON network.
-func (n *network) ReceiveChan() <-chan interface{} {
- return make(chan interface{})
+func (n *network) ReceiveChan() <-chan types.Msg {
+ return make(chan types.Msg)
+}
+
+// ReportBadPeer reports that a peer is sending bad message.
+func (n *network) ReportBadPeerChan() chan<- interface{} {
+ return n.conn.s.sink
}
func (nc *networkConnection) broadcast(from types.NodeID, msg interface{}) {
@@ -96,11 +102,11 @@ func (nc *networkConnection) broadcast(from types.NodeID, msg interface{}) {
if nID == from {
continue
}
- nc.send(nID, msg)
+ nc.send(from, nID, msg)
}
}
-func (nc *networkConnection) send(to types.NodeID, msg interface{}) {
+func (nc *networkConnection) send(from, to types.NodeID, msg interface{}) {
ch, exist := nc.cons[to]
if !exist {
return
@@ -122,12 +128,15 @@ func (nc *networkConnection) send(to types.NodeID, msg interface{}) {
}
msgCopy = valCopy
}
- ch <- msgCopy
+ ch <- types.Msg{
+ PeerID: from,
+ Payload: msgCopy,
+ }
}
type networkConnection struct {
s *ConsensusTestSuite
- cons map[types.NodeID]chan interface{}
+ cons map[types.NodeID]chan types.Msg
}
func (nc *networkConnection) newNetwork(nID types.NodeID) *network {
@@ -138,14 +147,19 @@ func (nc *networkConnection) newNetwork(nID types.NodeID) *network {
}
func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) {
- ch := make(chan interface{}, 1000)
+ ch := make(chan types.Msg, 1000)
go func() {
for {
- msg := <-ch
+ var msg types.Msg
+ select {
+ case msg = <-ch:
+ case <-nc.s.ctx.Done():
+ return
+ }
var err error
// Testify package does not support concurrent call.
// Use panic() to detact error.
- switch val := msg.(type) {
+ switch val := msg.Payload.(type) {
case *types.Block:
err = con.preProcessBlock(val)
case *types.Vote:
@@ -167,13 +181,31 @@ func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) {
type ConsensusTestSuite struct {
suite.Suite
- conn *networkConnection
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ conn *networkConnection
+ sink chan interface{}
+}
+
+func (s *ConsensusTestSuite) SetupTest() {
+ s.ctx, s.ctxCancel = context.WithCancel(context.Background())
+ s.sink = make(chan interface{}, 1000)
+ go func() {
+ select {
+ case <-s.sink:
+ case <-s.ctx.Done():
+ return
+ }
+ }()
+}
+func (s *ConsensusTestSuite) TearDownTest() {
+ s.ctxCancel()
}
func (s *ConsensusTestSuite) newNetworkConnection() *networkConnection {
return &networkConnection{
s: s,
- cons: make(map[types.NodeID]chan interface{}),
+ cons: make(map[types.NodeID]chan types.Msg),
}
}
@@ -384,7 +416,7 @@ func (s *ConsensusTestSuite) TestInitialHeightEventTriggered() {
network,
prvKey,
[]*types.Block(nil),
- []interface{}(nil),
+ []types.Msg{},
&common.NullLogger{},
)
s.Require().NoError(err)
diff --git a/core/interfaces.go b/core/interfaces.go
index f76ee19..c16c624 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -83,7 +83,10 @@ type Network interface {
BroadcastDKGPartialSignature(psig *typesDKG.PartialSignature)
// ReceiveChan returns a channel to receive messages from DEXON network.
- ReceiveChan() <-chan interface{}
+ ReceiveChan() <-chan types.Msg
+
+ // ReportBadPeerChan returns a channel to report bad peer.
+ ReportBadPeerChan() chan<- interface{}
}
// Governance interface specifies interface to control the governance contract.
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 7db836a..496c0f9 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -80,7 +80,7 @@ type Consensus struct {
syncedSkipNext bool
dummyCancel context.CancelFunc
dummyFinished <-chan struct{}
- dummyMsgBuffer []interface{}
+ dummyMsgBuffer []types.Msg
initChainTipHeight uint64
}
@@ -297,7 +297,7 @@ func (con *Consensus) ForceSync(lastPos types.Position, skip bool) {
if con.dummyCancel == nil {
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
context.Background(), con.network.ReceiveChan(),
- func(msg interface{}) {
+ func(msg types.Msg) {
con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
})
}
@@ -448,7 +448,7 @@ func (con *Consensus) stopBuffering() {
// need to launch a dummy receiver right away.
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
context.Background(), con.network.ReceiveChan(),
- func(msg interface{}) {
+ func(msg types.Msg) {
con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
})
// Stop agreements.
@@ -512,7 +512,7 @@ func (con *Consensus) startNetwork() {
for {
select {
case val := <-con.network.ReceiveChan():
- switch v := val.(type) {
+ switch v := val.Payload.(type) {
case *types.Block:
case *types.AgreementResult:
// Avoid byzantine nodes attack by broadcasting older
@@ -524,7 +524,7 @@ func (con *Consensus) startNetwork() {
default:
continue loop
}
- con.agreementModule.inputChan <- val
+ con.agreementModule.inputChan <- val.Payload
case <-con.ctx.Done():
break loop
}
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go
index ed9871b..cecac54 100644
--- a/core/test/fake-transport.go
+++ b/core/test/fake-transport.go
@@ -65,6 +65,11 @@ func NewFakeTransportClient(pubKey crypto.PublicKey) TransportClient {
}
}
+// Disconnect implements Transport.Disconnect method.
+func (t *FakeTransport) Disconnect(endpoint types.NodeID) {
+ delete(t.peers, endpoint)
+}
+
// Send implements Transport.Send method.
func (t *FakeTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
diff --git a/core/test/interface.go b/core/test/interface.go
index 0712ad5..58a3ced 100644
--- a/core/test/interface.go
+++ b/core/test/interface.go
@@ -94,6 +94,8 @@ type Transport interface {
// This method should be accessed after ether 'Join' or 'WaitForPeers'
// returned.
Peers() []crypto.PublicKey
+
+ Disconnect(endpoint types.NodeID)
}
// Marshaller defines an interface to convert between interface{} and []byte.
diff --git a/core/test/network.go b/core/test/network.go
index f32c27f..c0ec255 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -179,8 +179,9 @@ type Network struct {
trans *censorClient
dMoment time.Time
fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
+ toConsensus chan types.Msg
toNode chan interface{}
+ badPeerChan chan interface{}
sentAgreementLock sync.Mutex
sentAgreement map[common.Hash]struct{}
blockCacheLock sync.RWMutex
@@ -208,8 +209,9 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
n = &Network{
ID: types.NewNodeID(pubKey),
config: config,
- toConsensus: make(chan interface{}, 1000),
+ toConsensus: make(chan types.Msg, 1000),
toNode: make(chan interface{}, 1000),
+ badPeerChan: make(chan interface{}, 1000),
sentAgreement: make(map[common.Hash]struct{}),
blockCache: make(map[common.Hash]*types.Block, maxBlockCache),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
@@ -348,7 +350,7 @@ func (n *Network) BroadcastDKGPartialSignature(
}
// ReceiveChan implements core.Network interface.
-func (n *Network) ReceiveChan() <-chan interface{} {
+func (n *Network) ReceiveChan() <-chan types.Msg {
return n.toConsensus
}
@@ -396,14 +398,23 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
}
delete(n.unreceivedBlocks, v.Hash)
}()
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case *types.Vote:
// Add this vote to cache.
n.addVoteToCache(v)
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case *types.AgreementResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case packedStateChanges:
if n.stateModule == nil {
panic(errors.New(
@@ -466,6 +477,11 @@ Loop:
default:
}
select {
+ case peer := <-n.badPeerChan:
+ if peer == nil {
+ continue Loop
+ }
+ n.trans.Disconnect(peer.(types.NodeID))
case <-n.ctx.Done():
break Loop
case e, ok := <-n.fromTransport:
@@ -535,6 +551,11 @@ func (n *Network) PurgeNodeSetCache(round uint64) {
n.cache.Purge(round)
}
+// ReportBadPeerChan reports that a peer is sending bad message.
+func (n *Network) ReportBadPeerChan() chan<- interface{} {
+ return n.badPeerChan
+}
+
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Setup notification channels for each block hash.
notYetReceived := make(map[common.Hash]struct{})
diff --git a/core/test/network_test.go b/core/test/network_test.go
index 7a5ad16..27d25e6 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -147,7 +147,7 @@ func (s *NetworkTestSuite) TestPullBlocks() {
for {
select {
case v := <-master.ReceiveChan():
- b, ok := v.(*types.Block)
+ b, ok := v.Payload.(*types.Block)
if !ok {
break
}
@@ -223,7 +223,7 @@ func (s *NetworkTestSuite) TestPullVotes() {
defer cancelFunc()
select {
case v := <-master.ReceiveChan():
- vv, ok := v.(*types.Vote)
+ vv, ok := v.Payload.(*types.Vote)
if !ok {
break
}
@@ -283,13 +283,17 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
// Try broadcasting with datum from round 0, and make sure only node belongs
// to that set receiving the message.
nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{Position: pos}})
- req.IsType(&types.Vote{}, <-notaryNode.ReceiveChan())
+ msg := <-notaryNode.ReceiveChan()
+ req.IsType(&types.Vote{}, msg.Payload)
nerd.BroadcastDKGPrivateShare(&typesDKG.PrivateShare{Round: pos.Round})
- req.IsType(&typesDKG.PrivateShare{}, <-notaryNode.ReceiveChan())
+ msg = <-notaryNode.ReceiveChan()
+ req.IsType(&typesDKG.PrivateShare{}, msg.Payload)
nerd.BroadcastDKGPartialSignature(&typesDKG.PartialSignature{Round: pos.Round})
- req.IsType(&typesDKG.PartialSignature{}, <-notaryNode.ReceiveChan())
+ msg = <-notaryNode.ReceiveChan()
+ req.IsType(&typesDKG.PartialSignature{}, msg.Payload)
nerd.BroadcastBlock(&types.Block{Position: pos})
- req.IsType(&types.Block{}, <-notaryNode.ReceiveChan())
+ msg = <-notaryNode.ReceiveChan()
+ req.IsType(&types.Block{}, msg.Payload)
}
type testVoteCensor struct{}
@@ -309,7 +313,7 @@ func (s *NetworkTestSuite) TestCensor() {
_, pubKeys, err := NewKeys(peerCount)
req.NoError(err)
networks := s.setupNetworks(pubKeys)
- receiveChans := make(map[types.NodeID]<-chan interface{}, peerCount)
+ receiveChans := make(map[types.NodeID]<-chan types.Msg, peerCount)
for nID, node := range networks {
receiveChans[nID] = node.ReceiveChan()
}
@@ -330,7 +334,8 @@ func (s *NetworkTestSuite) TestCensor() {
req.Equal(0, len(receiveChan))
} else {
req.Equal(1, len(receiveChan))
- req.IsType(&types.Vote{}, <-receiveChan)
+ msg := <-receiveChan
+ req.IsType(&types.Vote{}, msg.Payload)
}
}
@@ -351,7 +356,8 @@ func (s *NetworkTestSuite) TestCensor() {
req.Equal(0, len(receiveChan))
} else {
req.Equal(1, len(receiveChan))
- req.IsType(&types.Vote{}, <-receiveChan)
+ msg := <-receiveChan
+ req.IsType(&types.Vote{}, msg.Payload)
}
}
censorNode.BroadcastVote(vote)
@@ -361,7 +367,8 @@ func (s *NetworkTestSuite) TestCensor() {
req.Equal(0, len(receiveChan))
} else {
req.Equal(1, len(receiveChan))
- req.IsType(&types.Vote{}, <-receiveChan)
+ msg := <-receiveChan
+ req.IsType(&types.Vote{}, msg.Payload)
}
}
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index bbc4d56..fdc47d5 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -205,6 +205,11 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) (
return
}
+// Disconnect implements Transport.Disconnect method.
+func (t *TCPTransport) Disconnect(endpoint types.NodeID) {
+ delete(t.peers, endpoint)
+}
+
func (t *TCPTransport) send(
endpoint types.NodeID, msg interface{}, payload []byte) {
t.peersLock.RLock()
@@ -217,6 +222,10 @@ func (t *TCPTransport) send(
func (t *TCPTransport) Send(
endpoint types.NodeID, msg interface{}) (err error) {
+ if _, exist := t.peers[endpoint]; !exist {
+ return fmt.Errorf("the endpoint does not exists: %v", endpoint)
+ }
+
payload, err := t.marshalMessage(msg)
if err != nil {
return
diff --git a/core/types/message.go b/core/types/message.go
new file mode 100644
index 0000000..0335cfa
--- /dev/null
+++ b/core/types/message.go
@@ -0,0 +1,24 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package types
+
+// Msg for the network ReceiveChan.
+type Msg struct {
+ PeerID interface{}
+ Payload interface{}
+}
diff --git a/core/utils/utils.go b/core/utils/utils.go
index 1a372c7..dc29bdf 100644
--- a/core/utils/utils.go
+++ b/core/utils/utils.go
@@ -105,7 +105,7 @@ func VerifyDKGComplaint(
// channel of a network module. An context is required to stop the go routine
// automatically. An optinal message handler could be provided.
func LaunchDummyReceiver(
- ctx context.Context, recv <-chan interface{}, handler func(interface{})) (
+ ctx context.Context, recv <-chan types.Msg, handler func(types.Msg)) (
context.CancelFunc, <-chan struct{}) {
var (
dummyCtx, dummyCancel = context.WithCancel(ctx)
diff --git a/core/utils/utils_test.go b/core/utils/utils_test.go
index c3afea9..c6f8543 100644
--- a/core/utils/utils_test.go
+++ b/core/utils/utils_test.go
@@ -121,14 +121,16 @@ func (s *UtilsTestSuite) TestDummyReceiver() {
for i := 0; i < msgCount; i++ {
fakeMsgs = append(fakeMsgs, i)
}
- launchDummySender := func(msgs []int, inputChan chan<- interface{}) {
+ launchDummySender := func(msgs []int, inputChan chan<- types.Msg) {
finished := make(chan struct{}, 1)
go func() {
defer func() {
finished <- struct{}{}
}()
for _, v := range msgs {
- inputChan <- v
+ inputChan <- types.Msg{
+ Payload: v,
+ }
}
}()
select {
@@ -137,17 +139,17 @@ func (s *UtilsTestSuite) TestDummyReceiver() {
s.Require().FailNow("unable to deliver all messages in time")
}
}
- checkBuffer := func(sent []int, buff []interface{}) {
+ checkBuffer := func(sent []int, buff []types.Msg) {
s.Require().Len(buff, len(sent))
for i := range sent {
- s.Require().Equal(sent[i], buff[i].(int))
+ s.Require().Equal(sent[i], buff[i].Payload.(int))
}
}
// Basic scenario: a dummy receiver with caching enabled.
- recv := make(chan interface{})
- buff := []interface{}{}
+ recv := make(chan types.Msg)
+ buff := []types.Msg{}
cancel, finished := LaunchDummyReceiver(
- context.Background(), recv, func(msg interface{}) {
+ context.Background(), recv, func(msg types.Msg) {
buff = append(buff, msg)
})
launchDummySender(fakeMsgs, recv)