diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-12-27 09:17:28 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:55 +0800 |
commit | f79d09a12c8de2e1572292ef6bbd82352526930d (patch) | |
tree | 0ec9ce7fba237187b6d5b88b9401ad36798f7fe1 /dex | |
parent | 509c6899caad7a66f7e64a1ef9718daa9018f7f1 (diff) | |
download | dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.gz dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.bz2 dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.lz dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.xz dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.zst dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.zip |
dex: add pull randomness (#105)
* vendor: sync to latest core
* dex: Add PullRandomness
Diffstat (limited to 'dex')
-rw-r--r-- | dex/cache.go | 62 | ||||
-rw-r--r-- | dex/cache_test.go | 90 | ||||
-rw-r--r-- | dex/handler.go | 27 | ||||
-rw-r--r-- | dex/network.go | 11 | ||||
-rw-r--r-- | dex/peer.go | 20 | ||||
-rw-r--r-- | dex/protocol.go | 7 |
6 files changed, 203 insertions, 14 deletions
diff --git a/dex/cache.go b/dex/cache.go index 89bbbe3be..bdc22e114 100644 --- a/dex/cache.go +++ b/dex/cache.go @@ -44,21 +44,23 @@ func voteToKey(vote *coreTypes.Vote) voteKey { } type cache struct { - lock sync.RWMutex - blockCache map[coreCommon.Hash]*coreTypes.Block - voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote - votePosition []coreTypes.Position - db coreDb.Database - voteSize int - size int + lock sync.RWMutex + blockCache map[coreCommon.Hash]*coreTypes.Block + voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote + randomnessCache map[coreCommon.Hash]*coreTypes.BlockRandomnessResult + votePosition []coreTypes.Position + db coreDb.Database + voteSize int + size int } func newCache(size int, db coreDb.Database) *cache { return &cache{ - blockCache: make(map[coreCommon.Hash]*coreTypes.Block), - voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), - db: db, - size: size, + blockCache: make(map[coreCommon.Hash]*coreTypes.Block), + voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), + randomnessCache: make(map[coreCommon.Hash]*coreTypes.BlockRandomnessResult), + db: db, + size: size, } } @@ -126,3 +128,41 @@ func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block { } return cacheBlocks } + +func (c *cache) addRandomness(rand *coreTypes.BlockRandomnessResult) { + c.lock.Lock() + defer c.lock.Unlock() + if len(c.randomnessCache) >= c.size { + // Randomly delete one entry. + for k := range c.randomnessCache { + delete(c.randomnessCache, k) + break + } + } + c.randomnessCache[rand.BlockHash] = rand +} + +func (c *cache) randomness(hashes coreCommon.Hashes) []*coreTypes.BlockRandomnessResult { + c.lock.RLock() + defer c.lock.RUnlock() + cacheRandomnesss := make([]*coreTypes.BlockRandomnessResult, 0, len(hashes)) + for _, hash := range hashes { + if block, exist := c.randomnessCache[hash]; exist { + cacheRandomnesss = append(cacheRandomnesss, block) + } else { + block, err := c.db.GetBlock(hash) + if err != nil { + continue + } + if len(block.Finalization.Randomness) == 0 { + continue + } + cacheRandomnesss = append(cacheRandomnesss, &coreTypes.BlockRandomnessResult{ + BlockHash: block.Hash, + Position: block.Position, + Randomness: block.Finalization.Randomness, + }) + } + } + return cacheRandomnesss +} diff --git a/dex/cache_test.go b/dex/cache_test.go index 3b43e77aa..536e015f0 100644 --- a/dex/cache_test.go +++ b/dex/cache_test.go @@ -18,6 +18,7 @@ package dex import ( + "math/rand" "sort" "strings" "testing" @@ -203,3 +204,92 @@ func TestCacheBlock(t *testing.T) { } } } + +func randomBytes() []byte { + bytes := make([]byte, 32) + for i := range bytes { + bytes[i] = byte(rand.Int() % 256) + } + return bytes +} + +func TestCacheRandomness(t *testing.T) { + db, err := coreDb.NewMemBackedDB() + if err != nil { + panic(err) + } + cache := newCache(3, db) + rand1 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + rand2 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + rand3 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + rand4 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + cache.addRandomness(rand1) + cache.addRandomness(rand2) + cache.addRandomness(rand3) + + hashes := coreCommon.Hashes{rand1.BlockHash, rand2.BlockHash, rand3.BlockHash, rand4.BlockHash} + hashMap := map[coreCommon.Hash]struct{}{ + rand1.BlockHash: {}, + rand2.BlockHash: {}, + rand3.BlockHash: {}, + } + rands := cache.randomness(hashes) + if len(rands) != 3 { + t.Errorf("fail to get rands: have %d, want 3", len(rands)) + } + for _, rand := range rands { + if _, exist := hashMap[rand.BlockHash]; !exist { + t.Errorf("get wrong rand: have %s, want %v", rand, hashMap) + } + } + + cache.addRandomness(rand4) + + rands = cache.randomness(hashes) + hashMap[rand4.BlockHash] = struct{}{} + if len(rands) != 3 { + t.Errorf("fail to get rands: have %d, want 3", len(rands)) + } + hasNewRandomness := false + for _, rand := range rands { + if _, exist := hashMap[rand.BlockHash]; !exist { + t.Errorf("get wrong rand: have %s, want %v", rand, hashMap) + } + if rand.BlockHash.Equal(rand4.BlockHash) { + hasNewRandomness = true + } + } + if !hasNewRandomness { + t.Errorf("expect rand %s in cache, have %v", rand4, rands) + } + + block := &coreTypes.Block{ + Hash: coreCommon.NewRandomHash(), + Finalization: coreTypes.FinalizationResult{ + Randomness: randomBytes(), + }, + } + if err := db.PutBlock(*block); err != nil { + panic(err) + } + rands = cache.randomness(coreCommon.Hashes{block.Hash}) + if len(rands) != 1 { + t.Errorf("fail to get rands: have %d, want 1", len(rands)) + } else { + if !rands[0].BlockHash.Equal(block.Hash) { + t.Errorf("get wrong rand: have %s, want %s", rands[0], block) + } + } +} diff --git a/dex/handler.go b/dex/handler.go index ff87884d2..8a60c2a56 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -891,6 +891,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return err } } + case msg.Code == PullRandomnessMsg: + if !pm.isBlockProposer { + break + } + var hashes coreCommon.Hashes + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + randomness := pm.cache.randomness(hashes) + log.Debug("Push randomness", "randomness", randomness) + for _, randomness := range randomness { + if err := p.SendRandomness(randomness); err != nil { + return err + } + } case msg.Code == GetGovStateMsg: var hash common.Hash if err := msg.Decode(&hash); err != nil { @@ -1029,6 +1044,7 @@ func (pm *ProtocolManager) BroadcastAgreementResult( func (pm *ProtocolManager) BroadcastRandomnessResult( randomness *coreTypes.BlockRandomnessResult) { + pm.cache.addRandomness(randomness) // send to notary nodes first (direct) label := peerLabel{ set: notaryset, @@ -1109,6 +1125,17 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } +func (pm *ProtocolManager) BroadcastPullRandomness( + hashes coreCommon.Hashes) { + // TODO(jimmy-dexon): pull from dkg set only. + for idx, peer := range pm.peers.Peers() { + if idx >= maxPullPeers { + break + } + peer.AsyncSendPullRandomness(hashes) + } +} + func (pm *ProtocolManager) txBroadcastLoop() { queueSizeMax := common.StorageSize(100 * 1024) // 100 KB currentSize := common.StorageSize(0) diff --git a/dex/network.go b/dex/network.go index 38ee614ad..c5f81782d 100644 --- a/dex/network.go +++ b/dex/network.go @@ -34,6 +34,9 @@ func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork { // PullBlocks tries to pull blocks from the DEXON network. func (n *DexconNetwork) PullBlocks(hashes coreCommon.Hashes) { + if len(hashes) == 0 { + return + } n.pm.BroadcastPullBlocks(hashes) } @@ -42,6 +45,14 @@ func (n *DexconNetwork) PullVotes(pos types.Position) { n.pm.BroadcastPullVotes(pos) } +// PullRandomness tries to pull randomness result from the DEXON network. +func (n *DexconNetwork) PullRandomness(hashes coreCommon.Hashes) { + if len(hashes) == 0 { + return + } + n.pm.BroadcastPullRandomness(hashes) +} + // BroadcastVote broadcasts vote to all nodes in DEXON network. func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { n.pm.BroadcastVote(vote) diff --git a/dex/peer.go b/dex/peer.go index 49a9b64f8..aecf9dc7c 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -100,6 +100,7 @@ const ( maxQueuedDKGParitialSignature = 16 maxQueuedPullBlocks = 128 maxQueuedPullVotes = 128 + maxQueuedPullRandomness = 128 handshakeTimeout = 5 * time.Second @@ -160,6 +161,7 @@ type peer struct { queuedDKGPartialSignatures chan *dkgTypes.PartialSignature queuedPullBlocks chan coreCommon.Hashes queuedPullVotes chan coreTypes.Position + queuedPullRandomness chan coreCommon.Hashes term chan struct{} // Termination channel to stop the broadcaster } @@ -190,6 +192,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes), + queuedPullRandomness: make(chan coreCommon.Hashes, maxQueuedPullRandomness), term: make(chan struct{}), } } @@ -257,6 +260,11 @@ func (p *peer) broadcast() { return } p.Log().Trace("Pulling Votes", "position", pos) + case hashes := <-p.queuedPullRandomness: + if err := p.SendPullRandomness(hashes); err != nil { + return + } + p.Log().Trace("Pulling Randomness", "hashes", hashes) case <-p.term: return case <-time.After(100 * time.Millisecond): @@ -530,6 +538,18 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { } } +func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error { + return p2p.Send(p.rw, PullRandomnessMsg, hashes) +} + +func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) { + select { + case p.queuedPullRandomness <- hashes: + default: + p.Log().Debug("Dropping Pull Randomness") + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.HeaderWithGovState) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) diff --git a/dex/protocol.go b/dex/protocol.go index b6d672b7f..b417c91b6 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -59,7 +59,7 @@ var ProtocolName = "dex" var ProtocolVersions = []uint{dex64} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{42} +var ProtocolLengths = []uint64{43} const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -92,9 +92,10 @@ const ( DKGPartialSignatureMsg = 0x25 PullBlocksMsg = 0x26 PullVotesMsg = 0x27 + PullRandomnessMsg = 0x28 - GetGovStateMsg = 0x28 - GovStateMsg = 0x29 + GetGovStateMsg = 0x29 + GovStateMsg = 0x2a ) type errCode int |