diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-10-26 13:22:29 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-26 13:22:29 +0800 |
commit | 721d36f2720b0cbf648cbffe40fd05c9a60061e4 (patch) | |
tree | 05d643695b20cc9f430869cf2105c177856625bc | |
parent | ef0df0e6e27acd3852f2e0efdccf0798d5fc63ad (diff) | |
download | dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.gz dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.bz2 dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.lz dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.xz dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.zst dexon-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.zip |
core: Pull block (#263)
-rw-r--r-- | core/agreement-state_test.go | 2 | ||||
-rw-r--r-- | core/agreement.go | 16 | ||||
-rw-r--r-- | core/agreement_test.go | 35 | ||||
-rw-r--r-- | core/consensus.go | 88 | ||||
-rw-r--r-- | core/consensus_test.go | 4 | ||||
-rw-r--r-- | core/interfaces.go | 3 | ||||
-rw-r--r-- | simulation/network.go | 25 |
7 files changed, 141 insertions, 32 deletions
diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go index 37d9ae2..429e124 100644 --- a/core/agreement-state_test.go +++ b/core/agreement-state_test.go @@ -57,6 +57,8 @@ func (r *agreementStateTestReceiver) ConfirmBlock(block common.Hash, r.s.confirmChan <- block } +func (r *agreementStateTestReceiver) PullBlocks(common.Hashes) {} + func (s *AgreementStateTestSuite) proposeBlock( leader *leaderSelector) *types.Block { block := &types.Block{ diff --git a/core/agreement.go b/core/agreement.go index 8618b5f..cc7af4f 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -69,6 +69,7 @@ type agreementReceiver interface { ProposeVote(vote *types.Vote) ProposeBlock() common.Hash ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote) + PullBlocks(common.Hashes) } type pendingBlock struct { @@ -327,6 +328,21 @@ func (a *agreement) processVote(vote *types.Vote) error { // Condition 3. if vote.Type == types.VoteCom && vote.Period >= a.data.period && len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote { + hashes := common.Hashes{} + addPullBlocks := func(voteType types.VoteType) { + for _, vote := range a.data.votes[vote.Period][voteType] { + if vote.BlockHash == nullBlockHash || vote.BlockHash == skipBlockHash { + continue + } + if _, found := a.findCandidateBlock(vote.BlockHash); !found { + hashes = append(hashes, vote.BlockHash) + } + } + } + addPullBlocks(types.VoteInit) + addPullBlocks(types.VotePreCom) + addPullBlocks(types.VoteCom) + a.data.recv.PullBlocks(hashes) a.fastForward <- vote.Period + 1 return nil } diff --git a/core/agreement_test.go b/core/agreement_test.go index 3b97a10..e4e05cd 100644 --- a/core/agreement_test.go +++ b/core/agreement_test.go @@ -47,6 +47,13 @@ func (r *agreementTestReceiver) ConfirmBlock(block common.Hash, r.s.confirmChan <- block } +func (r *agreementTestReceiver) PullBlocks(hashes common.Hashes) { + for _, hash := range hashes { + r.s.pulledBlocks[hash] = struct{}{} + } + +} + func (s *AgreementTestSuite) proposeBlock( agreementIdx int) *types.Block { block := &types.Block{ @@ -61,13 +68,14 @@ func (s *AgreementTestSuite) proposeBlock( type AgreementTestSuite struct { suite.Suite - ID types.NodeID - auths map[types.NodeID]*Authenticator - voteChan chan *types.Vote - blockChan chan common.Hash - confirmChan chan common.Hash - block map[common.Hash]*types.Block - agreement []*agreement + ID types.NodeID + auths map[types.NodeID]*Authenticator + voteChan chan *types.Vote + blockChan chan common.Hash + confirmChan chan common.Hash + block map[common.Hash]*types.Block + pulledBlocks map[common.Hash]struct{} + agreement []*agreement } func (s *AgreementTestSuite) SetupTest() { @@ -81,6 +89,7 @@ func (s *AgreementTestSuite) SetupTest() { s.blockChan = make(chan common.Hash, 100) s.confirmChan = make(chan common.Hash, 100) s.block = make(map[common.Hash]*types.Block) + s.pulledBlocks = make(map[common.Hash]struct{}) } func (s *AgreementTestSuite) newAgreement(numNotarySet int) *agreement { @@ -265,13 +274,15 @@ func (s *AgreementTestSuite) TestFastForwardCond2() { } func (s *AgreementTestSuite) TestFastForwardCond3() { - votes := 0 + numVotes := 0 + votes := []*types.Vote{} a := s.newAgreement(4) a.data.period = 1 for nID := range a.notarySet { vote := s.prepareVote(nID, types.VoteCom, common.NewRandomHash(), uint64(2)) + votes = append(votes, vote) s.Require().NoError(a.processVote(vote)) - if votes++; votes == 3 { + if numVotes++; numVotes == 3 { break } } @@ -282,6 +293,12 @@ func (s *AgreementTestSuite) TestFastForwardCond3() { s.FailNow("Expecting fast forward.") } s.Equal(uint64(3), a.data.period) + + s.Len(s.pulledBlocks, 3) + for _, vote := range votes { + _, exist := s.pulledBlocks[vote.BlockHash] + s.True(exist) + } } func (s *AgreementTestSuite) TestDecide() { diff --git a/core/consensus.go b/core/consensus.go index 3e9c816..bd311ba 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -113,8 +113,20 @@ func (recv *consensusBAReceiver) ConfirmBlock( block, exist = recv.consensus.baModules[recv.chainID]. findCandidateBlock(hash) if !exist { - recv.consensus.logger.Error("Unknown block confirmed", "hash", hash) - return + recv.consensus.logger.Error("Unknown block confirmed", + "hash", hash, + "chainID", recv.chainID) + ch := make(chan *types.Block) + func() { + recv.consensus.lock.Lock() + defer recv.consensus.lock.Unlock() + recv.consensus.baConfirmedBlock[hash] = ch + }() + recv.consensus.network.PullBlocks(common.Hashes{hash}) + block = <-ch + recv.consensus.logger.Info("Receive unknown block", + "hash", hash, + "chainID", recv.chainID) } } recv.consensus.ccModule.registerBlock(block) @@ -145,6 +157,11 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } +func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { + recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes) + recv.consensus.network.PullBlocks(hashes) +} + // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { ID types.NodeID @@ -236,8 +253,9 @@ type Consensus struct { currentConfig *types.Config // BA. - baModules []*agreement - receivers []*consensusBAReceiver + baModules []*agreement + receivers []*consensusBAReceiver + baConfirmedBlock map[common.Hash]chan<- *types.Block // DKG. dkgRunning int32 @@ -318,23 +336,24 @@ func NewConsensus( recv.cfgModule = cfgModule // Construct Consensus instance. con := &Consensus{ - ID: ID, - currentConfig: config, - ccModule: newCompactionChain(gov), - lattice: lattice, - app: app, - gov: gov, - db: db, - network: network, - tickerObj: newTicker(gov, round, TickerBA), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, - dMoment: dMoment, - nodeSetCache: nodeSetCache, - authModule: authModule, - event: common.NewEvent(), - logger: logger, - roundToNotify: roundToNotify, + ID: ID, + currentConfig: config, + ccModule: newCompactionChain(gov), + lattice: lattice, + app: app, + gov: gov, + db: db, + network: network, + tickerObj: newTicker(gov, round, TickerBA), + baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), + dkgReady: sync.NewCond(&sync.Mutex{}), + cfgModule: cfgModule, + dMoment: dMoment, + nodeSetCache: nodeSetCache, + authModule: authModule, + event: common.NewEvent(), + logger: logger, + roundToNotify: roundToNotify, } validLeader := func(block *types.Block) bool { @@ -612,6 +631,7 @@ func (con *Consensus) Stop() { } func (con *Consensus) processMsg(msgChan <-chan interface{}) { +MessageLoop: for { var msg interface{} select { @@ -622,8 +642,30 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { switch val := msg.(type) { case *types.Block: - // For sync mode. - if val.IsFinalized() { + if ch, exist := func() (chan<- *types.Block, bool) { + con.lock.RLock() + defer con.lock.RUnlock() + ch, e := con.baConfirmedBlock[val.Hash] + return ch, e + }(); exist { + if err := con.lattice.SanityCheck(val); err != nil { + if err == ErrRetrySanityCheckLater { + err = nil + } else { + con.logger.Error("SanityCheck failed", "error", err) + continue MessageLoop + } + } + con.lock.Lock() + defer con.lock.Unlock() + // In case of multiple delivered block. + if _, exist := con.baConfirmedBlock[val.Hash]; !exist { + continue MessageLoop + } + delete(con.baConfirmedBlock, val.Hash) + ch <- val + } else if val.IsFinalized() { + // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", "error", err) diff --git a/core/consensus_test.go b/core/consensus_test.go index ed98b76..b8d25d9 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -37,6 +37,10 @@ type network struct { conn *networkConnection } +// PullBlocks tries to pull blocks from the DEXON network. +func (n *network) PullBlocks(common.Hashes) { +} + // BroadcastVote broadcasts vote to all nodes in DEXON network. func (n *network) BroadcastVote(vote *types.Vote) { n.conn.broadcast(n.nID, vote) diff --git a/core/interfaces.go b/core/interfaces.go index 01e9096..4f6ad45 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -59,6 +59,9 @@ type Debug interface { // Network describs the network interface that interacts with DEXON consensus // core. type Network interface { + // PullBlocks tries to pull blocks from the DEXON network. + PullBlocks(hashes common.Hashes) + // BroadcastVote broadcasts vote to all nodes in DEXON network. BroadcastVote(vote *types.Vote) diff --git a/simulation/network.go b/simulation/network.go index 5cd97b6..86a70b9 100644 --- a/simulation/network.go +++ b/simulation/network.go @@ -88,6 +88,7 @@ type network struct { toNode chan interface{} sentRandomness map[common.Hash]struct{} sentAgreement map[common.Hash]struct{} + blockCache map[common.Hash]*types.Block } // newNetwork setup network stuffs for nodes, which provides an @@ -105,6 +106,7 @@ func newNetwork(pubKey crypto.PublicKey, cfg config.Networking) (n *network) { toConsensus: make(chan interface{}, 1000), sentRandomness: make(map[common.Hash]struct{}), sentAgreement: make(map[common.Hash]struct{}), + blockCache: make(map[common.Hash]*types.Block), } n.ctx, n.ctxCancel = context.WithCancel(context.Background()) // Construct transport layer. @@ -123,6 +125,20 @@ func newNetwork(pubKey crypto.PublicKey, cfg config.Networking) (n *network) { return } +// PullBlock implements core.Network interface. +func (n *network) PullBlocks(hashes common.Hashes) { + go func() { + for _, hash := range hashes { + // TODO(jimmy-dexon): request block from network instead of cache. + if block, exist := n.blockCache[hash]; exist { + n.toConsensus <- block + continue + } + panic(fmt.Errorf("unknown block %s requested", hash)) + } + }() +} + // BroadcastVote implements core.Network interface. func (n *network) BroadcastVote(vote *types.Vote) { if err := n.trans.Broadcast(vote); err != nil { @@ -240,6 +256,15 @@ func (n *network) run() { // The dispatcher declararion: // to consensus or node, that's the question. disp := func(e *test.TransportEnvelope) { + if block, ok := e.Msg.(*types.Block); ok { + if len(n.blockCache) > 500 { + for k := range n.blockCache { + delete(n.blockCache, k) + break + } + } + n.blockCache[block.Hash] = block + } switch e.Msg.(type) { case *types.Block, *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult, |