aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/blockchain.go30
-rw-r--r--core/blockchain_test.go37
-rw-r--r--core/configuration-chain.go11
-rw-r--r--core/consensus.go85
-rw-r--r--core/dkg-tsig-protocol.go2
-rw-r--r--core/utils/round-event.go2
6 files changed, 129 insertions, 38 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 2d67d62..9fbb861 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -137,11 +137,14 @@ type blockChain struct {
vGetter tsigVerifierGetter
app Application
logger common.Logger
- pendingRandomnesses map[types.Position]*types.AgreementResult
+ pendingRandomnesses map[types.Position][]byte
configs []blockChainConfig
pendingBlocks pendingBlockRecords
confirmedBlocks types.BlocksByPosition
dMoment time.Time
+
+ // Do not access this variable besides processAgreementResult.
+ lastPosition types.Position
}
func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
@@ -157,7 +160,7 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
logger: logger,
dMoment: dMoment,
pendingRandomnesses: make(
- map[types.Position]*types.AgreementResult),
+ map[types.Position][]byte),
}
}
@@ -629,10 +632,7 @@ func (bc *blockChain) confirmBlock(b *types.Block) {
func (bc *blockChain) setRandomnessFromPending(b *types.Block) bool {
if r, exist := bc.pendingRandomnesses[b.Position]; exist {
- if !r.BlockHash.Equal(b.Hash) {
- panic(fmt.Errorf("mismathed randomness: %s %s", b, r))
- }
- b.Randomness = r.Randomness
+ b.Randomness = r
delete(bc.pendingRandomnesses, b.Position)
return true
}
@@ -643,6 +643,9 @@ func (bc *blockChain) processAgreementResult(result *types.AgreementResult) erro
if result.Position.Round < DKGDelayRound {
return nil
}
+ if !result.Position.Newer(bc.lastPosition) {
+ return ErrSkipButNoError
+ }
ok, err := bc.verifyRandomness(
result.BlockHash, result.Position.Round, result.Randomness)
if err != nil {
@@ -656,6 +659,19 @@ func (bc *blockChain) processAgreementResult(result *types.AgreementResult) erro
if !result.Position.Newer(bc.lastDelivered.Position) {
return nil
}
- bc.pendingRandomnesses[result.Position] = result
+ bc.pendingRandomnesses[result.Position] = result.Randomness
+ bc.lastPosition = bc.lastDelivered.Position
return nil
}
+
+func (bc *blockChain) addBlockRandomness(pos types.Position, rand []byte) {
+ if pos.Round < DKGDelayRound {
+ return
+ }
+ bc.lock.Lock()
+ defer bc.lock.Unlock()
+ if !pos.Newer(bc.lastDelivered.Position) {
+ return
+ }
+ bc.pendingRandomnesses[pos] = rand
+}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index d64c2a1..ea604a2 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -19,6 +19,7 @@ package core
import (
"fmt"
+ "math/rand"
"testing"
"time"
@@ -175,13 +176,16 @@ func (s *BlockChainTestSuite) newRoundOneInitBlock() *types.Block {
}
func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block,
- blocks []*types.Block, rands []*types.AgreementResult) {
+ blocks []*types.Block, results []*types.AgreementResult) {
var (
bc = s.newBlockChain(initBlock, uint64(len(blocks)+1))
start = make(chan struct{})
newNotif = make(chan struct{}, 1)
delivered []*types.Block
)
+ resultsCopy := make([]*types.AgreementResult, len(results))
+ copy(resultsCopy, results)
+ type randomnessResult types.AgreementResult
add := func(v interface{}) {
<-start
switch val := v.(type) {
@@ -192,9 +196,13 @@ func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block,
}
case *types.AgreementResult:
if err := bc.processAgreementResult(val); err != nil {
- // Never assertion in sub routine when testing.
- panic(err)
+ if err != ErrSkipButNoError {
+ // Never assertion in sub routine when testing.
+ panic(err)
+ }
}
+ case *randomnessResult:
+ bc.addBlockRandomness(val.Position, val.Randomness)
default:
panic(fmt.Errorf("unknown type: %v", v))
}
@@ -203,12 +211,26 @@ func (s *BlockChainTestSuite) baseConcurrentAceessTest(initBlock *types.Block,
default:
}
}
+ rand.Shuffle(len(resultsCopy), func(i, j int) {
+ resultsCopy[i], resultsCopy[j] = resultsCopy[j], resultsCopy[i]
+ })
for _, b := range blocks {
go add(b)
}
- for _, r := range rands {
- go add(r)
+ for i, r := range resultsCopy {
+ if i >= len(resultsCopy)/2 {
+ break
+ }
+ go add((*randomnessResult)(r))
}
+ go func() {
+ for i, a := range resultsCopy {
+ if i < len(resultsCopy)/2 {
+ continue
+ }
+ add(a)
+ }
+ }()
close(start)
for {
select {
@@ -257,10 +279,7 @@ func (s *BlockChainTestSuite) TestBasicUsage() {
s.Require().NoError(bc.addBlock(b0))
extracted := bc.extractBlocks()
s.Require().Len(extracted, 4)
- bc.pendingRandomnesses[b4.Position] = &types.AgreementResult{
- BlockHash: b4.Hash,
- Randomness: common.GenerateRandomBytes(),
- }
+ bc.pendingRandomnesses[b4.Position] = common.GenerateRandomBytes()
extracted = bc.extractBlocks()
s.Require().Len(extracted, 2)
s.Require().Equal(extracted[0].Hash, b4.Hash)
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index c8aac38..fbd504d 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -586,8 +586,19 @@ func (cc *configurationChain) recoverDKGInfo(
// Restore group public key.
cc.logger.Debug("Calling Governance.DKGMasterPublicKeys for recoverDKGInfo",
"round", round)
+ mpk := cc.gov.DKGMasterPublicKeys(round)
cc.logger.Debug("Calling Governance.DKGComplaints for recoverDKGInfo",
"round", round)
+ comps := cc.gov.DKGComplaints(round)
+ qualifies, _, err := typesDKG.CalcQualifyNodes(mpk, comps, threshold)
+ if err != nil {
+ return err
+ }
+ if len(qualifies) <
+ utils.GetDKGValidThreshold(utils.GetConfigWithPanic(
+ cc.gov, round, cc.logger)) {
+ return typesDKG.ErrNotReachThreshold
+ }
npks, err := typesDKG.NewNodePublicKeys(round,
cc.gov.DKGMasterPublicKeys(round),
cc.gov.DKGComplaints(round),
diff --git a/core/consensus.go b/core/consensus.go
index 9702231..e657c64 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -286,17 +286,13 @@ func (recv *consensusBAReceiver) ConfirmBlock(
IsEmptyBlock: isEmptyBlockConfirmed,
Randomness: block.Randomness,
}
+ recv.consensus.baMgr.touchAgreementResult(result)
recv.consensus.logger.Debug("Broadcast AgreementResult",
"result", result)
recv.consensus.network.BroadcastAgreementResult(result)
if block.IsEmpty() {
- if err :=
- recv.consensus.bcModule.processAgreementResult(
- result); err != nil {
- recv.consensus.logger.Warn(
- "Failed to process agreement result",
- "result", result)
- }
+ recv.consensus.bcModule.addBlockRandomness(
+ block.Position, block.Randomness)
}
if block.Position.Round >= DKGDelayRound {
recv.consensus.logger.Debug(
@@ -729,12 +725,13 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// modules see the up-to-date node set, we need to make sure this action
// should be taken as the first one.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
- defer elapse("purge-node-set", evts[len(evts)-1])()
+ defer elapse("purge-cache", evts[len(evts)-1])()
for _, e := range evts {
if e.Reset == 0 {
continue
}
con.nodeSetCache.Purge(e.Round + 1)
+ con.tsigVerifierCache.Purge(e.Round + 1)
}
})
// Register round event handler to abort previous running DKG if any.
@@ -850,13 +847,64 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
e := evts[len(evts)-1]
defer elapse("touch-NodeSetCache", e)()
con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) {
- if err := con.nodeSetCache.Touch(e.Round + 1); err != nil {
- con.logger.Warn("Failed to update nodeSetCache",
- "round", e.Round+1,
- "error", err)
+ if e.Reset == 0 {
+ return
}
+ go func() {
+ nextRound := e.Round + 1
+ if err := con.nodeSetCache.Touch(nextRound); err != nil {
+ con.logger.Warn("Failed to update nodeSetCache",
+ "round", nextRound,
+ "error", err)
+ }
+ }()
})
})
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ if e.Reset != 0 {
+ return
+ }
+ defer elapse("touch-DKGCache", e)()
+ go func() {
+ if _, err :=
+ con.tsigVerifierCache.Update(e.Round); err != nil {
+ con.logger.Warn("Failed to update tsig cache",
+ "round", e.Round,
+ "error", err)
+ }
+ }()
+ go func() {
+ threshold := utils.GetDKGThreshold(
+ utils.GetConfigWithPanic(con.gov, e.Round, con.logger))
+ // Restore group public key.
+ con.logger.Debug(
+ "Calling Governance.DKGMasterPublicKeys for recoverDKGInfo",
+ "round", e.Round)
+ con.logger.Debug(
+ "Calling Governance.DKGComplaints for recoverDKGInfo",
+ "round", e.Round)
+ _, qualifies, err := typesDKG.CalcQualifyNodes(
+ con.gov.DKGMasterPublicKeys(e.Round),
+ con.gov.DKGComplaints(e.Round),
+ threshold)
+ if err != nil {
+ con.logger.Warn("Failed to calculate dkg set",
+ "round", e.Round,
+ "error", err)
+ return
+ }
+ if _, exist := qualifies[con.ID]; !exist {
+ return
+ }
+ if _, _, err :=
+ con.cfgModule.getDKGInfo(e.Round, true); err != nil {
+ con.logger.Warn("Failed to recover DKG info",
+ "round", e.Round,
+ "error", err)
+ }
+ }()
+ })
// checkCRS is a generator of checker to check if CRS for that round is
// ready or not.
checkCRS := func(round uint64) func() bool {
@@ -1045,12 +1093,7 @@ func (con *Consensus) generateBlockRandomness(blocks []*types.Block) {
Position: block.Position,
Randomness: sig.Signature[:],
}
- if err := con.bcModule.processAgreementResult(result); err != nil {
- con.logger.Error("Failed to process BlockRandomness",
- "result", result,
- "error", err)
- return
- }
+ con.bcModule.addBlockRandomness(block.Position, sig.Signature[:])
con.logger.Debug("Broadcast BlockRandomness",
"block", block,
"result", result)
@@ -1275,8 +1318,7 @@ MessageLoop:
// ProcessVote is the entry point to submit ont vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
- v := vote.Clone()
- err = con.baMgr.processVote(v)
+ err = con.baMgr.processVote(vote)
return
}
@@ -1293,6 +1335,9 @@ func (con *Consensus) ProcessAgreementResult(
}
if err := con.bcModule.processAgreementResult(rand); err != nil {
con.baMgr.untouchAgreementResult(rand)
+ if err == ErrSkipButNoError {
+ return nil
+ }
return err
}
// Syncing BA Module.
diff --git a/core/dkg-tsig-protocol.go b/core/dkg-tsig-protocol.go
index 9a71bea..e4ae14c 100644
--- a/core/dkg-tsig-protocol.go
+++ b/core/dkg-tsig-protocol.go
@@ -555,7 +555,7 @@ func (tc *TSigVerifierCache) UpdateAndGet(round uint64) (
return v, ok, nil
}
-// Purge the cache and returns if success.
+// Purge the cache.
func (tc *TSigVerifierCache) Purge(round uint64) {
tc.lock.Lock()
defer tc.lock.Unlock()
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index 472c724..602d2da 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -79,7 +79,7 @@ func (e RoundEventParam) NextRoundHeight() uint64 {
// NextTouchNodeSetCacheHeight returns the height to touch the node set cache.
func (e RoundEventParam) NextTouchNodeSetCacheHeight() uint64 {
- return e.BeginHeight + e.Config.RoundLength*9/10
+ return e.BeginHeight + e.Config.RoundLength/2
}
// NextDKGResetHeight returns the height to reset DKG for next period.