aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-10-18 14:48:05 +0800
committerGitHub <noreply@github.com>2018-10-18 14:48:05 +0800
commit8303e9d054957195717f41804a456e2720b0c4bb (patch)
tree8ad2b0ead5391ec6de2c0a39d75b679db87af6b0 /core
parenteae2d201e927c774f2f409f09fa132e4678f540c (diff)
downloaddexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.tar
dexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.tar.gz
dexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.tar.bz2
dexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.tar.lz
dexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.tar.xz
dexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.tar.zst
dexon-consensus-8303e9d054957195717f41804a456e2720b0c4bb.zip
core: sync compaction chain (#222)
Diffstat (limited to 'core')
-rw-r--r--core/compaction-chain.go114
-rw-r--r--core/compaction-chain_test.go107
-rw-r--r--core/consensus.go20
-rw-r--r--core/dkg-tsig-protocol.go85
-rw-r--r--core/dkg-tsig-protocol_test.go61
-rw-r--r--core/types/block.go31
6 files changed, 391 insertions, 27 deletions
diff --git a/core/compaction-chain.go b/core/compaction-chain.go
index ac8c458..6511727 100644
--- a/core/compaction-chain.go
+++ b/core/compaction-chain.go
@@ -18,6 +18,7 @@
package core
import (
+ "container/heap"
"fmt"
"sync"
@@ -34,20 +35,27 @@ var (
"not initialized")
)
+type finalizedBlockHeap = types.ByFinalizationHeight
+
type compactionChain struct {
gov Governance
+ tsigVerifier *TSigVerifierCache
blocks map[common.Hash]*types.Block
pendingBlocks []*types.Block
- pendingFinalizedBlocks []*types.Block
+ pendingFinalizedBlocks *finalizedBlockHeap
blocksLock sync.RWMutex
prevBlockLock sync.RWMutex
prevBlock *types.Block
}
func newCompactionChain(gov Governance) *compactionChain {
+ pendingFinalizedBlocks := &finalizedBlockHeap{}
+ heap.Init(pendingFinalizedBlocks)
return &compactionChain{
- gov: gov,
- blocks: make(map[common.Hash]*types.Block),
+ gov: gov,
+ tsigVerifier: NewTSigVerifierCache(gov, 7),
+ blocks: make(map[common.Hash]*types.Block),
+ pendingFinalizedBlocks: pendingFinalizedBlocks,
}
}
@@ -88,49 +96,103 @@ func (cc *compactionChain) processBlock(block *types.Block) error {
return nil
}
-func (cc *compactionChain) processFinalizedBlock(block *types.Block) (
- []*types.Block, error) {
+func (cc *compactionChain) processFinalizedBlock(block *types.Block) {
+ if block.Finalization.Height <= cc.lastBlock().Finalization.Height {
+ return
+ }
+
+ cc.blocksLock.Lock()
+ defer cc.blocksLock.Unlock()
+ heap.Push(cc.pendingFinalizedBlocks, block)
+
+ return
+}
+
+func (cc *compactionChain) extractFinalizedBlocks() []*types.Block {
+ prevBlock := cc.lastBlock()
+
blocks := func() []*types.Block {
cc.blocksLock.Lock()
defer cc.blocksLock.Unlock()
- blocks := cc.pendingFinalizedBlocks
- cc.pendingFinalizedBlocks = []*types.Block{}
+ blocks := []*types.Block{}
+ prevHeight := prevBlock.Finalization.Height
+ for cc.pendingFinalizedBlocks.Len() != 0 {
+ tip := (*cc.pendingFinalizedBlocks)[0]
+ // Pop blocks that are already confirmed.
+ if tip.Finalization.Height <= prevBlock.Finalization.Height {
+ heap.Pop(cc.pendingFinalizedBlocks)
+ continue
+ }
+ // Since we haven't verified the finalized block,
+ // it is possible to be forked.
+ if tip.Finalization.Height == prevHeight ||
+ tip.Finalization.Height == prevHeight+1 {
+ prevHeight = tip.Finalization.Height
+ blocks = append(blocks, tip)
+ heap.Pop(cc.pendingFinalizedBlocks)
+ } else {
+ break
+ }
+ }
return blocks
}()
- threshold := make(map[uint64]int)
- gpks := make(map[uint64]*DKGGroupPublicKey)
toPending := []*types.Block{}
confirmed := []*types.Block{}
- blocks = append(blocks, block)
for _, b := range blocks {
- if !cc.gov.IsDKGFinal(b.Position.Round) {
- toPending = append(toPending, b)
+ if b.Hash == prevBlock.Hash &&
+ b.Finalization.Height == prevBlock.Finalization.Height {
continue
}
round := b.Position.Round
- if _, exist := gpks[round]; !exist {
- threshold[round] = int(cc.gov.Configuration(round).DKGSetSize)/3 + 1
- var err error
- gpks[round], err = NewDKGGroupPublicKey(
- round,
- cc.gov.DKGMasterPublicKeys(round), cc.gov.DKGComplaints(round),
- threshold[round])
- if err != nil {
- continue
- }
+ v, ok, err := cc.tsigVerifier.UpdateAndGet(round)
+ if err != nil {
+ continue
+ }
+ if !ok {
+ toPending = append(toPending, b)
+ continue
}
- gpk := gpks[round]
- if ok := gpk.VerifySignature(b.Hash, crypto.Signature{
+ if ok := v.VerifySignature(b.Hash, crypto.Signature{
Type: "bls",
Signature: b.Finalization.Randomness}); !ok {
continue
}
+ // Fork resolution: choose block with smaller hash.
+ if prevBlock.Finalization.Height ==
+ b.Finalization.Height {
+ //TODO(jimmy-dexon): remove this panic after test.
+ if true {
+ // workaround to `go vet` error
+ panic(fmt.Errorf(
+ "forked finalized block %s,%s", prevBlock.Hash, b.Hash))
+ }
+ if b.Hash.Less(prevBlock.Hash) {
+ confirmed = confirmed[:len(confirmed)-1]
+ } else {
+ continue
+ }
+ }
+ if b.Finalization.Height-prevBlock.Finalization.Height > 1 {
+ toPending = append(toPending, b)
+ continue
+ }
confirmed = append(confirmed, b)
+ prevBlock = b
}
+ func() {
+ if len(confirmed) == 0 {
+ return
+ }
+ cc.prevBlockLock.Lock()
+ defer cc.prevBlockLock.Unlock()
+ cc.prevBlock = prevBlock
+ }()
cc.blocksLock.Lock()
defer cc.blocksLock.Unlock()
- cc.pendingFinalizedBlocks = append(cc.pendingFinalizedBlocks, toPending...)
- return confirmed, nil
+ for _, b := range toPending {
+ heap.Push(cc.pendingFinalizedBlocks, b)
+ }
+ return confirmed
}
func (cc *compactionChain) extractBlocks() []*types.Block {
diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go
index bffe57a..f7aff8a 100644
--- a/core/compaction-chain_test.go
+++ b/core/compaction-chain_test.go
@@ -22,6 +22,7 @@ import (
"time"
"github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
"github.com/dexon-foundation/dexon-consensus-core/core/test"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/stretchr/testify/suite"
@@ -198,6 +199,112 @@ func (s *CompactionChainTestSuite) TestExtractBlocksRound0() {
}
}
+type mockTSigVerifier struct {
+ defaultRet bool
+ ret map[common.Hash]bool
+}
+
+func newMockTSigVerifier(defaultRet bool) *mockTSigVerifier {
+ return &mockTSigVerifier{
+ defaultRet: defaultRet,
+ ret: make(map[common.Hash]bool),
+ }
+}
+
+func (m *mockTSigVerifier) VerifySignature(
+ hash common.Hash, _ crypto.Signature) bool {
+ if ret, exist := m.ret[hash]; exist {
+ return ret
+ }
+ return m.defaultRet
+}
+
+func (s *CompactionChainTestSuite) TestSyncFinalizedBlock() {
+ cc := s.newCompactionChain()
+ mock := newMockTSigVerifier(true)
+ for i := 0; i < cc.tsigVerifier.cacheSize; i++ {
+ cc.tsigVerifier.verifier[uint64(i)] = mock
+ }
+ now := time.Now().UTC()
+ blocks := make([]*types.Block, 10)
+ for idx := range blocks {
+ blocks[idx] = &types.Block{
+ Hash: common.NewRandomHash(),
+ Finalization: types.FinalizationResult{
+ Timestamp: now,
+ Height: uint64(idx + 1),
+ },
+ }
+ now = now.Add(100 * time.Millisecond)
+ }
+ cc.processFinalizedBlock(blocks[1])
+ cc.processFinalizedBlock(blocks[3])
+ s.Len(cc.extractFinalizedBlocks(), 0)
+
+ cc.processFinalizedBlock(blocks[0])
+ confirmed := cc.extractFinalizedBlocks()
+ s.Equal(blocks[1].Hash, cc.lastBlock().Hash)
+ s.Require().Len(confirmed, 2)
+ s.Equal(confirmed[0].Hash, blocks[0].Hash)
+ s.Equal(confirmed[1].Hash, blocks[1].Hash)
+ hash := common.NewRandomHash()
+ cc.processFinalizedBlock(&types.Block{
+ Hash: hash,
+ Finalization: types.FinalizationResult{
+ Height: uint64(3),
+ },
+ })
+ // Should not deliver block with error tsig
+ mock.ret[hash] = false
+ s.Len(cc.extractFinalizedBlocks(), 0)
+ // The error block should be discarded.
+ s.Len(cc.extractFinalizedBlocks(), 0)
+
+ // Shuold not deliver block if dkg is not final
+ round99 := uint64(99)
+ s.Require().False(cc.gov.IsDKGFinal(round99))
+ blocks[2].Position.Round = round99
+ cc.processFinalizedBlock(blocks[2])
+ s.Len(cc.extractFinalizedBlocks(), 0)
+
+ // Deliver blocks.
+ blocks[3].Position.Round = round99
+ cc.tsigVerifier.verifier[round99] = mock
+ confirmed = cc.extractFinalizedBlocks()
+ s.Equal(blocks[3].Hash, cc.lastBlock().Hash)
+ s.Require().Len(confirmed, 2)
+ s.Equal(confirmed[0].Hash, blocks[2].Hash)
+ s.Equal(confirmed[1].Hash, blocks[3].Hash)
+
+ // Inserting a bad block. The later block should not be discarded.
+ cc.processFinalizedBlock(blocks[5])
+ cc.processFinalizedBlock(&types.Block{
+ Hash: hash,
+ Finalization: types.FinalizationResult{
+ Height: uint64(5),
+ },
+ })
+ s.Len(cc.extractFinalizedBlocks(), 0)
+ // Good block is inserted, the later block should be delivered.
+ cc.processFinalizedBlock(blocks[4])
+ confirmed = cc.extractFinalizedBlocks()
+ s.Equal(blocks[5].Hash, cc.lastBlock().Hash)
+ s.Require().Len(confirmed, 2)
+ s.Equal(confirmed[0].Hash, blocks[4].Hash)
+ s.Equal(confirmed[1].Hash, blocks[5].Hash)
+
+ // Ignore finalized block if it already confirmed.
+ cc.processFinalizedBlock(blocks[6])
+ s.Require().NoError(cc.processBlock(blocks[6]))
+ s.Equal(blocks[6].Hash, cc.lastBlock().Hash)
+ s.Require().Len(cc.extractFinalizedBlocks(), 0)
+ s.Require().NoError(cc.processBlock(blocks[7]))
+ cc.processFinalizedBlock(blocks[7])
+ s.Len(*cc.pendingFinalizedBlocks, 0)
+ s.Equal(blocks[7].Hash, cc.lastBlock().Hash)
+ s.Require().Len(cc.extractFinalizedBlocks(), 0)
+}
+
func TestCompactionChain(t *testing.T) {
suite.Run(t, new(CompactionChainTestSuite))
}
diff --git a/core/consensus.go b/core/consensus.go
index 3601720..d3f8b5d 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -740,10 +740,28 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
// processFinalizedBlock is the entry point for syncing blocks.
func (con *Consensus) processFinalizedBlock(block *types.Block) (err error) {
- // TODO(jimmy-dexon): drop block that is already in compaction chain.
if err = con.lattice.SanityCheck(block, false); err != nil {
return
}
+ con.ccModule.processFinalizedBlock(block)
+ for {
+ confirmed := con.ccModule.extractFinalizedBlocks()
+ if len(confirmed) == 0 {
+ break
+ }
+ if err = con.lattice.ctModule.processBlocks(confirmed); err != nil {
+ return
+ }
+ for _, b := range confirmed {
+ if err = con.db.Put(*b); err != nil {
+ if err != blockdb.ErrBlockExists {
+ return
+ }
+ err = nil
+ }
+ con.nbModule.BlockDelivered(b.Hash, b.Finalization)
+ }
+ }
return
}
diff --git a/core/dkg-tsig-protocol.go b/core/dkg-tsig-protocol.go
index 795f6d3..1b5f547 100644
--- a/core/dkg-tsig-protocol.go
+++ b/core/dkg-tsig-protocol.go
@@ -19,6 +19,7 @@ package core
import (
"fmt"
+ "sync"
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/crypto"
@@ -46,6 +47,8 @@ var (
"incorrect partialSignature")
ErrNotEnoughtPartialSignatures = fmt.Errorf(
"not enough of partial signatures")
+ ErrRoundAlreadyPurged = fmt.Errorf(
+ "cache of round already been purged")
)
type dkgReceiver interface {
@@ -95,6 +98,20 @@ type DKGGroupPublicKey struct {
threshold int
}
+// TSigVerifier is the interface verifying threshold signature.
+type TSigVerifier interface {
+ VerifySignature(hash common.Hash, sig crypto.Signature) bool
+}
+
+// TSigVerifierCache is the cache for TSigVerifier.
+type TSigVerifierCache struct {
+ gov Governance
+ verifier map[uint64]TSigVerifier
+ minRound uint64
+ cacheSize int
+ lock sync.RWMutex
+}
+
type tsigProtocol struct {
groupPublicKey *DKGGroupPublicKey
hash common.Hash
@@ -405,6 +422,74 @@ func (gpk *DKGGroupPublicKey) VerifySignature(
return gpk.groupPublicKey.VerifySignature(hash, sig)
}
+// NewTSigVerifierCache creats a DKGGroupPublicKey instance.
+func NewTSigVerifierCache(gov Governance, cacheSize int) *TSigVerifierCache {
+ return &TSigVerifierCache{
+ gov: gov,
+ verifier: make(map[uint64]TSigVerifier),
+ cacheSize: cacheSize,
+ }
+}
+
+// UpdateAndGet calls Update and then Get.
+func (tc *TSigVerifierCache) UpdateAndGet(round uint64) (
+ TSigVerifier, bool, error) {
+ ok, err := tc.Update(round)
+ if err != nil {
+ return nil, false, err
+ }
+ if !ok {
+ return nil, false, nil
+ }
+ v, ok := tc.Get(round)
+ return v, ok, nil
+}
+
+// Update the cache and returns if success.
+func (tc *TSigVerifierCache) Update(round uint64) (bool, error) {
+ tc.lock.Lock()
+ defer tc.lock.Unlock()
+ if round < tc.minRound {
+ return false, ErrRoundAlreadyPurged
+ }
+ if _, exist := tc.verifier[round]; exist {
+ return true, nil
+ }
+ if !tc.gov.IsDKGFinal(round) {
+ return false, nil
+ }
+ gpk, err := NewDKGGroupPublicKey(round,
+ tc.gov.DKGMasterPublicKeys(round),
+ tc.gov.DKGComplaints(round),
+ int(tc.gov.Configuration(round).DKGSetSize/3)+1)
+ if err != nil {
+ return false, err
+ }
+ if len(tc.verifier) == 0 {
+ tc.minRound = round
+ }
+ tc.verifier[round] = gpk
+ if len(tc.verifier) > tc.cacheSize {
+ delete(tc.verifier, tc.minRound)
+ }
+ for {
+ if _, exist := tc.verifier[tc.minRound]; !exist {
+ tc.minRound++
+ } else {
+ break
+ }
+ }
+ return true, nil
+}
+
+// Get the TSigVerifier of round and returns if it exists.
+func (tc *TSigVerifierCache) Get(round uint64) (TSigVerifier, bool) {
+ tc.lock.RLock()
+ defer tc.lock.RUnlock()
+ verifier, exist := tc.verifier[round]
+ return verifier, exist
+}
+
func newTSigProtocol(
gpk *DKGGroupPublicKey,
hash common.Hash) *tsigProtocol {
diff --git a/core/dkg-tsig-protocol_test.go b/core/dkg-tsig-protocol_test.go
index d03d811..e533a2f 100644
--- a/core/dkg-tsig-protocol_test.go
+++ b/core/dkg-tsig-protocol_test.go
@@ -635,6 +635,67 @@ func (s *DKGTSIGProtocolTestSuite) TestProposeFinalize() {
Round: 1,
}, final)
}
+
+func (s *DKGTSIGProtocolTestSuite) TestTSigVerifierCache() {
+ k := 3
+ n := 10
+ gov, err := test.NewGovernance(n, 100)
+ s.Require().NoError(err)
+ for i := 0; i < 10; i++ {
+ round := uint64(i + 1)
+ receivers, protocols := s.newProtocols(k, n, round)
+
+ for _, receiver := range receivers {
+ gov.AddDKGMasterPublicKey(round, receiver.mpk)
+ }
+
+ for _, protocol := range protocols {
+ protocol.proposeFinalize()
+ }
+
+ for _, recv := range receivers {
+ s.Require().Len(recv.final, 1)
+ gov.AddDKGFinalize(recv.final[0].Round, recv.final[0])
+ }
+ s.Require().True(gov.IsDKGFinal(round))
+ }
+
+ cache := NewTSigVerifierCache(gov, 3)
+ for i := 0; i < 5; i++ {
+ round := uint64(i + 1)
+ ok, err := cache.Update(round)
+ s.Require().NoError(err)
+ s.True(ok)
+ }
+ s.Len(cache.verifier, 3)
+
+ for i := 0; i < 2; i++ {
+ round := uint64(i + 1)
+ _, exist := cache.Get(round)
+ s.False(exist)
+ }
+
+ for i := 3; i < 5; i++ {
+ round := uint64(i + 1)
+ _, exist := cache.Get(round)
+ s.True(exist)
+ }
+
+ ok, err := cache.Update(uint64(1))
+ s.Require().Equal(ErrRoundAlreadyPurged, err)
+
+ cache = NewTSigVerifierCache(gov, 1)
+ ok, err = cache.Update(uint64(3))
+ s.Require().NoError(err)
+ s.Require().True(ok)
+ s.Equal(uint64(3), cache.minRound)
+
+ ok, err = cache.Update(uint64(5))
+ s.Require().NoError(err)
+ s.Require().True(ok)
+ s.Equal(uint64(5), cache.minRound)
+}
+
func TestDKGTSIGProtocol(t *testing.T) {
suite.Run(t, new(DKGTSIGProtocolTestSuite))
}
diff --git a/core/types/block.go b/core/types/block.go
index bf27934..e384f95 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -175,3 +175,34 @@ func (bs *ByHeight) Pop() (ret interface{}) {
*bs, ret = (*bs)[0:n-1], (*bs)[n-1]
return
}
+
+// ByFinalizationHeight is the helper type for sorting slice of blocks by
+// finalization height.
+type ByFinalizationHeight []*Block
+
+// Len implements Len method in sort.Sort interface.
+func (bs ByFinalizationHeight) Len() int {
+ return len(bs)
+}
+
+// Less implements Less method in sort.Sort interface.
+func (bs ByFinalizationHeight) Less(i int, j int) bool {
+ return bs[i].Finalization.Height < bs[j].Finalization.Height
+}
+
+// Swap implements Swap method in sort.Sort interface.
+func (bs ByFinalizationHeight) Swap(i int, j int) {
+ bs[i], bs[j] = bs[j], bs[i]
+}
+
+// Push implements Push method in heap interface.
+func (bs *ByFinalizationHeight) Push(x interface{}) {
+ *bs = append(*bs, x.(*Block))
+}
+
+// Pop implements Pop method in heap interface.
+func (bs *ByFinalizationHeight) Pop() (ret interface{}) {
+ n := len(*bs)
+ *bs, ret = (*bs)[0:n-1], (*bs)[n-1]
+ return
+}