diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-12-26 14:24:16 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-26 14:24:16 +0800 |
commit | 00416c9df2fec5398389863fb6f885a1fe11a6cc (patch) | |
tree | 74d260ac8a8c0e2ab782be1814b49b4232f56dc8 /core/test | |
parent | d333dc1a24df26ae8e8e3ffa2d700c1116a93ba2 (diff) | |
download | dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.gz dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.bz2 dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.lz dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.xz dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.tar.zst dexon-consensus-00416c9df2fec5398389863fb6f885a1fe11a6cc.zip |
core: pull block random (#384)
* Add PullRandomness to interface
* Add pendingBlocksWithoutRandomness to compactionChain
* Pull randomness every 1 second
Diffstat (limited to 'core/test')
-rw-r--r-- | core/test/network.go | 178 |
1 files changed, 139 insertions, 39 deletions
diff --git a/core/test/network.go b/core/test/network.go index 207d8a4..a79898e 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -125,34 +125,38 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) { // Network implements core.Network interface based on TransportClient. type Network struct { - ID types.NodeID - config NetworkConfig - ctx context.Context - ctxCancel context.CancelFunc - trans TransportClient - fromTransport <-chan *TransportEnvelope - toConsensus chan interface{} - toNode chan interface{} - sentRandomnessLock sync.Mutex - sentRandomness map[common.Hash]struct{} - sentAgreementLock sync.Mutex - sentAgreement map[common.Hash]struct{} - blockCacheLock sync.RWMutex - blockCache map[common.Hash]*types.Block - voteCacheLock sync.RWMutex - voteCache map[types.Position]map[types.VoteHeader]*types.Vote - voteCacheSize int - votePositions []types.Position - stateModule *State - peers map[types.NodeID]struct{} - unreceivedBlocksLock sync.RWMutex - unreceivedBlocks map[common.Hash]chan<- common.Hash - latencyModel LatencyModel - cache *utils.NodeSetCache - notarySetCachesLock sync.Mutex - notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{} - dkgSetCachesLock sync.Mutex - dkgSetCaches map[uint64]map[types.NodeID]struct{} + ID types.NodeID + config NetworkConfig + ctx context.Context + ctxCancel context.CancelFunc + trans TransportClient + fromTransport <-chan *TransportEnvelope + toConsensus chan interface{} + toNode chan interface{} + sentRandomnessLock sync.Mutex + sentRandomness map[common.Hash]struct{} + sentAgreementLock sync.Mutex + sentAgreement map[common.Hash]struct{} + blockCacheLock sync.RWMutex + blockCache map[common.Hash]*types.Block + voteCacheLock sync.RWMutex + voteCache map[types.Position]map[types.VoteHeader]*types.Vote + voteCacheSize int + votePositions []types.Position + randomnessCacheLock sync.RWMutex + randomnessCache map[common.Hash]*types.BlockRandomnessResult + stateModule *State + peers map[types.NodeID]struct{} + unreceivedBlocksLock sync.RWMutex + unreceivedBlocks map[common.Hash]chan<- common.Hash + unreceivedRandomnessLock sync.RWMutex + unreceivedRandomness map[common.Hash]chan<- common.Hash + latencyModel LatencyModel + cache *utils.NodeSetCache + notarySetCachesLock sync.Mutex + notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{} + dkgSetCachesLock sync.Mutex + dkgSetCaches map[uint64]map[types.NodeID]struct{} } // NewNetwork setup network stuffs for nodes, which provides an @@ -161,17 +165,19 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, marshaller Marshaller, config NetworkConfig) (n *Network) { // Construct basic network instance. n = &Network{ - ID: types.NewNodeID(pubKey), - config: config, - toConsensus: make(chan interface{}, 1000), - toNode: make(chan interface{}, 1000), - sentRandomness: make(map[common.Hash]struct{}), - sentAgreement: make(map[common.Hash]struct{}), - blockCache: make(map[common.Hash]*types.Block), - unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), - latencyModel: latency, - notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}), - dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), + ID: types.NewNodeID(pubKey), + config: config, + toConsensus: make(chan interface{}, 1000), + toNode: make(chan interface{}, 1000), + sentRandomness: make(map[common.Hash]struct{}), + sentAgreement: make(map[common.Hash]struct{}), + blockCache: make(map[common.Hash]*types.Block), + randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult), + unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), + unreceivedRandomness: make(map[common.Hash]chan<- common.Hash), + latencyModel: latency, + notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}), + dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), voteCache: make( map[types.Position]map[types.VoteHeader]*types.Vote), } @@ -200,6 +206,11 @@ func (n *Network) PullVotes(pos types.Position) { go n.pullVotesAsync(pos) } +// PullRandomness implememnts core.Network interface. +func (n *Network) PullRandomness(hashes common.Hashes) { + go n.pullRandomnessAsync(hashes) +} + // BroadcastVote implements core.Network interface. func (n *Network) BroadcastVote(vote *types.Vote) { n.broadcastToSet( @@ -256,6 +267,7 @@ func (n *Network) BroadcastRandomnessResult( if err := n.trans.Broadcast(randResult); err != nil { panic(err) } + n.addRandomnessToCache(randResult) } // SendDKGPrivateShare implements core.Network interface. @@ -380,6 +392,27 @@ func (n *Network) handlePullRequest(req *PullRequest) { } } }() + case "randomness": + hashes := req.Identity.(common.Hashes) + func() { + n.randomnessCacheLock.Lock() + defer n.randomnessCacheLock.Unlock() + All: + for _, h := range hashes { + r, exists := n.randomnessCache[h] + if !exists { + continue + } + select { + case <-n.ctx.Done(): + break All + default: + } + if err := n.trans.Send(req.Requester, r); err != nil { + log.Println("unable to send randomness", req.Requester, err) + } + } + }() default: panic(fmt.Errorf("unknown type of pull request: %v", req.Type)) } @@ -539,6 +572,60 @@ func (n *Network) pullVotesAsync(pos types.Position) { } } +func (n *Network) pullRandomnessAsync(hashes common.Hashes) { + // Setup notification channels for each block hash. + notYetReceived := make(map[common.Hash]struct{}) + ch := make(chan common.Hash, len(hashes)) + func() { + n.unreceivedRandomnessLock.Lock() + defer n.unreceivedRandomnessLock.Unlock() + for _, h := range hashes { + if _, exists := n.unreceivedRandomness[h]; exists { + continue + } + n.unreceivedRandomness[h] = ch + notYetReceived[h] = struct{}{} + } + }() + req := &PullRequest{ + Requester: n.ID, + Type: "randomness", + Identity: hashes, + } + // Randomly pick peers to send pull requests. +Loop: + for nID := range n.peers { + if nID == n.ID { + continue + } + if err := n.trans.Send(nID, req); err != nil { + // Try next peer. + continue + } + select { + case <-n.ctx.Done(): + break Loop + case <-time.After(2 * n.latencyModel.Delay()): + // Consume everything in the notification channel. + for { + select { + case h, ok := <-ch: + if !ok { + // This network module is closed. + break Loop + } + delete(notYetReceived, h) + if len(notYetReceived) == 0 { + break Loop + } + default: + continue Loop + } + } + } + } +} + func (n *Network) addBlockToCache(b *types.Block) { n.blockCacheLock.Lock() defer n.blockCacheLock.Unlock() @@ -573,6 +660,19 @@ func (n *Network) addVoteToCache(v *types.Vote) { n.voteCacheSize++ } +func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) { + n.randomnessCacheLock.Lock() + defer n.randomnessCacheLock.Unlock() + if len(n.randomnessCache) > 1000 { + // Randomly purge one randomness from cache. + for k := range n.randomnessCache { + delete(n.randomnessCache, k) + break + } + } + n.randomnessCache[rand.BlockHash] = rand +} + func (n *Network) cloneForFake(v interface{}) interface{} { if n.config.Type != NetworkTypeFake { return v |