From f5a753891357ce76308578234ed9edd15bf81f23 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 9 Apr 2019 16:31:02 +0800 Subject: core: priority process self message (#557) * core: priority process self message * fix * fixup --- core/consensus.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/consensus.go b/core/consensus.go index f1a383b..966c70a 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -264,8 +264,9 @@ func (recv *consensusBAReceiver) ConfirmBlock( } IDs = append(IDs, ID) psigs = append(psigs, vote.PartialSignature) + } else { + voteList = append(voteList, *vote) } - voteList = append(voteList, *vote) } if block.Position.Round >= DKGDelayRound { rand, err := cryptoDKG.RecoverSignature(psigs, IDs) @@ -289,7 +290,9 @@ func (recv *consensusBAReceiver) ConfirmBlock( Randomness: block.Randomness, } // touchAgreementResult does not support concurrent access. - recv.consensus.msgChan <- (*selfAgreementResult)(result) + go func() { + recv.consensus.priorityMsgChan <- (*selfAgreementResult)(result) + }() recv.consensus.logger.Debug("Broadcast AgreementResult", "result", result) recv.consensus.network.BroadcastAgreementResult(result) @@ -518,6 +521,7 @@ type Consensus struct { logger common.Logger resetDeliveryGuardTicker chan struct{} msgChan chan interface{} + priorityMsgChan chan interface{} waitGroup sync.WaitGroup processBlockChan chan *types.Block @@ -679,6 +683,7 @@ func newConsensusForRound( logger: logger, resetDeliveryGuardTicker: make(chan struct{}), msgChan: make(chan interface{}, 1024), + priorityMsgChan: make(chan interface{}, 1024), processBlockChan: make(chan *types.Block, 1024), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) @@ -1223,9 +1228,16 @@ MessageLoop: } var msg interface{} select { - case msg = <-con.msgChan: - case <-con.ctx.Done(): - return + case msg = <-con.priorityMsgChan: + default: + } + if msg == nil { + select { + case msg = <-con.msgChan: + case msg = <-con.priorityMsgChan: + case <-con.ctx.Done(): + return + } } switch val := msg.(type) { case *selfAgreementResult: -- cgit v1.2.3