aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-12-27 09:17:28 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:55 +0800
commitf79d09a12c8de2e1572292ef6bbd82352526930d (patch)
tree0ec9ce7fba237187b6d5b88b9401ad36798f7fe1 /dex
parent509c6899caad7a66f7e64a1ef9718daa9018f7f1 (diff)
downloaddexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar
dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.gz
dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.bz2
dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.lz
dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.xz
dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.tar.zst
dexon-f79d09a12c8de2e1572292ef6bbd82352526930d.zip
dex: add pull randomness (#105)
* vendor: sync to latest core * dex: Add PullRandomness
Diffstat (limited to 'dex')
-rw-r--r--dex/cache.go62
-rw-r--r--dex/cache_test.go90
-rw-r--r--dex/handler.go27
-rw-r--r--dex/network.go11
-rw-r--r--dex/peer.go20
-rw-r--r--dex/protocol.go7
6 files changed, 203 insertions, 14 deletions
diff --git a/dex/cache.go b/dex/cache.go
index 89bbbe3be..bdc22e114 100644
--- a/dex/cache.go
+++ b/dex/cache.go
@@ -44,21 +44,23 @@ func voteToKey(vote *coreTypes.Vote) voteKey {
}
type cache struct {
- lock sync.RWMutex
- blockCache map[coreCommon.Hash]*coreTypes.Block
- voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
- votePosition []coreTypes.Position
- db coreDb.Database
- voteSize int
- size int
+ lock sync.RWMutex
+ blockCache map[coreCommon.Hash]*coreTypes.Block
+ voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
+ randomnessCache map[coreCommon.Hash]*coreTypes.BlockRandomnessResult
+ votePosition []coreTypes.Position
+ db coreDb.Database
+ voteSize int
+ size int
}
func newCache(size int, db coreDb.Database) *cache {
return &cache{
- blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
- voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
- db: db,
- size: size,
+ blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
+ voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
+ randomnessCache: make(map[coreCommon.Hash]*coreTypes.BlockRandomnessResult),
+ db: db,
+ size: size,
}
}
@@ -126,3 +128,41 @@ func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block {
}
return cacheBlocks
}
+
+func (c *cache) addRandomness(rand *coreTypes.BlockRandomnessResult) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if len(c.randomnessCache) >= c.size {
+ // Randomly delete one entry.
+ for k := range c.randomnessCache {
+ delete(c.randomnessCache, k)
+ break
+ }
+ }
+ c.randomnessCache[rand.BlockHash] = rand
+}
+
+func (c *cache) randomness(hashes coreCommon.Hashes) []*coreTypes.BlockRandomnessResult {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ cacheRandomnesss := make([]*coreTypes.BlockRandomnessResult, 0, len(hashes))
+ for _, hash := range hashes {
+ if block, exist := c.randomnessCache[hash]; exist {
+ cacheRandomnesss = append(cacheRandomnesss, block)
+ } else {
+ block, err := c.db.GetBlock(hash)
+ if err != nil {
+ continue
+ }
+ if len(block.Finalization.Randomness) == 0 {
+ continue
+ }
+ cacheRandomnesss = append(cacheRandomnesss, &coreTypes.BlockRandomnessResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Randomness: block.Finalization.Randomness,
+ })
+ }
+ }
+ return cacheRandomnesss
+}
diff --git a/dex/cache_test.go b/dex/cache_test.go
index 3b43e77aa..536e015f0 100644
--- a/dex/cache_test.go
+++ b/dex/cache_test.go
@@ -18,6 +18,7 @@
package dex
import (
+ "math/rand"
"sort"
"strings"
"testing"
@@ -203,3 +204,92 @@ func TestCacheBlock(t *testing.T) {
}
}
}
+
+func randomBytes() []byte {
+ bytes := make([]byte, 32)
+ for i := range bytes {
+ bytes[i] = byte(rand.Int() % 256)
+ }
+ return bytes
+}
+
+func TestCacheRandomness(t *testing.T) {
+ db, err := coreDb.NewMemBackedDB()
+ if err != nil {
+ panic(err)
+ }
+ cache := newCache(3, db)
+ rand1 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ rand2 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ rand3 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ rand4 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ cache.addRandomness(rand1)
+ cache.addRandomness(rand2)
+ cache.addRandomness(rand3)
+
+ hashes := coreCommon.Hashes{rand1.BlockHash, rand2.BlockHash, rand3.BlockHash, rand4.BlockHash}
+ hashMap := map[coreCommon.Hash]struct{}{
+ rand1.BlockHash: {},
+ rand2.BlockHash: {},
+ rand3.BlockHash: {},
+ }
+ rands := cache.randomness(hashes)
+ if len(rands) != 3 {
+ t.Errorf("fail to get rands: have %d, want 3", len(rands))
+ }
+ for _, rand := range rands {
+ if _, exist := hashMap[rand.BlockHash]; !exist {
+ t.Errorf("get wrong rand: have %s, want %v", rand, hashMap)
+ }
+ }
+
+ cache.addRandomness(rand4)
+
+ rands = cache.randomness(hashes)
+ hashMap[rand4.BlockHash] = struct{}{}
+ if len(rands) != 3 {
+ t.Errorf("fail to get rands: have %d, want 3", len(rands))
+ }
+ hasNewRandomness := false
+ for _, rand := range rands {
+ if _, exist := hashMap[rand.BlockHash]; !exist {
+ t.Errorf("get wrong rand: have %s, want %v", rand, hashMap)
+ }
+ if rand.BlockHash.Equal(rand4.BlockHash) {
+ hasNewRandomness = true
+ }
+ }
+ if !hasNewRandomness {
+ t.Errorf("expect rand %s in cache, have %v", rand4, rands)
+ }
+
+ block := &coreTypes.Block{
+ Hash: coreCommon.NewRandomHash(),
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: randomBytes(),
+ },
+ }
+ if err := db.PutBlock(*block); err != nil {
+ panic(err)
+ }
+ rands = cache.randomness(coreCommon.Hashes{block.Hash})
+ if len(rands) != 1 {
+ t.Errorf("fail to get rands: have %d, want 1", len(rands))
+ } else {
+ if !rands[0].BlockHash.Equal(block.Hash) {
+ t.Errorf("get wrong rand: have %s, want %s", rands[0], block)
+ }
+ }
+}
diff --git a/dex/handler.go b/dex/handler.go
index ff87884d2..8a60c2a56 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -891,6 +891,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return err
}
}
+ case msg.Code == PullRandomnessMsg:
+ if !pm.isBlockProposer {
+ break
+ }
+ var hashes coreCommon.Hashes
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ randomness := pm.cache.randomness(hashes)
+ log.Debug("Push randomness", "randomness", randomness)
+ for _, randomness := range randomness {
+ if err := p.SendRandomness(randomness); err != nil {
+ return err
+ }
+ }
case msg.Code == GetGovStateMsg:
var hash common.Hash
if err := msg.Decode(&hash); err != nil {
@@ -1029,6 +1044,7 @@ func (pm *ProtocolManager) BroadcastAgreementResult(
func (pm *ProtocolManager) BroadcastRandomnessResult(
randomness *coreTypes.BlockRandomnessResult) {
+ pm.cache.addRandomness(randomness)
// send to notary nodes first (direct)
label := peerLabel{
set: notaryset,
@@ -1109,6 +1125,17 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
+func (pm *ProtocolManager) BroadcastPullRandomness(
+ hashes coreCommon.Hashes) {
+ // TODO(jimmy-dexon): pull from dkg set only.
+ for idx, peer := range pm.peers.Peers() {
+ if idx >= maxPullPeers {
+ break
+ }
+ peer.AsyncSendPullRandomness(hashes)
+ }
+}
+
func (pm *ProtocolManager) txBroadcastLoop() {
queueSizeMax := common.StorageSize(100 * 1024) // 100 KB
currentSize := common.StorageSize(0)
diff --git a/dex/network.go b/dex/network.go
index 38ee614ad..c5f81782d 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -34,6 +34,9 @@ func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork {
// PullBlocks tries to pull blocks from the DEXON network.
func (n *DexconNetwork) PullBlocks(hashes coreCommon.Hashes) {
+ if len(hashes) == 0 {
+ return
+ }
n.pm.BroadcastPullBlocks(hashes)
}
@@ -42,6 +45,14 @@ func (n *DexconNetwork) PullVotes(pos types.Position) {
n.pm.BroadcastPullVotes(pos)
}
+// PullRandomness tries to pull randomness result from the DEXON network.
+func (n *DexconNetwork) PullRandomness(hashes coreCommon.Hashes) {
+ if len(hashes) == 0 {
+ return
+ }
+ n.pm.BroadcastPullRandomness(hashes)
+}
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
n.pm.BroadcastVote(vote)
diff --git a/dex/peer.go b/dex/peer.go
index 49a9b64f8..aecf9dc7c 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -100,6 +100,7 @@ const (
maxQueuedDKGParitialSignature = 16
maxQueuedPullBlocks = 128
maxQueuedPullVotes = 128
+ maxQueuedPullRandomness = 128
handshakeTimeout = 5 * time.Second
@@ -160,6 +161,7 @@ type peer struct {
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
queuedPullBlocks chan coreCommon.Hashes
queuedPullVotes chan coreTypes.Position
+ queuedPullRandomness chan coreCommon.Hashes
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -190,6 +192,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes),
+ queuedPullRandomness: make(chan coreCommon.Hashes, maxQueuedPullRandomness),
term: make(chan struct{}),
}
}
@@ -257,6 +260,11 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Pulling Votes", "position", pos)
+ case hashes := <-p.queuedPullRandomness:
+ if err := p.SendPullRandomness(hashes); err != nil {
+ return
+ }
+ p.Log().Trace("Pulling Randomness", "hashes", hashes)
case <-p.term:
return
case <-time.After(100 * time.Millisecond):
@@ -530,6 +538,18 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
}
}
+func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error {
+ return p2p.Send(p.rw, PullRandomnessMsg, hashes)
+}
+
+func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
+ select {
+ case p.queuedPullRandomness <- hashes:
+ default:
+ p.Log().Debug("Dropping Pull Randomness")
+ }
+}
+
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.HeaderWithGovState) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
diff --git a/dex/protocol.go b/dex/protocol.go
index b6d672b7f..b417c91b6 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -59,7 +59,7 @@ var ProtocolName = "dex"
var ProtocolVersions = []uint{dex64}
// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{42}
+var ProtocolLengths = []uint64{43}
const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
@@ -92,9 +92,10 @@ const (
DKGPartialSignatureMsg = 0x25
PullBlocksMsg = 0x26
PullVotesMsg = 0x27
+ PullRandomnessMsg = 0x28
- GetGovStateMsg = 0x28
- GovStateMsg = 0x29
+ GetGovStateMsg = 0x29
+ GovStateMsg = 0x2a
)
type errCode int