From 60c1b59a97379753889b693460ada18b45d2beea Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 27 Nov 2018 09:55:13 +0800 Subject: core: Fix stuffs (#342) --- core/agreement-mgr.go | 34 ++++++++---- core/compaction-chain.go | 40 +++++++++++--- core/consensus.go | 55 +++++++++++++++++++- core/lattice.go | 21 +++++--- core/lattice_test.go | 132 ----------------------------------------------- core/utils.go | 2 +- 6 files changed, 124 insertions(+), 160 deletions(-) diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 10469de..f695e36 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -1,18 +1,18 @@ -// Copyright 2018 The dexon-consensus-core Authors -// This file is part of the dexon-consensus-core library. +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. // -// The dexon-consensus-core library is free software: you can redistribute it +// The dexon-consensus library is free software: you can redistribute it // and/or modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, // or (at your option) any later version. // -// The dexon-consensus-core library is distributed in the hope that it will be +// The dexon-consensus library is distributed in the hope that it will be // useful, but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser // General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see +// along with the dexon-consensus library. If not, see // . package core @@ -382,15 +382,29 @@ Loop: // This round is finished. break Loop } - nextHeight, err := mgr.lattice.NextHeight(recv.round, setting.chainID) - if err != nil { - panic(err) + oldPos := agr.agreementID() + var nextHeight uint64 + for { + nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) + if err != nil { + panic(err) + } + if isStop(oldPos) || nextHeight == 0 { + break + } + if nextHeight > oldPos.Height { + break + } + time.Sleep(100 * time.Millisecond) + mgr.logger.Debug("Lattice not ready!!!", + "old", &oldPos, "next", nextHeight) } - agr.restart(setting.notarySet, types.Position{ + nextPos := types.Position{ Round: recv.round, ChainID: setting.chainID, Height: nextHeight, - }, setting.crs) + } + agr.restart(setting.notarySet, nextPos, setting.crs) default: } if agr.pullVotes() { diff --git a/core/compaction-chain.go b/core/compaction-chain.go index f6bc014..20a7bdd 100644 --- a/core/compaction-chain.go +++ b/core/compaction-chain.go @@ -21,6 +21,7 @@ import ( "container/heap" "fmt" "sync" + "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" @@ -39,6 +40,13 @@ var ( "incorrect block randomness result") ) +const maxPendingPeriod = 3 * time.Second + +type pendingRandomnessResult struct { + receivedTime time.Time + randResult *types.BlockRandomnessResult +} + type finalizedBlockHeap = types.ByFinalizationHeight type compactionChain struct { @@ -47,6 +55,7 @@ type compactionChain struct { tsigVerifier *TSigVerifierCache blocks map[common.Hash]*types.Block blockRandomness map[common.Hash][]byte + pendingRandomness map[common.Hash]pendingRandomnessResult pendingBlocks []*types.Block pendingFinalizedBlocks *finalizedBlockHeap lock sync.RWMutex @@ -61,6 +70,7 @@ func newCompactionChain(gov Governance) *compactionChain { tsigVerifier: NewTSigVerifierCache(gov, 7), blocks: make(map[common.Hash]*types.Block), blockRandomness: make(map[common.Hash][]byte), + pendingRandomness: make(map[common.Hash]pendingRandomnessResult), pendingFinalizedBlocks: pendingFinalizedBlocks, } } @@ -83,6 +93,10 @@ func (cc *compactionChain) registerBlock(block *types.Block) { cc.lock.Lock() defer cc.lock.Unlock() cc.blocks[block.Hash] = block + if rand, exist := cc.pendingRandomness[block.Hash]; exist { + cc.blockRandomness[rand.randResult.BlockHash] = rand.randResult.Randomness + delete(cc.pendingRandomness, block.Hash) + } } func (cc *compactionChain) blockRegistered(hash common.Hash) bool { @@ -286,13 +300,6 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block { func (cc *compactionChain) processBlockRandomnessResult( rand *types.BlockRandomnessResult) error { - cc.lock.Lock() - defer cc.lock.Unlock() - if !cc.blockRegisteredNoLock(rand.BlockHash) { - // If the randomness result is discarded here, it'll later be processed by - //finalized block - return ErrBlockNotRegistered - } ok, err := cc.verifyRandomness( rand.BlockHash, rand.Position.Round, rand.Randomness) if err != nil { @@ -301,10 +308,29 @@ func (cc *compactionChain) processBlockRandomnessResult( if !ok { return ErrIncorrectBlockRandomnessResult } + cc.lock.Lock() + defer cc.lock.Unlock() + if !cc.blockRegisteredNoLock(rand.BlockHash) { + cc.purgePending() + cc.pendingRandomness[rand.BlockHash] = pendingRandomnessResult{ + receivedTime: time.Now(), + randResult: rand, + } + return ErrBlockNotRegistered + } cc.blockRandomness[rand.BlockHash] = rand.Randomness return nil } +func (cc *compactionChain) purgePending() { + now := time.Now() + for key, rand := range cc.pendingRandomness { + if now.After(rand.receivedTime.Add(maxPendingPeriod)) { + delete(cc.pendingRandomness, key) + } + } +} + func (cc *compactionChain) lastBlock() *types.Block { cc.lock.RLock() defer cc.lock.RUnlock() diff --git a/core/consensus.go b/core/consensus.go index 49874d3..af40417 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -144,6 +144,54 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.ccModule.registerBlock(block) + if block.Position.Height != 0 && + !recv.consensus.lattice.Exist(block.ParentHash) { + go func(hash common.Hash) { + parentHash := hash + for { + recv.consensus.logger.Warn("Parent block not confirmed", + "hash", parentHash, + "chainID", recv.chainID) + ch := make(chan *types.Block) + if !func() bool { + recv.consensus.lock.Lock() + defer recv.consensus.lock.Unlock() + if _, exist := recv.consensus.baConfirmedBlock[parentHash]; exist { + return false + } + recv.consensus.baConfirmedBlock[parentHash] = ch + return true + }() { + return + } + var block *types.Block + PullBlockLoop: + for { + recv.consensus.logger.Debug("Calling Network.PullBlock for parent", + "hash", parentHash) + recv.consensus.network.PullBlocks(common.Hashes{parentHash}) + select { + case block = <-ch: + break PullBlockLoop + case <-time.After(1 * time.Second): + } + } + recv.consensus.logger.Info("Receive parent block", + "hash", block.ParentHash, + "chainID", recv.chainID) + recv.consensus.ccModule.registerBlock(block) + if err := recv.consensus.processBlock(block); err != nil { + recv.consensus.logger.Error("Failed to process block", "error", err) + return + } + parentHash = block.ParentHash + if block.Position.Height == 0 || + recv.consensus.lattice.Exist(parentHash) { + return + } + } + }(block.ParentHash) + } voteList := make([]types.Vote, 0, len(votes)) for _, vote := range votes { if vote.BlockHash != hash { @@ -157,7 +205,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( Votes: voteList, IsEmptyBlock: isEmptyBlockConfirmed, } - recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult", + recv.consensus.logger.Debug("Propose AgreementResult", "result", result) recv.consensus.network.BroadcastAgreementResult(result) if err := recv.consensus.processBlock(block); err != nil { @@ -766,7 +814,8 @@ func (con *Consensus) ProcessAgreementResult( if !con.cfgModule.touchTSigHash(rand.BlockHash) { return nil } - con.logger.Debug("Calling Network.BroadcastAgreementResult", "result", rand) + con.logger.Debug("Rebroadcast AgreementResult", + "result", rand) con.network.BroadcastAgreementResult(rand) dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) if err != nil { @@ -892,6 +941,8 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { go con.event.NotifyTime(b.Finalization.Timestamp) } deliveredBlocks = con.ccModule.extractBlocks() + con.logger.Debug("Last block in compaction chain", + "block", con.ccModule.lastBlock()) for _, b := range deliveredBlocks { if err = con.db.Update(*b); err != nil { panic(err) diff --git a/core/lattice.go b/core/lattice.go index 402a468..f76813d 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -155,6 +155,16 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) { return } +// Exist checks if the block is known to lattice. +func (l *Lattice) Exist(hash common.Hash) bool { + l.lock.RLock() + defer l.lock.RUnlock() + if _, err := l.data.findBlock(hash); err != nil { + return false + } + return true +} + // addBlockToLattice adds a block into lattice, and delivers blocks with the // acks already delivered. // @@ -164,6 +174,8 @@ func (l *Lattice) addBlockToLattice( if tip := l.data.chains[input.Position.ChainID].tip; tip != nil { if !input.Position.Newer(&tip.Position) { + l.logger.Warn("Dropping block: older than tip", + "block", input, "tip", tip) return } } @@ -203,7 +215,7 @@ func (l *Lattice) addBlockToLattice( if l.debug != nil { l.debug.StronglyAcked(b.Hash) } - l.logger.Debug("Calling Application.BlockConfirmed", "block", input) + l.logger.Debug("Calling Application.BlockConfirmed", "block", b) l.app.BlockConfirmed(*b.Clone()) // Purge blocks in pool with the same chainID and lower height. l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) @@ -298,11 +310,4 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { // ProcessFinalizedBlock is used for syncing lattice data. func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { - defer func() { l.retryAdd = true }() - l.lock.Lock() - defer l.lock.Unlock() - if err := l.data.addFinalizedBlock(b); err != nil { - panic(err) - } - l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) } diff --git a/core/lattice_test.go b/core/lattice_test.go index 5c5a667..42b8e06 100644 --- a/core/lattice_test.go +++ b/core/lattice_test.go @@ -211,138 +211,6 @@ func (s *LatticeTestSuite) TestBasicUsage() { } } -func (s *LatticeTestSuite) TestSync() { - // A Lattice prepares blocks on chains randomly selected each time and - // processes them. Those generated blocks are kept into a buffer, and - // processed by other Lattice instances with random order. - var ( - chainNum = uint32(19) - otherLatticeNum = 50 - ) - if testing.Short() { - chainNum = 13 - otherLatticeNum = 20 - } - var ( - blockNum = 500 - // The first `desyncNum` blocks revealed are considered "desynced" and will - // not be delivered to lattice. After `syncNum` blocks have revealed, the - // system is considered "synced" and start feeding blocks that are desynced - // to processFinalizedBlock. - desyncNum = 50 - syncNum = 150 - req = s.Require() - err error - cfg = types.Config{ - NumChains: chainNum, - NotarySetSize: chainNum, - PhiRatio: float32(2) / float32(3), - K: 0, - MinBlockInterval: 0, - RoundInterval: time.Hour, - } - dMoment = time.Now().UTC() - master = s.newTestLatticeMgr(&cfg, dMoment) - revealSeq = map[string]struct{}{} - ) - // Make sure the test setup is correct. - req.True(syncNum > desyncNum) - // Master-lattice generates blocks. - for i := uint32(0); i < chainNum; i++ { - // Produced genesis blocks should be delivered before all other blocks, - // or the consensus time would be wrong. - b, err := master.prepareBlock(i) - req.NotNil(b) - req.NoError(err) - // Ignore error "acking blocks don't exist". - req.NoError(master.processBlock(b)) - } - for i := 0; i < (blockNum - int(chainNum)); i++ { - b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum)))) - req.NotNil(b) - req.NoError(err) - // Ignore error "acking blocks don't exist". - req.NoError(master.processBlock(b)) - } - req.NoError(master.app.Verify()) - // Now we have some blocks, replay them on different lattices. - iter, err := master.db.GetAll() - req.NoError(err) - revealer, err := test.NewRandomTipRevealer(iter) - req.NoError(err) - for i := 0; i < otherLatticeNum; i++ { - synced := false - syncFromHeight := uint64(0) - revealer.Reset() - revealed := "" - other := s.newTestLatticeMgr(&cfg, dMoment) - chainTip := make([]*types.Block, chainNum) - for height := 0; ; height++ { - b, err := revealer.Next() - if err != nil { - if err == blockdb.ErrIterationFinished { - err = nil - break - } - } - req.NoError(err) - if height >= syncNum && !synced { - synced = true - syncToHeight := uint64(0) - for _, block := range chainTip { - if block == nil { - synced = false - continue - } - result, exist := master.app.Delivered[block.Hash] - req.True(exist) - if syncToHeight < result.ConsensusHeight { - syncToHeight = result.ConsensusHeight - } - } - - for idx := syncFromHeight; idx < syncToHeight; idx++ { - block, err := master.db.Get(master.app.DeliverSequence[idx]) - req.Equal(idx+1, block.Finalization.Height) - req.NoError(err) - if err = other.db.Put(block); err != nil { - req.Equal(blockdb.ErrBlockExists, err) - } - other.ccModule.processFinalizedBlock(&block) - } - extracted := other.ccModule.extractFinalizedBlocks() - req.Len(extracted, int(syncToHeight-syncFromHeight)) - for _, block := range extracted { - other.app.StronglyAcked(block.Hash) - other.lattice.ProcessFinalizedBlock(block) - } - syncFromHeight = syncToHeight - } - if height > desyncNum { - if chainTip[b.Position.ChainID] == nil { - chainTip[b.Position.ChainID] = &b - } - if err = other.db.Put(b); err != nil { - req.Equal(blockdb.ErrBlockExists, err) - } - delivered, err := other.lattice.addBlockToLattice(&b) - req.NoError(err) - revealed += b.Hash.String() + "," - revealSeq[revealed] = struct{}{} - req.NoError(other.lattice.PurgeBlocks(delivered)) - // TODO(jimmy-dexon): check if delivered set is a DAG. - } else { - other.app.StronglyAcked(b.Hash) - } - } - for b := range master.app.Acked { - if _, exist := other.app.Acked[b]; !exist { - s.FailNowf("Block not delivered", "%s not exists", b) - } - } - } -} - func (s *LatticeTestSuite) TestSanityCheck() { // This sanity check focuses on hash/signature part. var ( diff --git a/core/utils.go b/core/utils.go index 4e9cfdc..441aac1 100644 --- a/core/utils.go +++ b/core/utils.go @@ -61,7 +61,7 @@ func Debugf(format string, args ...interface{}) { // Debugln is like fmt.Println, but only output when we are in debug mode. func Debugln(args ...interface{}) { if debug { - fmt.Println(args) + fmt.Println(args...) } } -- cgit v1.2.3