author    Mission Liao <mission.liao@dexon.org>    2019-01-11 12:58:30 +0800
committer GitHub <noreply@github.com>              2019-01-11 12:58:30 +0800
commit    809e8def862fdfa792061a448f952747f1af4d3c (patch)
tree      bd038971e65a09bc9bb399f03a37b194ce67ae3c
parent    fa25817354d5b7d40f5911004232392acfe7fe53 (diff)
syncer: fix issues when switching to core.Consensus (#418)
- When confirmed blocks passed to core.Consensus aren't continuous in position on some chain, the pulling mechanism would skip the missing blocks.
- fix: when a block is missing, avoid adding it, and every block after it, to core.Consensus.
- We need to keep the receive channel of the network module from filling up.
- fix: while switching to core.Consensus, launch a dummy receiver to drain the receive channel of the network module (a standalone sketch of this pattern follows below).
- fix: in the period after core.Consensus is created and before it starts running, a dummy receiver is also required to drain the receive channel of the network module.
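Both dummy-receiver fixes revolve around the LaunchDummyReceiver helper added to
core/utils/utils.go in this commit. The snippet below is a minimal standalone
sketch of that drain-and-replay pattern, not repository code: launchDummyReceiver,
recv, cached, and msgChan are illustrative stand-ins for the helper,
Network.ReceiveChan(), dummyMsgBuffer, and the internal message channel.

package main

import (
    "context"
    "fmt"
)

// launchDummyReceiver keeps draining recv until the returned cancel function
// is called, hands every message to handler (if any), and signals completion
// on the returned channel.
func launchDummyReceiver(
    ctx context.Context, recv <-chan interface{}, handler func(interface{})) (
    context.CancelFunc, <-chan struct{}) {
    dummyCtx, dummyCancel := context.WithCancel(ctx)
    finished := make(chan struct{}, 1)
    go func() {
        defer func() { finished <- struct{}{} }()
        for {
            select {
            case <-dummyCtx.Done():
                return
            case v := <-recv:
                if handler != nil {
                    handler(v)
                }
            }
        }
    }()
    return dummyCancel, finished
}

func main() {
    recv := make(chan interface{}) // stands in for the network receive channel
    var cached []interface{}       // messages buffered while nothing consumes them

    // While switching to core.Consensus there is no consumer yet, so drain the
    // receive channel to keep senders from blocking, caching what arrives.
    cancel, finished := launchDummyReceiver(
        context.Background(), recv, func(msg interface{}) {
            cached = append(cached, msg)
        })

    // Simulate messages arriving during the switch.
    for i := 0; i < 3; i++ {
        recv <- i
    }

    // Once the real consumer is about to run, stop the dummy receiver and
    // replay the cached messages into the internal message channel.
    cancel()
    <-finished
    msgChan := make(chan interface{}, len(cached))
    for _, msg := range cached {
        msgChan <- msg
    }
    fmt.Println("replayed", len(msgChan), "cached messages")
}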
-rw-r--r--   core/consensus.go                    129
-rw-r--r--   core/consensus_test.go                 1
-rw-r--r--   core/lattice-data.go                   2
-rw-r--r--   core/syncer/agreement.go               2
-rw-r--r--   core/syncer/consensus.go              87
-rw-r--r--   core/test/utils.go                    19
-rw-r--r--   core/utils/utils.go                   34
-rw-r--r--   core/utils/utils_test.go              60
-rw-r--r--   integration_test/consensus_test.go     8
9 files changed, 274 insertions, 68 deletions
diff --git a/core/consensus.go b/core/consensus.go
index ad1efeb..2770018 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -415,6 +415,13 @@ type Consensus struct {
nonFinalizedBlockDelivered bool
resetRandomnessTicker chan struct{}
resetDeliveryGuardTicker chan struct{}
+ msgChan chan interface{}
+ waitGroup sync.WaitGroup
+
+ // Context of Dummy receiver during switching from syncer.
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus constructs a Consensus instance.
@@ -450,6 +457,9 @@ func NewConsensusForSimulation(
// You need to provide the initial block for this newly created Consensus
// instance to bootstrap with. A proper choice is the last finalized block you
// delivered to syncer.
+//
+// NOTE: those confirmed blocks should be organized by chainID and sorted by
+// their positions, in ascending order.
func NewConsensusFromSyncer(
initBlock *types.Block,
initRoundBeginTime time.Time,
@@ -459,17 +469,43 @@ func NewConsensusFromSyncer(
networkModule Network,
prv crypto.PrivateKey,
latticeModule *Lattice,
- blocks []*types.Block,
+ confirmedBlocks [][]*types.Block,
randomnessResults []*types.BlockRandomnessResult,
+ cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
networkModule, prv, logger, latticeModule, true)
- // Dump all BA-confirmed blocks to the consensus instance.
- for _, b := range blocks {
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
+ // Launch a dummy receiver before we start receiving from network module.
+ con.dummyMsgBuffer = cachedMessages
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ // Dump all BA-confirmed blocks to the consensus instance, making sure the
+ // added blocks form a DAG.
+ for {
+ updated := false
+ for idx, bs := range confirmedBlocks {
+ for bIdx, b := range bs {
+ // A block can only be added after its parent block is already in the
+ // lattice. Otherwise, our pulling mechanism would stop at the block we
+ // added and lose its parent block forever.
+ if !latticeModule.Exist(b.ParentHash) {
+ logger.Debug("Skip discontinuous confirmed block",
+ "from", b,
+ "until", bs[len(bs)-1])
+ confirmedBlocks[idx] = bs[bIdx:]
+ break
+ }
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ }
+ if !updated {
+ break
}
}
// Dump all randomness result to the consensus instance.
@@ -549,6 +585,7 @@ func newConsensusForRound(
logger: logger,
resetRandomnessTicker: make(chan struct{}),
resetDeliveryGuardTicker: make(chan struct{}),
+ msgChan: make(chan interface{}, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -614,12 +651,39 @@ func (con *Consensus) Run() {
con.baMgr.run()
// Launch network handler.
con.logger.Debug("Calling Network.ReceiveChan")
- go con.processMsg(con.network.ReceiveChan())
+ con.waitGroup.Add(1)
+ go con.deliverNetworkMsg()
+ con.waitGroup.Add(1)
+ go con.processMsg()
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Take some time to bootstrap.
time.Sleep(3 * time.Second)
+ con.waitGroup.Add(1)
go con.pullRandomness()
+ // Stop dummy receiver if launched.
+ if con.dummyCancel != nil {
+ con.logger.Trace("Stop dummy receiver")
+ con.dummyCancel()
+ <-con.dummyFinished
+ // Replay those cached messages.
+ con.logger.Trace("Dummy receiver stoped, start dumping cached messages",
+ "count", len(con.dummyMsgBuffer))
+ for _, msg := range con.dummyMsgBuffer {
+ loop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break loop
+ case <-time.After(50 * time.Millisecond):
+ con.logger.Debug(
+ "internal message channel is full when syncing")
+ }
+ }
+ }
+ con.logger.Trace("Finish dumping cached messages")
+ }
+ con.waitGroup.Add(1)
go con.deliveryGuard()
// Block until done.
select {
@@ -815,18 +879,51 @@ func (con *Consensus) Stop() {
con.ctxCancel()
con.baMgr.stop()
con.event.Reset()
+ con.waitGroup.Wait()
+}
+
+func (con *Consensus) deliverNetworkMsg() {
+ defer con.waitGroup.Done()
+ recv := con.network.ReceiveChan()
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case msg := <-recv:
+ innerLoop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break innerLoop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug("internal message channel is full",
+ "pending", msg)
+ }
+ }
+ case <-con.ctx.Done():
+ return
+ }
+ }
}
-func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+func (con *Consensus) processMsg() {
+ defer con.waitGroup.Done()
MessageLoop:
for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
var msg interface{}
select {
- case msg = <-msgChan:
+ case msg = <-con.msgChan:
case <-con.ctx.Done():
return
}
-
switch val := msg.(type) {
case *types.Block:
if ch, exist := func() (chan<- *types.Block, bool) {
@@ -1037,10 +1134,16 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
}
func (con *Consensus) pullRandomness() {
+ defer con.waitGroup.Done()
for {
select {
case <-con.ctx.Done():
return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
case <-con.resetRandomnessTicker:
case <-time.After(1500 * time.Millisecond):
// TODO(jimmy): pulling period should be related to lambdaBA.
@@ -1055,6 +1158,7 @@ func (con *Consensus) pullRandomness() {
}
func (con *Consensus) deliveryGuard() {
+ defer con.waitGroup.Done()
time.Sleep(con.dMoment.Sub(time.Now()))
// Node takes time to start.
time.Sleep(60 * time.Second)
@@ -1062,6 +1166,11 @@ func (con *Consensus) deliveryGuard() {
select {
case <-con.ctx.Done():
return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
case <-con.resetDeliveryGuardTicker:
case <-time.After(60 * time.Second):
con.logger.Error("no blocks delivered for too long", "ID", con.ID)
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 938b56d..b53af8a 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -593,6 +593,7 @@ func (s *ConsensusTestSuite) TestSyncBA() {
prvKey := prvKeys[0]
_, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn)
go con.Run()
+ defer con.Stop()
hash := common.NewRandomHash()
signers := make([]*utils.Signer, 0, len(prvKeys))
for _, prvKey := range prvKeys {
diff --git a/core/lattice-data.go b/core/lattice-data.go
index 998fb1f..8367539 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -55,7 +55,7 @@ type ErrAckingBlockNotExists struct {
}
func (e ErrAckingBlockNotExists) Error() string {
- return fmt.Sprintf("acking block %s not exists", e.hash)
+ return fmt.Sprintf("acking block %s not exists", e.hash.String()[:6])
}
// Errors for method usage
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index eaad860..acc4f1c 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The dexon-consensus-core Authors
+// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 306017f..75c1067 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -39,7 +39,8 @@ var (
ErrNotSynced = fmt.Errorf("not synced yet")
// ErrGenesisBlockReached is reported when genesis block reached.
ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
- // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
+ // blocks.
ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
// ErrMismatchBlockHashSequence means the delivering sequence is not
// correct, compared to finalized blocks.
@@ -83,6 +84,9 @@ type Consensus struct {
ctxCancel context.CancelFunc
syncedLastBlock *types.Block
syncedConsensus *core.Consensus
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []interface{}
}
// NewConsensus creates an instance for Consensus (syncer consensus).
@@ -286,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock(
blocks []*types.Block) (*types.Block, error) {
lastBlock := blocks[len(blocks)-1]
round := lastBlock.Position.Round
+ isConfigChanged := func(prev, cur *types.Config) bool {
+ return prev.K != cur.K ||
+ prev.NumChains != cur.NumChains ||
+ prev.PhiRatio != cur.PhiRatio
+ }
for {
// Find round r which r-1, r, r+1 are all in same total ordering config.
for {
- sameAsPrevRound := round == 0 || !con.isConfigChanged(
+ sameAsPrevRound := round == 0 || !isConfigChanged(
con.configs[round-1], con.configs[round])
- sameAsNextRound := !con.isConfigChanged(
+ sameAsNextRound := !isConfigChanged(
con.configs[round], con.configs[round+1])
if sameAsPrevRound && sameAsNextRound {
break
@@ -314,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock(
lastBlock = &b
}
// Find the deliver set by hash for two times. Blocks in a deliver set
- // returned by total ordering is sorted by hash. If a block's parent hash
- // is greater than its hash means there is a cut between deliver sets.
+ // returned by total ordering are sorted by hash. If a block's parent
+ // hash is greater than its own hash, there is a cut between deliver
+ // sets.
var curBlock, prevBlock *types.Block
var deliverSetFirstBlock, deliverSetLastBlock *types.Block
curBlock = lastBlock
@@ -464,8 +474,8 @@ func (con *Consensus) SyncBlocks(
}
}
if latest && con.lattice == nil {
- // New Lattice and find the deliver set of total ordering when "latest" is
- // true for first time. Deliver set is found by block hashes.
+ // New Lattice and find the deliver set of total ordering when "latest"
+ // is true for first time. Deliver set is found by block hashes.
var syncBlock *types.Block
syncBlock, err = con.findLatticeSyncBlock(blocks)
if err != nil {
@@ -482,7 +492,8 @@ func (con *Consensus) SyncBlocks(
con.setupConfigs(blocks)
// Process blocks from syncBlock to blocks' last block.
b := blocks[len(blocks)-1]
- blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+ blocksCount :=
+ b.Finalization.Height - syncBlock.Finalization.Height + 1
blocksToProcess := make([]*types.Block, blocksCount)
for {
blocksToProcess[blocksCount-1] = b
@@ -512,8 +523,13 @@ func (con *Consensus) SyncBlocks(
if err = con.Stop(); err != nil {
return
}
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
con.syncedLastBlock = blocks[len(blocks)-1]
- synced = con.syncedLastBlock != nil
+ synced = true
}
}
return
@@ -521,6 +537,8 @@ func (con *Consensus) SyncBlocks(
// GetSyncedConsensus returns the core.Consensus instance after synced.
func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
if con.syncedConsensus != nil {
return con.syncedConsensus, nil
}
@@ -529,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
}
// flush all blocks in con.blocks into core.Consensus, and build
// core.Consensus from syncer.
- confirmedBlocks := []*types.Block{}
+ confirmedBlocks := make([][]*types.Block, len(con.blocks))
+ for i, bs := range con.blocks {
+ confirmedBlocks[i] = []*types.Block(bs)
+ }
randomnessResults := []*types.BlockRandomnessResult{}
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- for _, bs := range con.blocks {
- confirmedBlocks = append(confirmedBlocks, bs...)
- }
- for _, r := range con.randomnessResults {
- randomnessResults = append(randomnessResults, r)
- }
- }()
+ for _, r := range con.randomnessResults {
+ randomnessResults = append(randomnessResults, r)
+ }
+ con.dummyCancel()
+ <-con.dummyFinished
var err error
con.syncedConsensus, err = core.NewConsensusFromSyncer(
con.syncedLastBlock,
@@ -553,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.lattice,
confirmedBlocks,
randomnessResults,
+ con.dummyMsgBuffer,
con.logger)
return con.syncedConsensus, err
}
@@ -560,7 +577,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
// Stop the syncer.
//
// This method is mainly for caller to stop the syncer before synced, the syncer
-// would call this method automatically after synced.
+// would call this method automatically after being synced.
func (con *Consensus) Stop() error {
con.logger.Trace("syncer is about to stop")
// Stop network and CRS routines, wait until they are all stoped.
@@ -654,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) {
// Resize the pool of blocks.
con.blocks = append(con.blocks, types.ByPosition{})
// Resize agreement modules.
- a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
+ a := newAgreement(
+ con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
con.agreements = append(con.agreements, a)
con.agreementWaitGroup.Add(1)
go func() {
@@ -679,8 +697,8 @@ func (con *Consensus) startAgreement() {
func() {
con.lock.Lock()
defer con.lock.Unlock()
- // If round is cut in agreements, do not add blocks with round less
- // then cut round.
+ // If round is cut in agreements, do not add blocks with a
+ // round less than the cut round.
if b.Position.Round < con.agreementRoundCut {
return
}
@@ -764,15 +782,22 @@ func (con *Consensus) startNetwork() {
default:
continue Loop
}
- func() {
+ if func() bool {
con.lock.RLock()
defer con.lock.RUnlock()
if pos.ChainID >= uint32(len(con.agreements)) {
- con.logger.Error("Unknown chainID message received (syncer)",
+ // This error might be easily encountered when the
+ // "latest" parameter of SyncBlocks is turned on too
+ // early.
+ con.logger.Error(
+ "Unknown chainID message received (syncer)",
"position", &pos)
+ return false
}
- }()
- con.agreements[pos.ChainID].inputChan <- val
+ return true
+ }() {
+ con.agreements[pos.ChainID].inputChan <- val
+ }
case <-con.ctx.Done():
return
}
@@ -849,9 +874,3 @@ func (con *Consensus) stopAgreement() {
close(con.receiveChan)
close(con.pullChan)
}
-
-func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
- return prev.K != cur.K ||
- prev.NumChains != cur.NumChains ||
- prev.PhiRatio != cur.PhiRatio
-}
diff --git a/core/test/utils.go b/core/test/utils.go
index d85395b..a2819ae 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -18,7 +18,6 @@
package test
import (
- "context"
"errors"
"fmt"
"math"
@@ -227,24 +226,6 @@ func VerifyDB(db db.Database) error {
return nil
}
-// LaunchDummyReceiver launches a go routine to receive and drop messages from
-// a network module. An optional context could be passed to stop the go routine.
-func LaunchDummyReceiver(
- ctx context.Context, networkModule *Network) context.CancelFunc {
- dummyCtx, dummyCancel := context.WithCancel(ctx)
- go func() {
- loop:
- for {
- select {
- case <-dummyCtx.Done():
- break loop
- case <-networkModule.ReceiveChan():
- }
- }
- }()
- return dummyCancel
-}
-
func getComplementSet(
all, set map[types.NodeID]struct{}) map[types.NodeID]struct{} {
complement := make(map[types.NodeID]struct{})
diff --git a/core/utils/utils.go b/core/utils/utils.go
index 8c9f77a..8486d28 100644
--- a/core/utils/utils.go
+++ b/core/utils/utils.go
@@ -18,6 +18,7 @@
package utils
import (
+ "context"
"fmt"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -92,3 +93,36 @@ func VerifyDKGComplaint(
}
return ok, nil
}
+
+// LaunchDummyReceiver launches a go routine to receive from the receive
+// channel of a network module. A context is required to stop the go routine
+// automatically. An optional message handler could be provided.
+func LaunchDummyReceiver(
+ ctx context.Context, recv <-chan interface{}, handler func(interface{})) (
+ context.CancelFunc, <-chan struct{}) {
+ var (
+ dummyCtx, dummyCancel = context.WithCancel(ctx)
+ finishedChan = make(chan struct{}, 1)
+ )
+ go func() {
+ defer func() {
+ finishedChan <- struct{}{}
+ }()
+ loop:
+ for {
+ select {
+ case <-dummyCtx.Done():
+ break loop
+ case v, ok := <-recv:
+ if !ok {
+ panic(fmt.Errorf(
+ "receive channel is closed before dummy receiver"))
+ }
+ if handler != nil {
+ handler(v)
+ }
+ }
+ }
+ }()
+ return dummyCancel, finishedChan
+}
diff --git a/core/utils/utils_test.go b/core/utils/utils_test.go
index 34336b3..88576b8 100644
--- a/core/utils/utils_test.go
+++ b/core/utils/utils_test.go
@@ -18,7 +18,9 @@
package utils
import (
+ "context"
"testing"
+ "time"
"github.com/stretchr/testify/suite"
@@ -111,6 +113,64 @@ func (s *UtilsTestSuite) TestVerifyDKGComplaint() {
s.False(ok)
}
+func (s *UtilsTestSuite) TestDummyReceiver() {
+ var (
+ msgCount = 1000
+ fakeMsgs = make([]int, 0, msgCount)
+ )
+ for i := 0; i < msgCount; i++ {
+ fakeMsgs = append(fakeMsgs, i)
+ }
+ launchDummySender := func(msgs []int, inputChan chan<- interface{}) {
+ finished := make(chan struct{}, 1)
+ go func() {
+ defer func() {
+ finished <- struct{}{}
+ }()
+ for _, v := range msgs {
+ inputChan <- v
+ }
+ }()
+ select {
+ case <-finished:
+ case <-time.After(1 * time.Second):
+ s.Require().FailNow("unable to deliver all messages in time")
+ }
+ }
+ checkBuffer := func(sent []int, buff []interface{}) {
+ s.Require().Len(buff, len(sent))
+ for i := range sent {
+ s.Require().Equal(sent[i], buff[i].(int))
+ }
+ }
+ // Basic scenario: a dummy receiver with caching enabled.
+ recv := make(chan interface{})
+ buff := []interface{}{}
+ cancel, finished := LaunchDummyReceiver(
+ context.Background(), recv, func(msg interface{}) {
+ buff = append(buff, msg)
+ })
+ launchDummySender(fakeMsgs, recv)
+ cancel()
+ select {
+ case <-finished:
+ case <-time.After(1 * time.Second):
+ s.Require().FailNow("should finished after cancel is called")
+ }
+ checkBuffer(fakeMsgs, buff)
+ // Dummy receiver can be shutdown along with parent context, and caching
+ // is not enabled.
+ ctx, cancel := context.WithCancel(context.Background())
+ _, finished = LaunchDummyReceiver(ctx, recv, nil)
+ launchDummySender(fakeMsgs, recv)
+ cancel()
+ select {
+ case <-finished:
+ case <-time.After(1 * time.Second):
+ s.Require().FailNow("should finished after cancel is called")
+ }
+}
+
func TestUtils(t *testing.T) {
suite.Run(t, new(UtilsTestSuite))
}
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 99de7c9..fc6bb47 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -367,8 +367,8 @@ func (s *ConsensusTestSuite) TestSync() {
}
// Clean syncNode's network receive channel, or it might exceed the limit
// and block other go routines.
- dummyReceiverCtxCancel := test.LaunchDummyReceiver(
- context.Background(), syncNode.network)
+ dummyReceiverCtxCancel, dummyFinished := utils.LaunchDummyReceiver(
+ context.Background(), syncNode.network.ReceiveChan(), nil)
ReachAlive:
for {
// Check if any error happened or sleep for a period of time.
@@ -390,6 +390,7 @@ ReachAlive:
}
}
dummyReceiverCtxCancel()
+ <-dummyFinished
break
}
// Initiate Syncer.
@@ -456,7 +457,8 @@ ReachAlive:
stoppedNode.con.Stop()
stoppedNode.con = nil
fmt.Println("one node stopped", stoppedNode.ID)
- test.LaunchDummyReceiver(runnerCtx, stoppedNode.network)
+ utils.LaunchDummyReceiver(
+ runnerCtx, stoppedNode.network.ReceiveChan(), nil)
continue
}
if pos.Round < untilRound {