aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-26 11:59:24 +0800
committerJimmy Hu <jimmy.hu@dexon.org>2019-03-27 15:25:10 +0800
commit7783bc4ba52bfc534d5b4d91e78abb2ddad7d078 (patch)
treef57ea6f4d8b2faa23ab95691717b071f507e621f
parentb8ced165b1fb03394f8758e08148b0e5d06aa07b (diff)
downloaddexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.gz
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.bz2
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.lz
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.xz
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.zst
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.zip
core: bring back agreement result (#515)
* core: bring back agreement result * add logger * Fix * fixup
-rw-r--r--core/agreement-mgr.go24
-rw-r--r--core/agreement.go102
-rw-r--r--core/blockchain.go65
-rw-r--r--core/consensus.go48
-rw-r--r--core/syncer/agreement.go43
-rw-r--r--core/test/network.go13
-rw-r--r--core/types/block-randomness.go11
-rw-r--r--integration_test/byzantine_test.go11
8 files changed, 239 insertions, 78 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index bc6923a..9086e51 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -274,6 +274,9 @@ func (mgr *agreementMgr) processAgreementResult(
}
if result.Position == aID && !mgr.baModule.confirmed() {
mgr.logger.Info("Syncing BA", "position", result.Position)
+ if result.Position.Round >= DKGDelayRound {
+ return mgr.baModule.processAgreementResult(result)
+ }
for key := range result.Votes {
if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
return err
@@ -285,21 +288,26 @@ func (mgr *agreementMgr) processAgreementResult(
if err != nil {
return err
}
- mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA",
- "hash", result.BlockHash)
- mgr.network.PullBlocks(common.Hashes{result.BlockHash})
- mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round)
- crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger)
- for key := range result.Votes {
- if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
- return err
+ if result.Position.Round < DKGDelayRound {
+ mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA",
+ "hash", result.BlockHash)
+ mgr.network.PullBlocks(common.Hashes{result.BlockHash})
+ for key := range result.Votes {
+ if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
+ return err
+ }
}
}
+ mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round)
+ crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger)
leader, err := mgr.cache.GetLeaderNode(result.Position)
if err != nil {
return err
}
mgr.baModule.restart(nIDs, result.Position, leader, crs)
+ if result.Position.Round >= DKGDelayRound {
+ return mgr.baModule.processAgreementResult(result)
+ }
}
return nil
}
diff --git a/core/agreement.go b/core/agreement.go
index 5e7b7de..d53440b 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -117,20 +117,21 @@ type agreementData struct {
// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
- state agreementState
- data *agreementData
- aID *atomic.Value
- doneChan chan struct{}
- notarySet map[types.NodeID]struct{}
- hasVoteFast bool
- hasOutput bool
- lock sync.RWMutex
- pendingBlock []pendingBlock
- pendingVote []pendingVote
- candidateBlock map[common.Hash]*types.Block
- fastForward chan uint64
- signer *utils.Signer
- logger common.Logger
+ state agreementState
+ data *agreementData
+ aID *atomic.Value
+ doneChan chan struct{}
+ notarySet map[types.NodeID]struct{}
+ hasVoteFast bool
+ hasOutput bool
+ lock sync.RWMutex
+ pendingBlock []pendingBlock
+ pendingVote []pendingVote
+ pendingAgreementResult map[types.Position]*types.AgreementResult
+ candidateBlock map[common.Hash]*types.Block
+ fastForward chan uint64
+ signer *utils.Signer
+ logger common.Logger
}
// newAgreement creates a agreement instance.
@@ -146,11 +147,12 @@ func newAgreement(
ID: ID,
leader: leader,
},
- aID: &atomic.Value{},
- candidateBlock: make(map[common.Hash]*types.Block),
- fastForward: make(chan uint64, 1),
- signer: signer,
- logger: logger,
+ aID: &atomic.Value{},
+ pendingAgreementResult: make(map[types.Position]*types.AgreementResult),
+ candidateBlock: make(map[common.Hash]*types.Block),
+ fastForward: make(chan uint64, 1),
+ signer: signer,
+ logger: logger,
}
agreement.stop()
return agreement
@@ -205,6 +207,22 @@ func (a *agreement) restart(
return
}
+ var result *types.AgreementResult
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingAgreementResult := make(
+ map[types.Position]*types.AgreementResult)
+ for pos, agr := range a.pendingAgreementResult {
+ if pos.Newer(aID) {
+ newPendingAgreementResult[pos] = agr
+ } else if pos == aID {
+ result = agr
+ }
+ }
+ a.pendingAgreementResult = newPendingAgreementResult
+ }()
+
expireTime := time.Now().Add(-10 * time.Second)
replayBlock := make([]*types.Block, 0)
func() {
@@ -215,7 +233,11 @@ func (a *agreement) restart(
if aID.Newer(pending.block.Position) {
continue
} else if pending.block.Position == aID {
- replayBlock = append(replayBlock, pending.block)
+ if result == nil ||
+ result.Position.Round < DKGDelayRound ||
+ result.BlockHash == pending.block.Hash {
+ replayBlock = append(replayBlock, pending.block)
+ }
} else if pending.receivedTime.After(expireTime) {
newPendingBlock = append(newPendingBlock, pending)
}
@@ -232,7 +254,9 @@ func (a *agreement) restart(
if aID.Newer(pending.vote.Position) {
continue
} else if pending.vote.Position == aID {
- replayVote = append(replayVote, pending.vote)
+ if result == nil || result.Position.Round < DKGDelayRound {
+ replayVote = append(replayVote, pending.vote)
+ }
} else if pending.receivedTime.After(expireTime) {
newPendingVote = append(newPendingVote, pending)
}
@@ -247,6 +271,13 @@ func (a *agreement) restart(
}
}
+ if result != nil {
+ if err := a.processAgreementResult(result); err != nil {
+ a.logger.Error("Failed to process agreement result when retarting",
+ "result", result)
+ }
+ }
+
for _, vote := range replayVote {
if err := a.processVote(vote); err != nil {
a.logger.Error("Failed to process vote when restarting agreement",
@@ -526,6 +557,8 @@ func (a *agreement) processFinalizedBlock(block *types.Block) {
}
a.addCandidateBlockNoLock(block)
a.hasOutput = true
+ a.data.lock.Lock()
+ defer a.data.lock.Unlock()
a.data.recv.ConfirmBlock(block.Hash, nil)
if a.doneChan != nil {
close(a.doneChan)
@@ -533,6 +566,33 @@ func (a *agreement) processFinalizedBlock(block *types.Block) {
}
}
+func (a *agreement) processAgreementResult(result *types.AgreementResult) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ aID := a.agreementID()
+ if result.Position.Older(aID) {
+ return nil
+ } else if result.Position.Newer(aID) {
+ a.pendingAgreementResult[result.Position] = result
+ return nil
+ }
+ if a.hasOutput {
+ return nil
+ }
+ a.data.lock.Lock()
+ defer a.data.lock.Unlock()
+ if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist {
+ a.data.recv.PullBlocks(common.Hashes{result.BlockHash})
+ }
+ a.hasOutput = true
+ a.data.recv.ConfirmBlock(result.BlockHash, nil)
+ if a.doneChan != nil {
+ close(a.doneChan)
+ a.doneChan = nil
+ }
+ return nil
+}
+
func (a *agreement) done() <-chan struct{} {
a.lock.Lock()
defer a.lock.Unlock()
diff --git a/core/blockchain.go b/core/blockchain.go
index 798a080..283d22e 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -127,18 +127,19 @@ type tsigVerifierGetter interface {
}
type blockChain struct {
- lock sync.RWMutex
- ID types.NodeID
- lastConfirmed *types.Block
- lastDelivered *types.Block
- signer *utils.Signer
- vGetter tsigVerifierGetter
- app Application
- logger common.Logger
- configs []blockChainConfig
- pendingBlocks pendingBlockRecords
- confirmedBlocks types.BlocksByPosition
- dMoment time.Time
+ lock sync.RWMutex
+ ID types.NodeID
+ lastConfirmed *types.Block
+ lastDelivered *types.Block
+ signer *utils.Signer
+ vGetter tsigVerifierGetter
+ app Application
+ logger common.Logger
+ pendingRandomnesses map[types.Position]*types.AgreementResult
+ configs []blockChainConfig
+ pendingBlocks pendingBlockRecords
+ confirmedBlocks types.BlocksByPosition
+ dMoment time.Time
}
func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
@@ -153,6 +154,8 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
app: app,
logger: logger,
dMoment: dMoment,
+ pendingRandomnesses: make(
+ map[types.Position]*types.AgreementResult),
}
}
@@ -309,7 +312,9 @@ func (bc *blockChain) addEmptyBlock(position types.Position) (
// addBlock should be called when the block is confirmed by BA, we won't perform
// sanity check against this block, it's ok to add block with skipping height.
func (bc *blockChain) addBlock(b *types.Block) error {
- if b.Position.Round >= DKGDelayRound && len(b.Finalization.Randomness) == 0 {
+ if b.Position.Round >= DKGDelayRound &&
+ len(b.Finalization.Randomness) == 0 &&
+ !bc.setRandomnessFromPending(b) {
return ErrMissingRandomness
}
bc.lock.Lock()
@@ -327,6 +332,7 @@ func (bc *blockChain) addBlock(b *types.Block) error {
} else if b.IsGenesis() {
confirmed = true
}
+ delete(bc.pendingRandomnesses, b.Position)
if !confirmed {
return bc.addPendingBlockRecord(pendingBlockRecord{b.Position, b})
}
@@ -606,3 +612,36 @@ func (bc *blockChain) confirmBlock(b *types.Block) {
bc.confirmedBlocks = append(bc.confirmedBlocks, b)
bc.purgeConfig()
}
+
+func (bc *blockChain) setRandomnessFromPending(b *types.Block) bool {
+ if r, exist := bc.pendingRandomnesses[b.Position]; exist {
+ if !r.BlockHash.Equal(b.Hash) {
+ panic(fmt.Errorf("mismathed randomness: %s %s", b, r))
+ }
+ b.Finalization.Randomness = r.Randomness
+ delete(bc.pendingRandomnesses, b.Position)
+ return true
+ }
+ return false
+}
+
+func (bc *blockChain) processAgreementResult(result *types.AgreementResult) error {
+ if result.Position.Round < DKGDelayRound {
+ return nil
+ }
+ ok, err := bc.verifyRandomness(
+ result.BlockHash, result.Position.Round, result.Randomness)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return ErrIncorrectAgreementResult
+ }
+ bc.lock.RLock()
+ defer bc.lock.RUnlock()
+ if !result.Position.Newer(bc.lastConfirmed.Position) {
+ return nil
+ }
+ bc.pendingRandomnesses[result.Position] = result
+ return nil
+}
diff --git a/core/consensus.go b/core/consensus.go
index 5a331d2..5106f18 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -227,6 +227,9 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
+ // It's a workaround, the height for application is one-based.
+ block.Finalization.Height = block.Position.Height + 1
+
if len(votes) == 0 && len(block.Finalization.Randomness) == 0 {
recv.consensus.logger.Error("No votes to recover randomness",
"block", block)
@@ -258,24 +261,31 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block.Finalization.Randomness = rand.Signature[:]
}
}
- // It's a workaround, the height for application is one-based.
- block.Finalization.Height = block.Position.Height + 1
if recv.isNotary {
- if block.Position.Round < DKGDelayRound {
- result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
- FinalizationHeight: block.Finalization.Height,
- IsEmptyBlock: isEmptyBlockConfirmed,
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ FinalizationHeight: block.Finalization.Height,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ Randomness: block.Finalization.Randomness,
+ }
+ recv.consensus.logger.Debug("Broadcast AgreementResult",
+ "result", result)
+ recv.consensus.network.BroadcastAgreementResult(result)
+ if block.IsEmpty() {
+ if err :=
+ recv.consensus.bcModule.processAgreementResult(
+ result); err != nil {
+ recv.consensus.logger.Warn(
+ "Failed to process agreement result",
+ "result", result)
}
- recv.consensus.logger.Debug("Propose AgreementResult",
- "result", result)
- recv.consensus.msgChan <- result
- } else {
+ }
+ if block.Position.Round >= DKGDelayRound {
recv.consensus.logger.Debug(
- "Propose AgreementResult as finalized block",
+ "Broadcast finalized block",
"block", block)
recv.consensus.network.BroadcastBlock(block)
}
@@ -354,7 +364,7 @@ CleanChannelLoop:
if block.Position.Height > changeNotaryHeight &&
block.Position.Round <= currentRound {
panic(fmt.Errorf(
- "round not switch when confirmig: %s, %d, should switch at %d, %s",
+ "round not switch when confirming: %s, %d, should switch at %d, %s",
block, currentRound, changeNotaryHeight, newPos))
}
recv.restartNotary <- newPos
@@ -1207,9 +1217,6 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
// ProcessAgreementResult processes the randomness request.
func (con *Consensus) ProcessAgreementResult(
rand *types.AgreementResult) error {
- if rand.Position.Round >= DKGDelayRound {
- return nil
- }
if !con.baMgr.touchAgreementResult(rand) {
return nil
}
@@ -1218,12 +1225,15 @@ func (con *Consensus) ProcessAgreementResult(
con.baMgr.untouchAgreementResult(rand)
return err
}
+ if err := con.bcModule.processAgreementResult(rand); err != nil {
+ return err
+ }
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
return err
}
- con.logger.Debug("Broadcast AgreementResult",
+ con.logger.Debug("Rebroadcast AgreementResult",
"result", rand)
con.network.BroadcastAgreementResult(rand)
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index 98e86b1..a4a0f20 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -19,6 +19,7 @@ package syncer
import (
"context"
+ "fmt"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -155,17 +156,9 @@ func (a *agreement) processFinalizedBlock(block *types.Block) {
return
}
a.confirm(block)
- if block.Position.Height > a.chainTip+1 {
- if _, exist := a.confirmedBlocks[block.ParentHash]; !exist {
- a.pullChan <- block.ParentHash
- }
- }
}
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
- if r.Position.Round >= core.DKGDelayRound {
- return
- }
// Cache those results that CRS is not ready yet.
if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
a.logger.Trace("Agreement result already confirmed", "result", r)
@@ -187,9 +180,32 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
"error", err)
return
}
+ if r.Position.Round >= core.DKGDelayRound {
+ verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round)
+ if err != nil {
+ a.logger.Error("error verifying agreement result randomness",
+ "result", r,
+ "error", err)
+ return
+ }
+ if !ok {
+ a.logger.Error("cannot verify agreement result randomness", "result", r)
+ return
+ }
+ if !verifier.VerifySignature(r.BlockHash, crypto.Signature{
+ Type: "bls",
+ Signature: r.Randomness,
+ }) {
+ a.logger.Error("incorrect agreement result randomness", "result", r)
+ return
+ }
+ }
if r.IsEmptyBlock {
b := &types.Block{
Position: r.Position,
+ Finalization: types.FinalizationResult{
+ Randomness: r.Randomness,
+ },
}
// Empty blocks should be confirmed directly, they won't be sent over
// the wire.
@@ -198,6 +214,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
}
if bs, exist := a.blocks[r.Position]; exist {
if b, exist := bs[r.BlockHash]; exist {
+ b.Finalization.Randomness = r.Randomness
a.confirm(b)
return
}
@@ -250,6 +267,11 @@ func (a *agreement) processNewCRS(round uint64) {
// confirm notifies consensus the confirmation of a block in BA.
func (a *agreement) confirm(b *types.Block) {
+ if b.Position.Round >= core.DKGDelayRound &&
+ len(b.Finalization.Randomness) == 0 {
+ panic(fmt.Errorf("confirm a block %s without randomness", b))
+ }
+ b.Finalization.Height = b.Position.Height + 1
if _, exist := a.confirmedBlocks[b.Hash]; !exist {
delete(a.blocks, b.Position)
delete(a.agreementResults, b.Hash)
@@ -267,4 +289,9 @@ func (a *agreement) confirm(b *types.Block) {
}
a.confirmedBlocks[b.Hash] = struct{}{}
}
+ if b.Position.Height > a.chainTip+1 {
+ if _, exist := a.confirmedBlocks[b.ParentHash]; !exist {
+ a.pullChan <- b.ParentHash
+ }
+ }
}
diff --git a/core/test/network.go b/core/test/network.go
index c903c57..b0ce3f7 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -41,6 +41,7 @@ const (
maxVoteCache = 128
// Gossiping parameter.
+ maxAgreementResultBroadcast = 3
gossipAgreementResultPercent = 33
)
@@ -282,9 +283,11 @@ func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
notarySet := n.getNotarySet(block.Position.Round)
- if err := n.trans.Broadcast(
- notarySet, n.config.DirectLatency, block); err != nil {
- panic(err)
+ if !block.IsFinalized() {
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, block); err != nil {
+ panic(err)
+ }
}
if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
n.config.GossipLatency, block); err != nil {
@@ -308,10 +311,10 @@ func (n *Network) BroadcastAgreementResult(
n.addBlockFinalizationToCache(
result.BlockHash,
result.FinalizationHeight,
- nil,
+ result.Randomness,
)
notarySet := n.getNotarySet(result.Position.Round)
- count := len(notarySet)*gossipAgreementResultPercent/100 + 1
+ count := maxAgreementResultBroadcast
for nID := range notarySet {
if count--; count < 0 {
break
diff --git a/core/types/block-randomness.go b/core/types/block-randomness.go
index a3ba631..b87e8a1 100644
--- a/core/types/block-randomness.go
+++ b/core/types/block-randomness.go
@@ -18,6 +18,7 @@
package types
import (
+ "encoding/hex"
"fmt"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -30,9 +31,15 @@ type AgreementResult struct {
Votes []Vote `json:"votes"`
IsEmptyBlock bool `json:"is_empty_block"`
FinalizationHeight uint64 `json:"finalization_height"`
+ Randomness []byte `json:"randomness"`
}
func (r *AgreementResult) String() string {
- return fmt.Sprintf("agreementResult{Block:%s Pos:%s}",
- r.BlockHash.String()[:6], r.Position)
+ if len(r.Randomness) == 0 {
+ return fmt.Sprintf("agreementResult{Block:%s Pos:%s}",
+ r.BlockHash.String()[:6], r.Position)
+ }
+ return fmt.Sprintf("agreementResult{Block:%s Pos:%s Rand:%s}",
+ r.BlockHash.String()[:6], r.Position,
+ hex.EncodeToString(r.Randomness)[:6])
}
diff --git a/integration_test/byzantine_test.go b/integration_test/byzantine_test.go
index 62dfe7a..2395470 100644
--- a/integration_test/byzantine_test.go
+++ b/integration_test/byzantine_test.go
@@ -20,6 +20,8 @@ package integration
import (
"context"
"fmt"
+ "log"
+ "os"
"sync"
"testing"
"time"
@@ -60,7 +62,7 @@ func (s *ByzantineTestSuite) setupNodes(
// setup nodes.
nodes := make(map[types.NodeID]*node)
wg.Add(len(prvKeys))
- for _, k := range prvKeys {
+ for i, k := range prvKeys {
dbInst, err := db.NewMemBackedDB()
s.Require().NoError(err)
nID := types.NewNodeID(k.PublicKey())
@@ -81,6 +83,11 @@ func (s *ByzantineTestSuite) setupNodes(
gov.SwitchToRemoteMode(networkModule)
gov.NotifyRound(0, 0)
networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov))
+ f, err := os.Create(fmt.Sprintf("log.%d.log", i))
+ if err != nil {
+ panic(err)
+ }
+ logger := common.NewCustomLogger(log.New(f, "", log.LstdFlags|log.Lmicroseconds))
app := test.NewApp(1, gov, nil)
nodes[nID] = &node{
ID: nID,
@@ -88,7 +95,7 @@ func (s *ByzantineTestSuite) setupNodes(
gov: gov,
db: dbInst,
network: networkModule,
- logger: &common.NullLogger{},
+ logger: logger,
}
go func() {
defer wg.Done()