aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-10-26 13:22:29 +0800
committerGitHub <noreply@github.com>2018-10-26 13:22:29 +0800
commit721d36f2720b0cbf648cbffe40fd05c9a60061e4 (patch)
tree05d643695b20cc9f430869cf2105c177856625bc /core
parentef0df0e6e27acd3852f2e0efdccf0798d5fc63ad (diff)
downloadtangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar
tangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.gz
tangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.bz2
tangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.lz
tangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.xz
tangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.tar.zst
tangerine-consensus-721d36f2720b0cbf648cbffe40fd05c9a60061e4.zip
core: Pull block (#263)
Diffstat (limited to 'core')
-rw-r--r--core/agreement-state_test.go2
-rw-r--r--core/agreement.go16
-rw-r--r--core/agreement_test.go35
-rw-r--r--core/consensus.go88
-rw-r--r--core/consensus_test.go4
-rw-r--r--core/interfaces.go3
6 files changed, 116 insertions, 32 deletions
diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go
index 37d9ae2..429e124 100644
--- a/core/agreement-state_test.go
+++ b/core/agreement-state_test.go
@@ -57,6 +57,8 @@ func (r *agreementStateTestReceiver) ConfirmBlock(block common.Hash,
r.s.confirmChan <- block
}
+func (r *agreementStateTestReceiver) PullBlocks(common.Hashes) {}
+
func (s *AgreementStateTestSuite) proposeBlock(
leader *leaderSelector) *types.Block {
block := &types.Block{
diff --git a/core/agreement.go b/core/agreement.go
index 8618b5f..cc7af4f 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -69,6 +69,7 @@ type agreementReceiver interface {
ProposeVote(vote *types.Vote)
ProposeBlock() common.Hash
ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote)
+ PullBlocks(common.Hashes)
}
type pendingBlock struct {
@@ -327,6 +328,21 @@ func (a *agreement) processVote(vote *types.Vote) error {
// Condition 3.
if vote.Type == types.VoteCom && vote.Period >= a.data.period &&
len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote {
+ hashes := common.Hashes{}
+ addPullBlocks := func(voteType types.VoteType) {
+ for _, vote := range a.data.votes[vote.Period][voteType] {
+ if vote.BlockHash == nullBlockHash || vote.BlockHash == skipBlockHash {
+ continue
+ }
+ if _, found := a.findCandidateBlock(vote.BlockHash); !found {
+ hashes = append(hashes, vote.BlockHash)
+ }
+ }
+ }
+ addPullBlocks(types.VoteInit)
+ addPullBlocks(types.VotePreCom)
+ addPullBlocks(types.VoteCom)
+ a.data.recv.PullBlocks(hashes)
a.fastForward <- vote.Period + 1
return nil
}
diff --git a/core/agreement_test.go b/core/agreement_test.go
index 3b97a10..e4e05cd 100644
--- a/core/agreement_test.go
+++ b/core/agreement_test.go
@@ -47,6 +47,13 @@ func (r *agreementTestReceiver) ConfirmBlock(block common.Hash,
r.s.confirmChan <- block
}
+func (r *agreementTestReceiver) PullBlocks(hashes common.Hashes) {
+ for _, hash := range hashes {
+ r.s.pulledBlocks[hash] = struct{}{}
+ }
+
+}
+
func (s *AgreementTestSuite) proposeBlock(
agreementIdx int) *types.Block {
block := &types.Block{
@@ -61,13 +68,14 @@ func (s *AgreementTestSuite) proposeBlock(
type AgreementTestSuite struct {
suite.Suite
- ID types.NodeID
- auths map[types.NodeID]*Authenticator
- voteChan chan *types.Vote
- blockChan chan common.Hash
- confirmChan chan common.Hash
- block map[common.Hash]*types.Block
- agreement []*agreement
+ ID types.NodeID
+ auths map[types.NodeID]*Authenticator
+ voteChan chan *types.Vote
+ blockChan chan common.Hash
+ confirmChan chan common.Hash
+ block map[common.Hash]*types.Block
+ pulledBlocks map[common.Hash]struct{}
+ agreement []*agreement
}
func (s *AgreementTestSuite) SetupTest() {
@@ -81,6 +89,7 @@ func (s *AgreementTestSuite) SetupTest() {
s.blockChan = make(chan common.Hash, 100)
s.confirmChan = make(chan common.Hash, 100)
s.block = make(map[common.Hash]*types.Block)
+ s.pulledBlocks = make(map[common.Hash]struct{})
}
func (s *AgreementTestSuite) newAgreement(numNotarySet int) *agreement {
@@ -265,13 +274,15 @@ func (s *AgreementTestSuite) TestFastForwardCond2() {
}
func (s *AgreementTestSuite) TestFastForwardCond3() {
- votes := 0
+ numVotes := 0
+ votes := []*types.Vote{}
a := s.newAgreement(4)
a.data.period = 1
for nID := range a.notarySet {
vote := s.prepareVote(nID, types.VoteCom, common.NewRandomHash(), uint64(2))
+ votes = append(votes, vote)
s.Require().NoError(a.processVote(vote))
- if votes++; votes == 3 {
+ if numVotes++; numVotes == 3 {
break
}
}
@@ -282,6 +293,12 @@ func (s *AgreementTestSuite) TestFastForwardCond3() {
s.FailNow("Expecting fast forward.")
}
s.Equal(uint64(3), a.data.period)
+
+ s.Len(s.pulledBlocks, 3)
+ for _, vote := range votes {
+ _, exist := s.pulledBlocks[vote.BlockHash]
+ s.True(exist)
+ }
}
func (s *AgreementTestSuite) TestDecide() {
diff --git a/core/consensus.go b/core/consensus.go
index 3e9c816..bd311ba 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -113,8 +113,20 @@ func (recv *consensusBAReceiver) ConfirmBlock(
block, exist = recv.consensus.baModules[recv.chainID].
findCandidateBlock(hash)
if !exist {
- recv.consensus.logger.Error("Unknown block confirmed", "hash", hash)
- return
+ recv.consensus.logger.Error("Unknown block confirmed",
+ "hash", hash,
+ "chainID", recv.chainID)
+ ch := make(chan *types.Block)
+ func() {
+ recv.consensus.lock.Lock()
+ defer recv.consensus.lock.Unlock()
+ recv.consensus.baConfirmedBlock[hash] = ch
+ }()
+ recv.consensus.network.PullBlocks(common.Hashes{hash})
+ block = <-ch
+ recv.consensus.logger.Info("Receive unknown block",
+ "hash", hash,
+ "chainID", recv.chainID)
}
}
recv.consensus.ccModule.registerBlock(block)
@@ -145,6 +157,11 @@ func (recv *consensusBAReceiver) ConfirmBlock(
}
}
+func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
+ recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes)
+ recv.consensus.network.PullBlocks(hashes)
+}
+
// consensusDKGReceiver implements dkgReceiver.
type consensusDKGReceiver struct {
ID types.NodeID
@@ -236,8 +253,9 @@ type Consensus struct {
currentConfig *types.Config
// BA.
- baModules []*agreement
- receivers []*consensusBAReceiver
+ baModules []*agreement
+ receivers []*consensusBAReceiver
+ baConfirmedBlock map[common.Hash]chan<- *types.Block
// DKG.
dkgRunning int32
@@ -318,23 +336,24 @@ func NewConsensus(
recv.cfgModule = cfgModule
// Construct Consensus instance.
con := &Consensus{
- ID: ID,
- currentConfig: config,
- ccModule: newCompactionChain(gov),
- lattice: lattice,
- app: app,
- gov: gov,
- db: db,
- network: network,
- tickerObj: newTicker(gov, round, TickerBA),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
- dMoment: dMoment,
- nodeSetCache: nodeSetCache,
- authModule: authModule,
- event: common.NewEvent(),
- logger: logger,
- roundToNotify: roundToNotify,
+ ID: ID,
+ currentConfig: config,
+ ccModule: newCompactionChain(gov),
+ lattice: lattice,
+ app: app,
+ gov: gov,
+ db: db,
+ network: network,
+ tickerObj: newTicker(gov, round, TickerBA),
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ dMoment: dMoment,
+ nodeSetCache: nodeSetCache,
+ authModule: authModule,
+ event: common.NewEvent(),
+ logger: logger,
+ roundToNotify: roundToNotify,
}
validLeader := func(block *types.Block) bool {
@@ -612,6 +631,7 @@ func (con *Consensus) Stop() {
}
func (con *Consensus) processMsg(msgChan <-chan interface{}) {
+MessageLoop:
for {
var msg interface{}
select {
@@ -622,8 +642,30 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
switch val := msg.(type) {
case *types.Block:
- // For sync mode.
- if val.IsFinalized() {
+ if ch, exist := func() (chan<- *types.Block, bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ ch, e := con.baConfirmedBlock[val.Hash]
+ return ch, e
+ }(); exist {
+ if err := con.lattice.SanityCheck(val); err != nil {
+ if err == ErrRetrySanityCheckLater {
+ err = nil
+ } else {
+ con.logger.Error("SanityCheck failed", "error", err)
+ continue MessageLoop
+ }
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // In case of multiple delivered block.
+ if _, exist := con.baConfirmedBlock[val.Hash]; !exist {
+ continue MessageLoop
+ }
+ delete(con.baConfirmedBlock, val.Hash)
+ ch <- val
+ } else if val.IsFinalized() {
+ // For sync mode.
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
"error", err)
diff --git a/core/consensus_test.go b/core/consensus_test.go
index ed98b76..b8d25d9 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -37,6 +37,10 @@ type network struct {
conn *networkConnection
}
+// PullBlocks tries to pull blocks from the DEXON network.
+func (n *network) PullBlocks(common.Hashes) {
+}
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
func (n *network) BroadcastVote(vote *types.Vote) {
n.conn.broadcast(n.nID, vote)
diff --git a/core/interfaces.go b/core/interfaces.go
index 01e9096..4f6ad45 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -59,6 +59,9 @@ type Debug interface {
// Network describs the network interface that interacts with DEXON consensus
// core.
type Network interface {
+ // PullBlocks tries to pull blocks from the DEXON network.
+ PullBlocks(hashes common.Hashes)
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
BroadcastVote(vote *types.Vote)