author     Jimmy Hu <jimmy.hu@dexon.org>             2018-12-26 14:24:16 +0800
committer  GitHub <noreply@github.com>               2018-12-26 14:24:16 +0800
commit     00416c9df2fec5398389863fb6f885a1fe11a6cc (patch)
tree       74d260ac8a8c0e2ab782be1814b49b4232f56dc8 /core
parent     d333dc1a24df26ae8e8e3ffa2d700c1116a93ba2 (diff)
core: pull block random (#384)
* Add PullRandomness to interface
* Add pendingBlocksWithoutRandomness to compactionChain
* Pull randomness every 1 second
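
The change boils down to a resettable one-second timer in Consensus: every second it asks the compaction chain for confirmed blocks that still have no randomness result and pulls them via Network.PullRandomness, and a block delivery restarts the wait. Below is a minimal, self-contained sketch of that timer pattern; pullLoop, pending and pull are hypothetical stand-ins for the pullRandomness loop, compactionChain.pendingBlocksWithoutRandomness and Network.PullRandomness introduced in the diff.

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // pullLoop sketches the resettable one-second pull timer: every second it
    // asks pending() for the block hashes still missing randomness and hands
    // them to pull(); a signal on reset restarts the wait (block delivered).
    func pullLoop(ctx context.Context, reset <-chan struct{},
    	pending func() []string, pull func([]string)) {
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-reset:
    			// A block was just delivered; start a fresh one-second wait.
    		case <-time.After(1 * time.Second):
    			if hashes := pending(); len(hashes) > 0 {
    				pull(hashes)
    			}
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    	defer cancel()
    	reset := make(chan struct{}, 1)
    	pending := func() []string { return []string{"block-a", "block-b"} }
    	pull := func(hs []string) { fmt.Println("pull randomness for", hs) }
    	go pullLoop(ctx, reset, pending, pull)
    	<-ctx.Done()
    }
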
Diffstat (limited to 'core')
-rw-r--r--  core/compaction-chain.go       |  12
-rw-r--r--  core/compaction-chain_test.go  |   8
-rw-r--r--  core/consensus.go              |  56
-rw-r--r--  core/consensus_test.go         |   4
-rw-r--r--  core/interfaces.go             |   3
-rw-r--r--  core/test/network.go           | 178
6 files changed, 206 insertions, 55 deletions
diff --git a/core/compaction-chain.go b/core/compaction-chain.go
index 14e3b26..8e04429 100644
--- a/core/compaction-chain.go
+++ b/core/compaction-chain.go
@@ -257,3 +257,15 @@ func (cc *compactionChain) lastPendingBlock() *types.Block {
 	}
 	return nil
 }
+
+func (cc *compactionChain) pendingBlocksWithoutRandomness() (
+	hashes common.Hashes) {
+	cc.lock.RLock()
+	defer cc.lock.RUnlock()
+	for _, block := range cc.pendingBlocks {
+		if _, exist := cc.blockRandomness[block.Hash]; !exist {
+			hashes = append(hashes, block.Hash)
+		}
+	}
+	return
+}
diff --git a/core/compaction-chain_test.go b/core/compaction-chain_test.go
index e944e4d..ca88734 100644
--- a/core/compaction-chain_test.go
+++ b/core/compaction-chain_test.go
@@ -201,6 +201,7 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
 		cc.registerBlock(blocks[idx])
 		s.Require().True(cc.blockRegistered(blocks[idx].Hash))
 	}
+	noRandBlocks := common.Hashes{}
 	// Block#4, #5, contains randomness.
 	for i := range blocks {
 		s.Require().NoError(cc.processBlock(blocks[i]))
@@ -212,8 +213,11 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
 				Position:   blocks[i].Position,
 				Randomness: h[:],
 			}))
+		} else {
+			noRandBlocks = append(noRandBlocks, blocks[i].Hash)
 		}
 	}
+	s.Equal(noRandBlocks, cc.pendingBlocksWithoutRandomness())
 	s.Require().Len(cc.extractBlocks(), 0)
 	// Give compactionChain module randomnessResult via finalized block
 	// #0, #1, #2, #3, #4.
@@ -227,6 +231,9 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
 		block.Finalization.Height = uint64(i + 1)
 		cc.processFinalizedBlock(block)
 	}
+	// Block #0-3 has randomness result.
+	noRandBlocks = noRandBlocks[4:]
+	s.Equal(noRandBlocks, cc.pendingBlocksWithoutRandomness())
 	delivered := cc.extractBlocks()
 	s.Require().Len(delivered, 6)
 	// Give compactionChain module randomnessResult#6-9.
@@ -239,6 +246,7 @@ func (s *CompactionChainTestSuite) TestMissedRandomness() {
 			Randomness: h[:],
 		}))
 	}
+	s.Len(cc.pendingBlocksWithoutRandomness(), 0)
 	delivered = append(delivered, cc.extractBlocks()...)
 	s.Require().Len(delivered, 10)
 	// The delivered order should be the same as processing order.
diff --git a/core/consensus.go b/core/consensus.go
index 0d4a38a..7f77df9 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -387,6 +387,7 @@ type Consensus struct {
 	event                      *common.Event
 	logger                     common.Logger
 	nonFinalizedBlockDelivered bool
+	resetRandomnessTicker      chan struct{}
 }
 
 // NewConsensus construct an Consensus instance.
@@ -466,22 +467,23 @@ func newConsensus(
 	}
 	// Construct Consensus instance.
 	con := &Consensus{
-		ID:               ID,
-		ccModule:         newCompactionChain(gov),
-		lattice:          lattice,
-		app:              appModule,
-		debugApp:         debugApp,
-		gov:              gov,
-		db:               db,
-		network:          network,
-		baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
-		dkgReady:         sync.NewCond(&sync.Mutex{}),
-		cfgModule:        cfgModule,
-		dMoment:          dMoment,
-		nodeSetCache:     nodeSetCache,
-		signer:           signer,
-		event:            common.NewEvent(),
-		logger:           logger,
+		ID:                    ID,
+		ccModule:              newCompactionChain(gov),
+		lattice:               lattice,
+		app:                   appModule,
+		debugApp:              debugApp,
+		gov:                   gov,
+		db:                    db,
+		network:               network,
+		baConfirmedBlock:      make(map[common.Hash]chan<- *types.Block),
+		dkgReady:              sync.NewCond(&sync.Mutex{}),
+		cfgModule:             cfgModule,
+		dMoment:               dMoment,
+		nodeSetCache:          nodeSetCache,
+		signer:                signer,
+		event:                 common.NewEvent(),
+		logger:                logger,
+		resetRandomnessTicker: make(chan struct{}),
 	}
 	con.ctx, con.ctxCancel = context.WithCancel(context.Background())
 	con.baMgr = newAgreementMgr(con, round, dMoment)
@@ -634,6 +636,9 @@ func (con *Consensus) Run() {
 	go con.processMsg(con.network.ReceiveChan())
 	// Sleep until dMoment come.
 	time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+	// Take some time to bootstrap.
+	time.Sleep(3 * time.Second)
+	go con.pullRandomness()
 	// Block until done.
 	select {
 	case <-con.ctx.Done():
@@ -1051,8 +1056,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
 	return
 }
 
+func (con *Consensus) pullRandomness() {
+	for {
+		select {
+		case <-con.ctx.Done():
+			return
+		case <-con.resetRandomnessTicker:
+		case <-time.After(1 * time.Second):
+			// TODO(jimmy): pulling period should be related to lambdaBA.
+			hashes := con.ccModule.pendingBlocksWithoutRandomness()
+			con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes)
+			con.network.PullRandomness(hashes)
+		}
+	}
+}
+
 // deliverBlock deliver a block to application layer.
 func (con *Consensus) deliverBlock(b *types.Block) {
+	select {
+	case con.resetRandomnessTicker <- struct{}{}:
+	default:
+	}
 	if err := con.db.UpdateBlock(*b); err != nil {
 		panic(err)
 	}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 40ee34b..d26e102 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -48,6 +48,10 @@ func (n *network) PullBlocks(common.Hashes) {
 func (n *network) PullVotes(types.Position) {
 }
 
+// PullRandomness tries to pull randomness from the DEXON network.
+func (n *network) PullRandomness(common.Hashes) {
+}
+
 // BroadcastVote broadcasts vote to all nodes in DEXON network.
 func (n *network) BroadcastVote(vote *types.Vote) {
 	n.conn.broadcast(n.nID, vote)
diff --git a/core/interfaces.go b/core/interfaces.go
index fc3bf09..2077032 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -67,6 +67,9 @@ type Network interface {
 	// PullVotes tries to pull votes from the DEXON network.
 	PullVotes(position types.Position)
 
+	// PullRandomness tries to pull randomness from the DEXON network.
+	PullRandomness(hashes common.Hashes)
+
 	// BroadcastVote broadcasts vote to all nodes in DEXON network.
 	BroadcastVote(vote *types.Vote)
 
diff --git a/core/test/network.go b/core/test/network.go
index 207d8a4..a79898e 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -125,34 +125,38 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
 
 // Network implements core.Network interface based on TransportClient.
 type Network struct {
-	ID                   types.NodeID
-	config               NetworkConfig
-	ctx                  context.Context
-	ctxCancel            context.CancelFunc
-	trans                TransportClient
-	fromTransport        <-chan *TransportEnvelope
-	toConsensus          chan interface{}
-	toNode               chan interface{}
-	sentRandomnessLock   sync.Mutex
-	sentRandomness       map[common.Hash]struct{}
-	sentAgreementLock    sync.Mutex
-	sentAgreement        map[common.Hash]struct{}
-	blockCacheLock       sync.RWMutex
-	blockCache           map[common.Hash]*types.Block
-	voteCacheLock        sync.RWMutex
-	voteCache            map[types.Position]map[types.VoteHeader]*types.Vote
-	voteCacheSize        int
-	votePositions        []types.Position
-	stateModule          *State
-	peers                map[types.NodeID]struct{}
-	unreceivedBlocksLock sync.RWMutex
-	unreceivedBlocks     map[common.Hash]chan<- common.Hash
-	latencyModel         LatencyModel
-	cache                *utils.NodeSetCache
-	notarySetCachesLock  sync.Mutex
-	notarySetCaches      map[uint64]map[uint32]map[types.NodeID]struct{}
-	dkgSetCachesLock     sync.Mutex
-	dkgSetCaches         map[uint64]map[types.NodeID]struct{}
+	ID                       types.NodeID
+	config                   NetworkConfig
+	ctx                      context.Context
+	ctxCancel                context.CancelFunc
+	trans                    TransportClient
+	fromTransport            <-chan *TransportEnvelope
+	toConsensus              chan interface{}
+	toNode                   chan interface{}
+	sentRandomnessLock       sync.Mutex
+	sentRandomness           map[common.Hash]struct{}
+	sentAgreementLock        sync.Mutex
+	sentAgreement            map[common.Hash]struct{}
+	blockCacheLock           sync.RWMutex
+	blockCache               map[common.Hash]*types.Block
+	voteCacheLock            sync.RWMutex
+	voteCache                map[types.Position]map[types.VoteHeader]*types.Vote
+	voteCacheSize            int
+	votePositions            []types.Position
+	randomnessCacheLock      sync.RWMutex
+	randomnessCache          map[common.Hash]*types.BlockRandomnessResult
+	stateModule              *State
+	peers                    map[types.NodeID]struct{}
+	unreceivedBlocksLock     sync.RWMutex
+	unreceivedBlocks         map[common.Hash]chan<- common.Hash
+	unreceivedRandomnessLock sync.RWMutex
+	unreceivedRandomness     map[common.Hash]chan<- common.Hash
+	latencyModel             LatencyModel
+	cache                    *utils.NodeSetCache
+	notarySetCachesLock      sync.Mutex
+	notarySetCaches          map[uint64]map[uint32]map[types.NodeID]struct{}
+	dkgSetCachesLock         sync.Mutex
+	dkgSetCaches             map[uint64]map[types.NodeID]struct{}
 }
 
 // NewNetwork setup network stuffs for nodes, which provides an
@@ -161,17 +165,19 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
 	marshaller Marshaller, config NetworkConfig) (n *Network) {
 	// Construct basic network instance.
 	n = &Network{
-		ID:               types.NewNodeID(pubKey),
-		config:           config,
-		toConsensus:      make(chan interface{}, 1000),
-		toNode:           make(chan interface{}, 1000),
-		sentRandomness:   make(map[common.Hash]struct{}),
-		sentAgreement:    make(map[common.Hash]struct{}),
-		blockCache:       make(map[common.Hash]*types.Block),
-		unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
-		latencyModel:     latency,
-		notarySetCaches:  make(map[uint64]map[uint32]map[types.NodeID]struct{}),
-		dkgSetCaches:     make(map[uint64]map[types.NodeID]struct{}),
+		ID:                   types.NewNodeID(pubKey),
+		config:               config,
+		toConsensus:          make(chan interface{}, 1000),
+		toNode:               make(chan interface{}, 1000),
+		sentRandomness:       make(map[common.Hash]struct{}),
+		sentAgreement:        make(map[common.Hash]struct{}),
+		blockCache:           make(map[common.Hash]*types.Block),
+		randomnessCache:      make(map[common.Hash]*types.BlockRandomnessResult),
+		unreceivedBlocks:     make(map[common.Hash]chan<- common.Hash),
+		unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
+		latencyModel:         latency,
+		notarySetCaches:      make(map[uint64]map[uint32]map[types.NodeID]struct{}),
+		dkgSetCaches:         make(map[uint64]map[types.NodeID]struct{}),
 		voteCache: make(
 			map[types.Position]map[types.VoteHeader]*types.Vote),
 	}
@@ -200,6 +206,11 @@ func (n *Network) PullVotes(pos types.Position) {
 	go n.pullVotesAsync(pos)
 }
 
+// PullRandomness implememnts core.Network interface.
+func (n *Network) PullRandomness(hashes common.Hashes) {
+	go n.pullRandomnessAsync(hashes)
+}
+
 // BroadcastVote implements core.Network interface.
 func (n *Network) BroadcastVote(vote *types.Vote) {
 	n.broadcastToSet(
@@ -256,6 +267,7 @@ func (n *Network) BroadcastRandomnessResult(
 	if err := n.trans.Broadcast(randResult); err != nil {
 		panic(err)
 	}
+	n.addRandomnessToCache(randResult)
 }
 
 // SendDKGPrivateShare implements core.Network interface.
@@ -380,6 +392,27 @@ func (n *Network) handlePullRequest(req *PullRequest) {
 				}
 			}
 		}()
+	case "randomness":
+		hashes := req.Identity.(common.Hashes)
+		func() {
+			n.randomnessCacheLock.Lock()
+			defer n.randomnessCacheLock.Unlock()
+		All:
+			for _, h := range hashes {
+				r, exists := n.randomnessCache[h]
+				if !exists {
+					continue
+				}
+				select {
+				case <-n.ctx.Done():
+					break All
+				default:
+				}
+				if err := n.trans.Send(req.Requester, r); err != nil {
+					log.Println("unable to send randomness", req.Requester, err)
+				}
+			}
+		}()
 	default:
 		panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
 	}
@@ -539,6 +572,60 @@ func (n *Network) pullVotesAsync(pos types.Position) {
 	}
 }
 
+func (n *Network) pullRandomnessAsync(hashes common.Hashes) {
+	// Setup notification channels for each block hash.
+	notYetReceived := make(map[common.Hash]struct{})
+	ch := make(chan common.Hash, len(hashes))
+	func() {
+		n.unreceivedRandomnessLock.Lock()
+		defer n.unreceivedRandomnessLock.Unlock()
+		for _, h := range hashes {
+			if _, exists := n.unreceivedRandomness[h]; exists {
+				continue
+			}
+			n.unreceivedRandomness[h] = ch
+			notYetReceived[h] = struct{}{}
+		}
+	}()
+	req := &PullRequest{
+		Requester: n.ID,
+		Type:      "randomness",
+		Identity:  hashes,
+	}
+	// Randomly pick peers to send pull requests.
+Loop:
+	for nID := range n.peers {
+		if nID == n.ID {
+			continue
+		}
+		if err := n.trans.Send(nID, req); err != nil {
+			// Try next peer.
+			continue
+		}
+		select {
+		case <-n.ctx.Done():
+			break Loop
+		case <-time.After(2 * n.latencyModel.Delay()):
+			// Consume everything in the notification channel.
+			for {
+				select {
+				case h, ok := <-ch:
+					if !ok {
+						// This network module is closed.
+						break Loop
+					}
+					delete(notYetReceived, h)
+					if len(notYetReceived) == 0 {
+						break Loop
+					}
+				default:
+					continue Loop
+				}
+			}
+		}
+	}
+}
+
 func (n *Network) addBlockToCache(b *types.Block) {
 	n.blockCacheLock.Lock()
 	defer n.blockCacheLock.Unlock()
@@ -573,6 +660,19 @@ func (n *Network) addVoteToCache(v *types.Vote) {
 	n.voteCacheSize++
 }
 
+func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
+	n.randomnessCacheLock.Lock()
+	defer n.randomnessCacheLock.Unlock()
+	if len(n.randomnessCache) > 1000 {
+		// Randomly purge one randomness from cache.
+		for k := range n.randomnessCache {
+			delete(n.randomnessCache, k)
+			break
+		}
+	}
+	n.randomnessCache[rand.BlockHash] = rand
+}
+
 func (n *Network) cloneForFake(v interface{}) interface{} {
 	if n.config.Type != NetworkTypeFake {
 		return v