aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-11-27 09:55:13 +0800
committerGitHub <noreply@github.com>2018-11-27 09:55:13 +0800
commit60c1b59a97379753889b693460ada18b45d2beea (patch)
treec2238c8eeb27237e3199274b6d32fac03bac3f72
parent6d95559bf7eb62e6c114ca4d4040c44ffd553629 (diff)
downloaddexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.tar
dexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.tar.gz
dexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.tar.bz2
dexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.tar.lz
dexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.tar.xz
dexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.tar.zst
dexon-consensus-60c1b59a97379753889b693460ada18b45d2beea.zip
core: Fix stuffs (#342)
-rw-r--r--core/agreement-mgr.go34
-rw-r--r--core/compaction-chain.go40
-rw-r--r--core/consensus.go55
-rw-r--r--core/lattice.go21
-rw-r--r--core/lattice_test.go132
-rw-r--r--core/utils.go2
6 files changed, 124 insertions, 160 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 10469de..f695e36 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -1,18 +1,18 @@
-// Copyright 2018 The dexon-consensus-core Authors
-// This file is part of the dexon-consensus-core library.
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
//
-// The dexon-consensus-core library is free software: you can redistribute it
+// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
-// The dexon-consensus-core library is distributed in the hope that it will be
+// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus-core library. If not, see
+// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
package core
@@ -382,15 +382,29 @@ Loop:
// This round is finished.
break Loop
}
- nextHeight, err := mgr.lattice.NextHeight(recv.round, setting.chainID)
- if err != nil {
- panic(err)
+ oldPos := agr.agreementID()
+ var nextHeight uint64
+ for {
+ nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID)
+ if err != nil {
+ panic(err)
+ }
+ if isStop(oldPos) || nextHeight == 0 {
+ break
+ }
+ if nextHeight > oldPos.Height {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ mgr.logger.Debug("Lattice not ready!!!",
+ "old", &oldPos, "next", nextHeight)
}
- agr.restart(setting.notarySet, types.Position{
+ nextPos := types.Position{
Round: recv.round,
ChainID: setting.chainID,
Height: nextHeight,
- }, setting.crs)
+ }
+ agr.restart(setting.notarySet, nextPos, setting.crs)
default:
}
if agr.pullVotes() {
diff --git a/core/compaction-chain.go b/core/compaction-chain.go
index f6bc014..20a7bdd 100644
--- a/core/compaction-chain.go
+++ b/core/compaction-chain.go
@@ -21,6 +21,7 @@ import (
"container/heap"
"fmt"
"sync"
+ "time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
@@ -39,6 +40,13 @@ var (
"incorrect block randomness result")
)
+const maxPendingPeriod = 3 * time.Second
+
+type pendingRandomnessResult struct {
+ receivedTime time.Time
+ randResult *types.BlockRandomnessResult
+}
+
type finalizedBlockHeap = types.ByFinalizationHeight
type compactionChain struct {
@@ -47,6 +55,7 @@ type compactionChain struct {
tsigVerifier *TSigVerifierCache
blocks map[common.Hash]*types.Block
blockRandomness map[common.Hash][]byte
+ pendingRandomness map[common.Hash]pendingRandomnessResult
pendingBlocks []*types.Block
pendingFinalizedBlocks *finalizedBlockHeap
lock sync.RWMutex
@@ -61,6 +70,7 @@ func newCompactionChain(gov Governance) *compactionChain {
tsigVerifier: NewTSigVerifierCache(gov, 7),
blocks: make(map[common.Hash]*types.Block),
blockRandomness: make(map[common.Hash][]byte),
+ pendingRandomness: make(map[common.Hash]pendingRandomnessResult),
pendingFinalizedBlocks: pendingFinalizedBlocks,
}
}
@@ -83,6 +93,10 @@ func (cc *compactionChain) registerBlock(block *types.Block) {
cc.lock.Lock()
defer cc.lock.Unlock()
cc.blocks[block.Hash] = block
+ if rand, exist := cc.pendingRandomness[block.Hash]; exist {
+ cc.blockRandomness[rand.randResult.BlockHash] = rand.randResult.Randomness
+ delete(cc.pendingRandomness, block.Hash)
+ }
}
func (cc *compactionChain) blockRegistered(hash common.Hash) bool {
@@ -286,13 +300,6 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block {
func (cc *compactionChain) processBlockRandomnessResult(
rand *types.BlockRandomnessResult) error {
- cc.lock.Lock()
- defer cc.lock.Unlock()
- if !cc.blockRegisteredNoLock(rand.BlockHash) {
- // If the randomness result is discarded here, it'll later be processed by
- //finalized block
- return ErrBlockNotRegistered
- }
ok, err := cc.verifyRandomness(
rand.BlockHash, rand.Position.Round, rand.Randomness)
if err != nil {
@@ -301,10 +308,29 @@ func (cc *compactionChain) processBlockRandomnessResult(
if !ok {
return ErrIncorrectBlockRandomnessResult
}
+ cc.lock.Lock()
+ defer cc.lock.Unlock()
+ if !cc.blockRegisteredNoLock(rand.BlockHash) {
+ cc.purgePending()
+ cc.pendingRandomness[rand.BlockHash] = pendingRandomnessResult{
+ receivedTime: time.Now(),
+ randResult: rand,
+ }
+ return ErrBlockNotRegistered
+ }
cc.blockRandomness[rand.BlockHash] = rand.Randomness
return nil
}
+func (cc *compactionChain) purgePending() {
+ now := time.Now()
+ for key, rand := range cc.pendingRandomness {
+ if now.After(rand.receivedTime.Add(maxPendingPeriod)) {
+ delete(cc.pendingRandomness, key)
+ }
+ }
+}
+
func (cc *compactionChain) lastBlock() *types.Block {
cc.lock.RLock()
defer cc.lock.RUnlock()
diff --git a/core/consensus.go b/core/consensus.go
index 49874d3..af40417 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -144,6 +144,54 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
recv.consensus.ccModule.registerBlock(block)
+ if block.Position.Height != 0 &&
+ !recv.consensus.lattice.Exist(block.ParentHash) {
+ go func(hash common.Hash) {
+ parentHash := hash
+ for {
+ recv.consensus.logger.Warn("Parent block not confirmed",
+ "hash", parentHash,
+ "chainID", recv.chainID)
+ ch := make(chan *types.Block)
+ if !func() bool {
+ recv.consensus.lock.Lock()
+ defer recv.consensus.lock.Unlock()
+ if _, exist := recv.consensus.baConfirmedBlock[parentHash]; exist {
+ return false
+ }
+ recv.consensus.baConfirmedBlock[parentHash] = ch
+ return true
+ }() {
+ return
+ }
+ var block *types.Block
+ PullBlockLoop:
+ for {
+ recv.consensus.logger.Debug("Calling Network.PullBlock for parent",
+ "hash", parentHash)
+ recv.consensus.network.PullBlocks(common.Hashes{parentHash})
+ select {
+ case block = <-ch:
+ break PullBlockLoop
+ case <-time.After(1 * time.Second):
+ }
+ }
+ recv.consensus.logger.Info("Receive parent block",
+ "hash", block.ParentHash,
+ "chainID", recv.chainID)
+ recv.consensus.ccModule.registerBlock(block)
+ if err := recv.consensus.processBlock(block); err != nil {
+ recv.consensus.logger.Error("Failed to process block", "error", err)
+ return
+ }
+ parentHash = block.ParentHash
+ if block.Position.Height == 0 ||
+ recv.consensus.lattice.Exist(parentHash) {
+ return
+ }
+ }
+ }(block.ParentHash)
+ }
voteList := make([]types.Vote, 0, len(votes))
for _, vote := range votes {
if vote.BlockHash != hash {
@@ -157,7 +205,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
Votes: voteList,
IsEmptyBlock: isEmptyBlockConfirmed,
}
- recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult",
+ recv.consensus.logger.Debug("Propose AgreementResult",
"result", result)
recv.consensus.network.BroadcastAgreementResult(result)
if err := recv.consensus.processBlock(block); err != nil {
@@ -766,7 +814,8 @@ func (con *Consensus) ProcessAgreementResult(
if !con.cfgModule.touchTSigHash(rand.BlockHash) {
return nil
}
- con.logger.Debug("Calling Network.BroadcastAgreementResult", "result", rand)
+ con.logger.Debug("Rebroadcast AgreementResult",
+ "result", rand)
con.network.BroadcastAgreementResult(rand)
dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
if err != nil {
@@ -892,6 +941,8 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
go con.event.NotifyTime(b.Finalization.Timestamp)
}
deliveredBlocks = con.ccModule.extractBlocks()
+ con.logger.Debug("Last block in compaction chain",
+ "block", con.ccModule.lastBlock())
for _, b := range deliveredBlocks {
if err = con.db.Update(*b); err != nil {
panic(err)
diff --git a/core/lattice.go b/core/lattice.go
index 402a468..f76813d 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -155,6 +155,16 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) {
return
}
+// Exist checks if the block is known to lattice.
+func (l *Lattice) Exist(hash common.Hash) bool {
+ l.lock.RLock()
+ defer l.lock.RUnlock()
+ if _, err := l.data.findBlock(hash); err != nil {
+ return false
+ }
+ return true
+}
+
// addBlockToLattice adds a block into lattice, and delivers blocks with the
// acks already delivered.
//
@@ -164,6 +174,8 @@ func (l *Lattice) addBlockToLattice(
if tip := l.data.chains[input.Position.ChainID].tip; tip != nil {
if !input.Position.Newer(&tip.Position) {
+ l.logger.Warn("Dropping block: older than tip",
+ "block", input, "tip", tip)
return
}
}
@@ -203,7 +215,7 @@ func (l *Lattice) addBlockToLattice(
if l.debug != nil {
l.debug.StronglyAcked(b.Hash)
}
- l.logger.Debug("Calling Application.BlockConfirmed", "block", input)
+ l.logger.Debug("Calling Application.BlockConfirmed", "block", b)
l.app.BlockConfirmed(*b.Clone())
// Purge blocks in pool with the same chainID and lower height.
l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
@@ -298,11 +310,4 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
// ProcessFinalizedBlock is used for syncing lattice data.
func (l *Lattice) ProcessFinalizedBlock(b *types.Block) {
- defer func() { l.retryAdd = true }()
- l.lock.Lock()
- defer l.lock.Unlock()
- if err := l.data.addFinalizedBlock(b); err != nil {
- panic(err)
- }
- l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
}
diff --git a/core/lattice_test.go b/core/lattice_test.go
index 5c5a667..42b8e06 100644
--- a/core/lattice_test.go
+++ b/core/lattice_test.go
@@ -211,138 +211,6 @@ func (s *LatticeTestSuite) TestBasicUsage() {
}
}
-func (s *LatticeTestSuite) TestSync() {
- // A Lattice prepares blocks on chains randomly selected each time and
- // processes them. Those generated blocks are kept into a buffer, and
- // processed by other Lattice instances with random order.
- var (
- chainNum = uint32(19)
- otherLatticeNum = 50
- )
- if testing.Short() {
- chainNum = 13
- otherLatticeNum = 20
- }
- var (
- blockNum = 500
- // The first `desyncNum` blocks revealed are considered "desynced" and will
- // not be delivered to lattice. After `syncNum` blocks have revealed, the
- // system is considered "synced" and start feeding blocks that are desynced
- // to processFinalizedBlock.
- desyncNum = 50
- syncNum = 150
- req = s.Require()
- err error
- cfg = types.Config{
- NumChains: chainNum,
- NotarySetSize: chainNum,
- PhiRatio: float32(2) / float32(3),
- K: 0,
- MinBlockInterval: 0,
- RoundInterval: time.Hour,
- }
- dMoment = time.Now().UTC()
- master = s.newTestLatticeMgr(&cfg, dMoment)
- revealSeq = map[string]struct{}{}
- )
- // Make sure the test setup is correct.
- req.True(syncNum > desyncNum)
- // Master-lattice generates blocks.
- for i := uint32(0); i < chainNum; i++ {
- // Produced genesis blocks should be delivered before all other blocks,
- // or the consensus time would be wrong.
- b, err := master.prepareBlock(i)
- req.NotNil(b)
- req.NoError(err)
- // Ignore error "acking blocks don't exist".
- req.NoError(master.processBlock(b))
- }
- for i := 0; i < (blockNum - int(chainNum)); i++ {
- b, err := master.prepareBlock(uint32(rand.Intn(int(chainNum))))
- req.NotNil(b)
- req.NoError(err)
- // Ignore error "acking blocks don't exist".
- req.NoError(master.processBlock(b))
- }
- req.NoError(master.app.Verify())
- // Now we have some blocks, replay them on different lattices.
- iter, err := master.db.GetAll()
- req.NoError(err)
- revealer, err := test.NewRandomTipRevealer(iter)
- req.NoError(err)
- for i := 0; i < otherLatticeNum; i++ {
- synced := false
- syncFromHeight := uint64(0)
- revealer.Reset()
- revealed := ""
- other := s.newTestLatticeMgr(&cfg, dMoment)
- chainTip := make([]*types.Block, chainNum)
- for height := 0; ; height++ {
- b, err := revealer.Next()
- if err != nil {
- if err == blockdb.ErrIterationFinished {
- err = nil
- break
- }
- }
- req.NoError(err)
- if height >= syncNum && !synced {
- synced = true
- syncToHeight := uint64(0)
- for _, block := range chainTip {
- if block == nil {
- synced = false
- continue
- }
- result, exist := master.app.Delivered[block.Hash]
- req.True(exist)
- if syncToHeight < result.ConsensusHeight {
- syncToHeight = result.ConsensusHeight
- }
- }
-
- for idx := syncFromHeight; idx < syncToHeight; idx++ {
- block, err := master.db.Get(master.app.DeliverSequence[idx])
- req.Equal(idx+1, block.Finalization.Height)
- req.NoError(err)
- if err = other.db.Put(block); err != nil {
- req.Equal(blockdb.ErrBlockExists, err)
- }
- other.ccModule.processFinalizedBlock(&block)
- }
- extracted := other.ccModule.extractFinalizedBlocks()
- req.Len(extracted, int(syncToHeight-syncFromHeight))
- for _, block := range extracted {
- other.app.StronglyAcked(block.Hash)
- other.lattice.ProcessFinalizedBlock(block)
- }
- syncFromHeight = syncToHeight
- }
- if height > desyncNum {
- if chainTip[b.Position.ChainID] == nil {
- chainTip[b.Position.ChainID] = &b
- }
- if err = other.db.Put(b); err != nil {
- req.Equal(blockdb.ErrBlockExists, err)
- }
- delivered, err := other.lattice.addBlockToLattice(&b)
- req.NoError(err)
- revealed += b.Hash.String() + ","
- revealSeq[revealed] = struct{}{}
- req.NoError(other.lattice.PurgeBlocks(delivered))
- // TODO(jimmy-dexon): check if delivered set is a DAG.
- } else {
- other.app.StronglyAcked(b.Hash)
- }
- }
- for b := range master.app.Acked {
- if _, exist := other.app.Acked[b]; !exist {
- s.FailNowf("Block not delivered", "%s not exists", b)
- }
- }
- }
-}
-
func (s *LatticeTestSuite) TestSanityCheck() {
// This sanity check focuses on hash/signature part.
var (
diff --git a/core/utils.go b/core/utils.go
index 4e9cfdc..441aac1 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -61,7 +61,7 @@ func Debugf(format string, args ...interface{}) {
// Debugln is like fmt.Println, but only output when we are in debug mode.
func Debugln(args ...interface{}) {
if debug {
- fmt.Println(args)
+ fmt.Println(args...)
}
}