aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-27 20:47:32 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:59 +0800
commit0c63646ca8b06bb527737cd6e2a7fe58f169efff (patch)
treeb0666613c2a3cb84d53b60597bfef5ec45548c3a /dex
parent91981becf98b988470810aa1c26d86de2d294e29 (diff)
downloaddexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.gz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.bz2
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.lz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.xz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.zst
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.zip
core: merge notarySet and DKGSet (#265)
* vendor: sync to latest core * core: merge notarySet and dkgSet * dex: optimize network traffic for finalized block
Diffstat (limited to 'dex')
-rw-r--r--dex/cache.go88
-rw-r--r--dex/cache_test.go165
-rw-r--r--dex/handler.go120
-rw-r--r--dex/network.go23
-rw-r--r--dex/peer.go101
-rw-r--r--dex/peer_test.go39
-rw-r--r--dex/protocol.go3
-rw-r--r--dex/protocol_test.go105
8 files changed, 248 insertions, 396 deletions
diff --git a/dex/cache.go b/dex/cache.go
index 5d4d20dd0..04030eaaf 100644
--- a/dex/cache.go
+++ b/dex/cache.go
@@ -44,23 +44,23 @@ func voteToKey(vote *coreTypes.Vote) voteKey {
}
type cache struct {
- lock sync.RWMutex
- blockCache map[coreCommon.Hash]*coreTypes.Block
- voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
- randomnessCache map[coreCommon.Hash]*coreTypes.BlockRandomnessResult
- votePosition []coreTypes.Position
- db coreDb.Database
- voteSize int
- size int
+ lock sync.RWMutex
+ blockCache map[coreCommon.Hash]*coreTypes.Block
+ finalizedBlockCache map[coreTypes.Position]*coreTypes.Block
+ voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
+ votePosition []coreTypes.Position
+ db coreDb.Database
+ voteSize int
+ size int
}
func newCache(size int, db coreDb.Database) *cache {
return &cache{
- blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
- voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
- randomnessCache: make(map[coreCommon.Hash]*coreTypes.BlockRandomnessResult),
- db: db,
- size: size,
+ blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
+ finalizedBlockCache: make(map[coreTypes.Position]*coreTypes.Block),
+ voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
+ db: db,
+ size: size,
}
}
@@ -110,6 +110,28 @@ func (c *cache) addBlock(block *coreTypes.Block) {
c.blockCache[block.Hash] = block
}
+func (c *cache) addFinalizedBlock(block *coreTypes.Block) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ block = block.Clone()
+ if len(c.blockCache) >= c.size {
+ // Randomly delete one entry.
+ for k := range c.blockCache {
+ delete(c.blockCache, k)
+ break
+ }
+ }
+ if len(c.finalizedBlockCache) >= c.size {
+ // Randomly delete one entry.
+ for k := range c.finalizedBlockCache {
+ delete(c.finalizedBlockCache, k)
+ break
+ }
+ }
+ c.blockCache[block.Hash] = block
+ c.finalizedBlockCache[block.Position] = block
+}
+
func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block {
c.lock.RLock()
defer c.lock.RUnlock()
@@ -122,48 +144,18 @@ func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block {
if err != nil {
continue
}
- // Blocks request from the cache do not need the finalization info.
- block.Finalization = coreTypes.FinalizationResult{}
cacheBlocks = append(cacheBlocks, &block)
}
}
return cacheBlocks
}
-func (c *cache) addRandomness(rand *coreTypes.BlockRandomnessResult) {
- c.lock.Lock()
- defer c.lock.Unlock()
- if len(c.randomnessCache) >= c.size {
- // Randomly delete one entry.
- for k := range c.randomnessCache {
- delete(c.randomnessCache, k)
- break
- }
- }
- c.randomnessCache[rand.BlockHash] = rand
-}
-
-func (c *cache) randomness(hashes coreCommon.Hashes) []*coreTypes.BlockRandomnessResult {
+func (c *cache) finalizedBlock(pos coreTypes.Position) *coreTypes.Block {
c.lock.RLock()
defer c.lock.RUnlock()
- cacheRandomnesss := make([]*coreTypes.BlockRandomnessResult, 0, len(hashes))
- for _, hash := range hashes {
- if block, exist := c.randomnessCache[hash]; exist {
- cacheRandomnesss = append(cacheRandomnesss, block)
- } else {
- block, err := c.db.GetBlock(hash)
- if err != nil {
- continue
- }
- if len(block.Finalization.Randomness) == 0 {
- continue
- }
- cacheRandomnesss = append(cacheRandomnesss, &coreTypes.BlockRandomnessResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Randomness: block.Finalization.Randomness,
- })
- }
+ if block, exist := c.finalizedBlockCache[pos]; exist {
+ return block
}
- return cacheRandomnesss
+ // TODO(jimmy): get finalized block from db
+ return nil
}
diff --git a/dex/cache_test.go b/dex/cache_test.go
index 536e015f0..22b1b9b26 100644
--- a/dex/cache_test.go
+++ b/dex/cache_test.go
@@ -19,6 +19,7 @@ package dex
import (
"math/rand"
+ "reflect"
"sort"
"strings"
"testing"
@@ -205,91 +206,125 @@ func TestCacheBlock(t *testing.T) {
}
}
-func randomBytes() []byte {
- bytes := make([]byte, 32)
- for i := range bytes {
- bytes[i] = byte(rand.Int() % 256)
- }
- return bytes
-}
-
-func TestCacheRandomness(t *testing.T) {
+func TestCacheFinalizedBlock(t *testing.T) {
db, err := coreDb.NewMemBackedDB()
if err != nil {
panic(err)
}
cache := newCache(3, db)
- rand1 := &coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.NewRandomHash(),
- Randomness: randomBytes(),
+ block1 := &coreTypes.Block{
+ Position: coreTypes.Position{
+ Height: 1,
+ },
+ Hash: coreCommon.NewRandomHash(),
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: randomBytes(),
+ },
}
- rand2 := &coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.NewRandomHash(),
- Randomness: randomBytes(),
+ block2 := &coreTypes.Block{
+ Position: coreTypes.Position{
+ Height: 2,
+ },
+ Hash: coreCommon.NewRandomHash(),
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: randomBytes(),
+ },
}
- rand3 := &coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.NewRandomHash(),
- Randomness: randomBytes(),
+ block3 := &coreTypes.Block{
+ Position: coreTypes.Position{
+ Height: 3,
+ },
+ Hash: coreCommon.NewRandomHash(),
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: randomBytes(),
+ },
}
- rand4 := &coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.NewRandomHash(),
- Randomness: randomBytes(),
+ block4 := &coreTypes.Block{
+ Position: coreTypes.Position{
+ Height: 4,
+ },
+ Hash: coreCommon.NewRandomHash(),
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: randomBytes(),
+ },
}
- cache.addRandomness(rand1)
- cache.addRandomness(rand2)
- cache.addRandomness(rand3)
+ cache.addFinalizedBlock(block1)
+ cache.addFinalizedBlock(block2)
+ cache.addFinalizedBlock(block3)
- hashes := coreCommon.Hashes{rand1.BlockHash, rand2.BlockHash, rand3.BlockHash, rand4.BlockHash}
- hashMap := map[coreCommon.Hash]struct{}{
- rand1.BlockHash: {},
- rand2.BlockHash: {},
- rand3.BlockHash: {},
- }
- rands := cache.randomness(hashes)
- if len(rands) != 3 {
- t.Errorf("fail to get rands: have %d, want 3", len(rands))
- }
- for _, rand := range rands {
- if _, exist := hashMap[rand.BlockHash]; !exist {
- t.Errorf("get wrong rand: have %s, want %v", rand, hashMap)
+ hashes := coreCommon.Hashes{block1.Hash, block2.Hash, block3.Hash, block4.Hash}
+ for i := 0; i < 3; i++ {
+ pos := coreTypes.Position{
+ Height: uint64(i + 1),
+ }
+ block := cache.finalizedBlock(pos)
+ if block.Hash != hashes[i] {
+ t.Errorf("failed to get block: have %s, want %s", block, hashes[i])
}
}
- cache.addRandomness(rand4)
-
- rands = cache.randomness(hashes)
- hashMap[rand4.BlockHash] = struct{}{}
- if len(rands) != 3 {
- t.Errorf("fail to get rands: have %d, want 3", len(rands))
- }
- hasNewRandomness := false
- for _, rand := range rands {
- if _, exist := hashMap[rand.BlockHash]; !exist {
- t.Errorf("get wrong rand: have %s, want %v", rand, hashMap)
- }
- if rand.BlockHash.Equal(rand4.BlockHash) {
- hasNewRandomness = true
- }
+ cache.addFinalizedBlock(block4)
+ block := cache.finalizedBlock(block4.Position)
+ if block == nil {
+ t.Errorf("should have block %s in cache", block4)
}
- if !hasNewRandomness {
- t.Errorf("expect rand %s in cache, have %v", rand4, rands)
+ if block.Hash != block4.Hash {
+ t.Errorf("failed to get block: have %s, want %s", block, block4)
}
- block := &coreTypes.Block{
- Hash: coreCommon.NewRandomHash(),
- Finalization: coreTypes.FinalizationResult{
- Randomness: randomBytes(),
+ block5 := &coreTypes.Block{
+ Position: coreTypes.Position{
+ Height: 5,
},
+ Hash: coreCommon.NewRandomHash(),
}
- if err := db.PutBlock(*block); err != nil {
- panic(err)
+ cache.addBlock(block5)
+ if block := cache.finalizedBlock(block5.Position); block != nil {
+ t.Errorf("unexpected block %s in cache", block)
+ }
+ blocks := cache.blocks(coreCommon.Hashes{block5.Hash})
+ if len(blocks) != 1 {
+ t.Errorf("fail to get blocks: have %d, want 1", len(blocks))
+ } else {
+ if !blocks[0].Hash.Equal(block5.Hash) {
+ t.Errorf("get wrong block: have %s, want %s", blocks[0], block5)
+ }
+ }
+ finalizedBlock5 := block5.Clone()
+ finalizedBlock5.Finalization.Randomness = randomBytes()
+ cache.addFinalizedBlock(finalizedBlock5)
+ block = cache.finalizedBlock(block5.Position)
+ if block == nil {
+ t.Errorf("expecting block %s in cache", finalizedBlock5)
}
- rands = cache.randomness(coreCommon.Hashes{block.Hash})
- if len(rands) != 1 {
- t.Errorf("fail to get rands: have %d, want 1", len(rands))
+ if !reflect.DeepEqual(
+ block.Finalization.Randomness,
+ finalizedBlock5.Finalization.Randomness) {
+ t.Errorf("mismatch randomness, have %s, want %s",
+ block.Finalization.Randomness,
+ finalizedBlock5.Finalization.Randomness)
+ }
+ blocks = cache.blocks(coreCommon.Hashes{block5.Hash})
+ if len(blocks) != 1 {
+ t.Errorf("fail to get blocks: have %d, want 1", len(blocks))
} else {
- if !rands[0].BlockHash.Equal(block.Hash) {
- t.Errorf("get wrong rand: have %s, want %s", rands[0], block)
+ if !blocks[0].Hash.Equal(finalizedBlock5.Hash) {
+ t.Errorf("get wrong block: have %s, want %s", blocks[0], block5)
+ }
+ if !reflect.DeepEqual(
+ blocks[0].Finalization.Randomness,
+ finalizedBlock5.Finalization.Randomness) {
+ t.Errorf("mismatch randomness, have %s, want %s",
+ blocks[0].Finalization.Randomness,
+ finalizedBlock5.Finalization.Randomness)
}
}
}
+
+func randomBytes() []byte {
+ bytes := make([]byte, 32)
+ for i := range bytes {
+ bytes[i] = byte(rand.Int() % 256)
+ }
+ return bytes
+}
diff --git a/dex/handler.go b/dex/handler.go
index 20df41709..8971ad500 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -87,6 +87,9 @@ const (
maxPullVotePeers = 1
pullVoteRateLimit = 10 * time.Second
+
+ maxAgreementResultBroadcast = 3
+ maxFinalizedBlockBroadcast = 3
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -888,19 +891,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkAgreement(rlpHash(agreement))
pm.receiveCh <- &agreement
- case msg.Code == RandomnessMsg:
- if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
- break
- }
- // Broadcast this to all peer
- var randomnesses []*coreTypes.BlockRandomnessResult
- if err := msg.Decode(&randomnesses); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- for _, randomness := range randomnesses {
- p.MarkRandomness(rlpHash(randomness))
- pm.receiveCh <- randomness
- }
case msg.Code == DKGPrivateShareMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -949,20 +939,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&pos); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ if block := pm.cache.finalizedBlock(pos); block != nil {
+ log.Debug("Push finalized block as votes", "block", block)
+ return p.SendCoreBlocks([]*coreTypes.Block{block})
+ }
votes := pm.cache.votes(pos)
log.Debug("Push votes", "votes", votes)
return p.SendVotes(votes)
- case msg.Code == PullRandomnessMsg:
- if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
- break
- }
- var hashes coreCommon.Hashes
- if err := msg.Decode(&hashes); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- randomnesses := pm.cache.randomness(hashes)
- log.Debug("Push randomness", "randomness", randomnesses)
- return p.SendRandomnesses(randomnesses)
case msg.Code == GetGovStateMsg:
var hash common.Hash
if err := msg.Decode(&hash); err != nil {
@@ -1098,6 +1081,31 @@ func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
}
}
+// BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers.
+func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) {
+ if len(block.Finalization.Randomness) == 0 {
+ log.Warn("Ignore broadcast finalized block without randomness", "block", block)
+ return
+ }
+ pm.cache.addFinalizedBlock(block)
+
+ // send to notary nodes first (direct)
+ label := peerLabel{
+ set: notaryset,
+ round: block.Position.Round,
+ }
+ peers := pm.peers.PeersWithLabel(label)
+ count := maxFinalizedBlockBroadcast
+ for _, peer := range peers {
+ if count <= 0 {
+ break
+ } else {
+ count--
+ peer.AsyncSendCoreBlocks([]*coreTypes.Block{block})
+ }
+ }
+}
+
// BroadcastCoreBlock broadcasts the core block to all its peers.
func (pm *ProtocolManager) BroadcastCoreBlock(block *coreTypes.Block) {
pm.cache.addBlock(block)
@@ -1122,39 +1130,32 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
func (pm *ProtocolManager) BroadcastAgreementResult(
agreement *coreTypes.AgreementResult) {
- // send to dkg nodes first (direct)
- label := peerLabel{
- set: dkgset,
- round: agreement.Position.Round,
- }
- for _, peer := range pm.peers.PeersWithLabel(label) {
- if !peer.knownAgreements.Contains(rlpHash(agreement)) {
- peer.AsyncSendAgreement(agreement)
- }
+ block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash})
+ if len(block) != 0 {
+ block[0].Finalization.Height = agreement.FinalizationHeight
+ block[0].Finalization.Randomness = agreement.Randomness
+ pm.cache.addFinalizedBlock(block[0])
}
- for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
- peer.AsyncSendAgreement(agreement)
- }
-}
-
-func (pm *ProtocolManager) BroadcastRandomnessResult(
- randomness *coreTypes.BlockRandomnessResult) {
- pm.cache.addRandomness(randomness)
// send to notary nodes first (direct)
label := peerLabel{
set: notaryset,
- round: randomness.Position.Round,
+ round: agreement.Position.Round,
}
- randomnesses := []*coreTypes.BlockRandomnessResult{randomness}
- for _, peer := range pm.peers.PeersWithLabel(label) {
- if !peer.knownRandomnesses.Contains(rlpHash(randomness)) {
- peer.AsyncSendRandomnesses(randomnesses)
+ peers := pm.peers.PeersWithLabel(label)
+ count := maxAgreementResultBroadcast
+ agrHash := rlpHash(agreement)
+ for _, peer := range peers {
+ if count <= 0 {
+ peer.MarkAgreement(agrHash)
+ } else if !peer.knownAgreements.Contains(agrHash) {
+ count--
+ peer.AsyncSendAgreement(agreement)
}
}
- for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) {
- peer.AsyncSendRandomnesses(randomnesses)
+ for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
+ peer.AsyncSendAgreement(agreement)
}
}
@@ -1177,7 +1178,7 @@ func (pm *ProtocolManager) SendDKGPrivateShare(
func (pm *ProtocolManager) BroadcastDKGPrivateShare(
privateShare *dkgTypes.PrivateShare) {
- label := peerLabel{set: dkgset, round: privateShare.Round}
+ label := peerLabel{set: notaryset, round: privateShare.Round}
for _, peer := range pm.peers.PeersWithLabel(label) {
if !peer.knownDKGPrivateShares.Contains(rlpHash(privateShare)) {
peer.AsyncSendDKGPrivateShare(privateShare)
@@ -1187,7 +1188,7 @@ func (pm *ProtocolManager) BroadcastDKGPrivateShare(
func (pm *ProtocolManager) BroadcastDKGPartialSignature(
psig *dkgTypes.PartialSignature) {
- label := peerLabel{set: dkgset, round: psig.Round}
+ label := peerLabel{set: notaryset, round: psig.Round}
for _, peer := range pm.peers.PeersWithLabel(label) {
peer.AsyncSendDKGPartialSignature(psig)
}
@@ -1218,17 +1219,6 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
-func (pm *ProtocolManager) BroadcastPullRandomness(
- hashes coreCommon.Hashes) {
- // TODO(jimmy-dexon): pull from dkg set only.
- for idx, peer := range pm.peers.Peers() {
- if idx >= maxPullPeers {
- break
- }
- peer.AsyncSendPullRandomness(hashes)
- }
-}
-
func (pm *ProtocolManager) txBroadcastLoop() {
queueSizeMax := common.StorageSize(100 * 1024) // 100 KB
currentSize := common.StorageSize(0)
@@ -1321,9 +1311,15 @@ func (pm *ProtocolManager) peerSetLoop() {
for i := round; i <= dexCore.DKGDelayRound; i++ {
pm.peers.BuildConnection(i)
}
+ round = dexCore.DKGDelayRound
} else {
pm.peers.BuildConnection(round)
}
+ CRSRound := pm.gov.CRSRound()
+ if CRSRound > round {
+ pm.peers.BuildConnection(CRSRound)
+ round = CRSRound
+ }
for {
select {
@@ -1340,7 +1336,7 @@ func (pm *ProtocolManager) peerSetLoop() {
}
log.Debug("ProtocolManager: new round", "round", newRound)
- if newRound == round {
+ if newRound <= round {
break
}
diff --git a/dex/network.go b/dex/network.go
index f36850e59..0e2d338c1 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -45,14 +45,6 @@ func (n *DexconNetwork) PullVotes(pos types.Position) {
n.pm.BroadcastPullVotes(pos)
}
-// PullRandomness tries to pull randomness result from the DEXON network.
-func (n *DexconNetwork) PullRandomness(hashes coreCommon.Hashes) {
- if len(hashes) == 0 {
- return
- }
- n.pm.BroadcastPullRandomness(hashes)
-}
-
// BroadcastVote broadcasts vote to all nodes in DEXON network.
func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
n.pm.BroadcastVote(vote)
@@ -60,7 +52,11 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
// BroadcastBlock broadcasts block to all nodes in DEXON network.
func (n *DexconNetwork) BroadcastBlock(block *types.Block) {
- n.pm.BroadcastCoreBlock(block)
+ if block.IsFinalized() {
+ n.pm.BroadcastFinalizedBlock(block)
+ } else {
+ n.pm.BroadcastCoreBlock(block)
+ }
}
// SendDKGPrivateShare sends PrivateShare to a DKG participant.
@@ -83,13 +79,8 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature(
}
// BroadcastAgreementResult broadcasts rand request to DKG set.
-func (n *DexconNetwork) BroadcastAgreementResult(randRequest *types.AgreementResult) {
- n.pm.BroadcastAgreementResult(randRequest)
-}
-
-// BroadcastRandomnessResult broadcasts rand request to Notary set.
-func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
- n.pm.BroadcastRandomnessResult(randResult)
+func (n *DexconNetwork) BroadcastAgreementResult(result *types.AgreementResult) {
+ n.pm.BroadcastAgreementResult(result)
}
// ReceiveChan returns a channel to receive messages from DEXON network.
diff --git a/dex/peer.go b/dex/peer.go
index 0fa1ac61d..0d23e630f 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -67,7 +67,6 @@ const (
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
maxKnownAgreements = 10240
- maxKnownRandomnesses = 10240
maxKnownDKGPrivateShares = 1024 // this related to DKG Size
// maxQueuedTxs is the maximum number of transaction lists to queue up before
@@ -90,7 +89,6 @@ const (
maxQueuedCoreBlocks = 16
maxQueuedVotes = 128
maxQueuedAgreements = 16
- maxQueuedRandomnesses = 16
maxQueuedDKGPrivateShare = 16
maxQueuedDKGParitialSignature = 16
maxQueuedPullBlocks = 128
@@ -114,8 +112,7 @@ type PeerInfo struct {
type setType uint32
const (
- dkgset = iota
- notaryset
+ notaryset = iota
)
type peerLabel struct {
@@ -126,8 +123,6 @@ type peerLabel struct {
func (p peerLabel) String() string {
var t string
switch p.set {
- case dkgset:
- t = fmt.Sprintf("DKGSet round: %d", p.round)
case notaryset:
t = fmt.Sprintf("NotarySet round: %d", p.round)
}
@@ -150,7 +145,6 @@ type peer struct {
knownRecords mapset.Set // Set of node record known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownAgreements mapset.Set
- knownRandomnesses mapset.Set
knownDKGPrivateShares mapset.Set
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
@@ -159,7 +153,6 @@ type peer struct {
queuedCoreBlocks chan []*coreTypes.Block
queuedVotes chan []*coreTypes.Vote
queuedAgreements chan *coreTypes.AgreementResult
- queuedRandomnesses chan []*coreTypes.BlockRandomnessResult
queuedDKGPrivateShares chan *dkgTypes.PrivateShare
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
queuedPullBlocks chan coreCommon.Hashes
@@ -178,7 +171,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
knownRecords: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownAgreements: mapset.NewSet(),
- knownRandomnesses: mapset.NewSet(),
knownDKGPrivateShares: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedRecords: make(chan []*enr.Record, maxQueuedRecords),
@@ -187,7 +179,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
queuedCoreBlocks: make(chan []*coreTypes.Block, maxQueuedCoreBlocks),
queuedVotes: make(chan []*coreTypes.Vote, maxQueuedVotes),
queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements),
- queuedRandomnesses: make(chan []*coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare),
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
@@ -252,11 +243,6 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Broadcast agreement")
- case randomnesses := <-p.queuedRandomnesses:
- if err := p.SendRandomnesses(randomnesses); err != nil {
- return
- }
- p.Log().Trace("Broadcast randomnesses", "count", len(randomnesses))
case privateShare := <-p.queuedDKGPrivateShares:
if err := p.SendDKGPrivateShare(privateShare); err != nil {
return
@@ -277,11 +263,6 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Pulling Votes", "position", pos)
- case hashes := <-p.queuedPullRandomness:
- if err := p.SendPullRandomness(hashes); err != nil {
- return
- }
- p.Log().Trace("Pulling Randomnesses", "hashes", hashes)
case <-p.term:
return
case <-time.After(100 * time.Millisecond):
@@ -366,13 +347,6 @@ func (p *peer) MarkAgreement(hash common.Hash) {
p.knownAgreements.Add(hash)
}
-func (p *peer) MarkRandomness(hash common.Hash) {
- for p.knownRandomnesses.Cardinality() >= maxKnownRandomnesses {
- p.knownRandomnesses.Pop()
- }
- p.knownRandomnesses.Add(hash)
-}
-
func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
for p.knownDKGPrivateShares.Cardinality() >= maxKnownDKGPrivateShares {
p.knownDKGPrivateShares.Pop()
@@ -513,24 +487,6 @@ func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
}
}
-func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) error {
- for _, randomness := range randomnesses {
- p.knownRandomnesses.Add(rlpHash(randomness))
- }
- return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg)
-}
-
-func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) {
- select {
- case p.queuedRandomnesses <- randomnesses:
- for _, randomness := range randomnesses {
- p.knownRandomnesses.Add(rlpHash(randomness))
- }
- default:
- p.Log().Debug("Dropping randomness result")
- }
-}
-
func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error {
p.knownDKGPrivateShares.Add(rlpHash(privateShare))
return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg)
@@ -581,18 +537,6 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
}
}
-func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error {
- return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg)
-}
-
-func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
- select {
- case p.queuedPullRandomness <- hashes:
- default:
- p.Log().Debug("Dropping Pull Randomness")
- }
-}
-
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error {
return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg)
@@ -871,38 +815,43 @@ func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
return list
}
-// PeersWithoutNodeRecord retrieves a list of peers that do not have a
-// given record in their set of known hashes.
-func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutLabel(label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownRecords.Contains(hash) {
+ length := len(ps.peers) - len(ps.label2Nodes[label])
+ if length <= 0 {
+ return []*peer{}
+ }
+ list := make([]*peer, 0, len(ps.peers)-len(ps.label2Nodes[label]))
+ peersWithLabel := ps.label2Nodes[label]
+ for id, p := range ps.peers {
+ if _, exist := peersWithLabel[id]; !exist {
list = append(list, p)
}
}
return list
}
-func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
+// PeersWithoutNodeRecord retrieves a list of peers that do not have a
+// given record in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownAgreements.Contains(hash) {
+ if !p.knownRecords.Contains(hash) {
list = append(list, p)
}
}
return list
}
-func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownRandomnesses.Contains(hash) {
+ if !p.knownAgreements.Contains(hash) {
list = append(list, p)
}
}
@@ -956,23 +905,6 @@ func (ps *peerSet) BuildConnection(round uint64) {
log.Info("Build connection", "round", round)
- dkgLabel := peerLabel{set: dkgset, round: round}
- if _, ok := ps.label2Nodes[dkgLabel]; !ok {
- dkgPKs, err := ps.gov.DKGSet(round)
- if err != nil {
- log.Error("get DKG set fail", "round", round, "err", err)
- }
-
- nodes := ps.pksToNodes(dkgPKs)
- ps.label2Nodes[dkgLabel] = nodes
-
- if _, exists := nodes[ps.srvr.Self().ID().String()]; exists {
- ps.buildDirectConn(dkgLabel)
- } else {
- ps.buildGroupConn(dkgLabel)
- }
- }
-
notaryLabel := peerLabel{set: notaryset, round: round}
if _, ok := ps.label2Nodes[notaryLabel]; !ok {
notaryPKs, err := ps.gov.NotarySet(round)
@@ -990,6 +922,7 @@ func (ps *peerSet) BuildConnection(round uint64) {
ps.buildGroupConn(notaryLabel)
}
}
+
}
func (ps *peerSet) ForgetConnection(round uint64) {
diff --git a/dex/peer_test.go b/dex/peer_test.go
index 76a28b1ef..d6bc7e24c 100644
--- a/dex/peer_test.go
+++ b/dex/peer_test.go
@@ -62,31 +62,16 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) {
nodes[1].ID().String(): nodes[1],
nodes[2].ID().String(): nodes[2],
},
- {set: dkgset, round: 10}: {
- self.ID().String(): self,
- nodes[1].ID().String(): nodes[1],
- nodes[3].ID().String(): nodes[3],
- },
{set: notaryset, round: 11}: {
self.ID().String(): self,
nodes[1].ID().String(): nodes[1],
nodes[5].ID().String(): nodes[5],
},
- {set: dkgset, round: 11}: {
- nodes[1].ID().String(): nodes[1],
- nodes[2].ID().String(): nodes[2],
- nodes[5].ID().String(): nodes[5],
- },
{set: notaryset, round: 12}: {
self.ID().String(): self,
nodes[3].ID().String(): nodes[3],
nodes[5].ID().String(): nodes[5],
},
- {set: dkgset, round: 12}: {
- self.ID().String(): self,
- nodes[3].ID().String(): nodes[3],
- nodes[5].ID().String(): nodes[5],
- },
}
if !reflect.DeepEqual(ps.label2Nodes, expectedlabel2Nodes) {
@@ -97,28 +82,12 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) {
{set: notaryset, round: 10}: {},
{set: notaryset, round: 11}: {},
{set: notaryset, round: 12}: {},
- {set: dkgset, round: 10}: {},
- {set: dkgset, round: 12}: {},
}
if !reflect.DeepEqual(ps.directConn, expectedDirectConn) {
t.Errorf("direct conn not match")
}
- expectedGroupConn := []peerLabel{
- {set: dkgset, round: 11},
- }
-
- if len(ps.groupConnPeers) != len(expectedGroupConn) {
- t.Errorf("group conn peers not match")
- }
-
- for _, l := range expectedGroupConn {
- if len(ps.groupConnPeers[l]) == 0 {
- t.Errorf("group conn peers is 0")
- }
- }
-
expectedAllDirect := make(map[string]map[peerLabel]struct{})
for l := range ps.directConn {
@@ -152,11 +121,6 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) {
nodes[3].ID().String(): nodes[3],
nodes[5].ID().String(): nodes[5],
},
- {set: dkgset, round: 12}: {
- self.ID().String(): self,
- nodes[3].ID().String(): nodes[3],
- nodes[5].ID().String(): nodes[5],
- },
}
if !reflect.DeepEqual(ps.label2Nodes, expectedlabel2Nodes) {
@@ -165,14 +129,13 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) {
expectedDirectConn = map[peerLabel]struct{}{
{set: notaryset, round: 12}: {},
- {set: dkgset, round: 12}: {},
}
if !reflect.DeepEqual(ps.directConn, expectedDirectConn) {
t.Error("direct conn not match")
}
- expectedGroupConn = []peerLabel{}
+ expectedGroupConn := []peerLabel{}
if len(ps.groupConnPeers) != len(expectedGroupConn) {
t.Errorf("group conn peers not match")
diff --git a/dex/protocol.go b/dex/protocol.go
index 287bf0883..2bcb57506 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -92,7 +92,6 @@ const (
DKGPartialSignatureMsg = 0x25
PullBlocksMsg = 0x26
PullVotesMsg = 0x27
- PullRandomnessMsg = 0x28
GetGovStateMsg = 0x29
GovStateMsg = 0x2a
@@ -157,8 +156,6 @@ type governance interface {
CRSRound() uint64
NotarySet(uint64) (map[string]struct{}, error)
-
- DKGSet(uint64) (map[string]struct{}, error)
}
type dexconApp interface {
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 517df97d9..7e0e1a9a4 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -438,6 +438,10 @@ func TestRecvVotes(t *testing.T) {
Height: 13,
},
},
+ PartialSignature: dkg.PartialSignature{
+ Type: "456",
+ Signature: []byte("psig"),
+ },
Signature: coreCrypto.Signature{
Type: "123",
Signature: []byte("sig"),
@@ -453,7 +457,7 @@ func TestRecvVotes(t *testing.T) {
select {
case msg := <-ch:
rvote := msg.(*coreTypes.Vote)
- if rlpHash(rvote) != rlpHash(vote) {
+ if !reflect.DeepEqual(rvote, &vote) {
t.Errorf("vote mismatch")
}
case <-time.After(1 * time.Second):
@@ -474,6 +478,10 @@ func TestSendVotes(t *testing.T) {
Height: 13,
},
},
+ PartialSignature: dkg.PartialSignature{
+ Type: "456",
+ Signature: []byte("psig"),
+ },
Signature: coreCrypto.Signature{
Type: "123",
Signature: []byte("sig"),
@@ -531,10 +539,6 @@ func TestSendVotes(t *testing.T) {
label: &peerLabel{set: notaryset, round: 11},
isReceiver: false,
},
- {
- label: &peerLabel{set: dkgset, round: 10},
- isReceiver: false,
- },
}
pm.peers.label2Nodes = make(map[peerLabel]map[string]*enode.Node)
@@ -669,6 +673,10 @@ func TestRecvAgreement(t *testing.T) {
Height: 13,
},
},
+ PartialSignature: dkg.PartialSignature{
+ Type: "456",
+ Signature: []byte("psig"),
+ },
Signature: coreCrypto.Signature{
Type: "123",
Signature: []byte("sig"),
@@ -676,9 +684,10 @@ func TestRecvAgreement(t *testing.T) {
}
agreement := coreTypes.AgreementResult{
- BlockHash: coreCommon.Hash{9, 9, 9},
- Position: vote.Position,
- Votes: []coreTypes.Vote{vote},
+ BlockHash: coreCommon.Hash{9, 9, 9},
+ Position: vote.Position,
+ Votes: []coreTypes.Vote{vote},
+ Randomness: []byte{9, 4, 8, 7},
}
if err := p2p.Send(p.app, AgreementMsg, &agreement); err != nil {
@@ -714,6 +723,10 @@ func TestSendAgreement(t *testing.T) {
Height: 13,
},
},
+ PartialSignature: dkg.PartialSignature{
+ Type: "456",
+ Signature: []byte("psig"),
+ },
Signature: coreCrypto.Signature{
Type: "123",
Signature: []byte("sig"),
@@ -721,9 +734,10 @@ func TestSendAgreement(t *testing.T) {
}
agreement := coreTypes.AgreementResult{
- BlockHash: coreCommon.Hash{9, 9, 9},
- Position: vote.Position,
- Votes: []coreTypes.Vote{vote},
+ BlockHash: coreCommon.Hash{9, 9, 9},
+ Position: vote.Position,
+ Votes: []coreTypes.Vote{vote},
+ Randomness: []byte{9, 4, 8, 7},
}
waitForRegister(pm, 1)
@@ -745,75 +759,6 @@ func TestSendAgreement(t *testing.T) {
}
}
-func TestRecvRandomnesses(t *testing.T) {
- pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
- pm.SetReceiveCoreMessage(true)
-
- p, _ := newTestPeer("peer", dex64, pm, true)
- defer pm.Stop()
- defer p.close()
-
- randomness := coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.Hash{8, 8, 8},
- Position: coreTypes.Position{
- Round: 10,
- Height: 13,
- },
- Randomness: []byte{7, 7, 7, 7},
- }
-
- if err := p2p.Send(p.app, RandomnessMsg, []*coreTypes.BlockRandomnessResult{&randomness}); err != nil {
- t.Fatalf("send error: %v", err)
- }
-
- ch := pm.ReceiveChan()
- select {
- case msg := <-ch:
- r := msg.(*coreTypes.BlockRandomnessResult)
- if !reflect.DeepEqual(r, &randomness) {
- t.Errorf("randomness mismatch")
- }
- case <-time.After(1 * time.Second):
- t.Errorf("no randomness received within 1 seconds")
- }
-}
-
-func TestSendRandomnesses(t *testing.T) {
- pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
- pm.SetReceiveCoreMessage(true)
-
- p, _ := newTestPeer("peer", dex64, pm, true)
- defer pm.Stop()
- defer p.close()
-
- randomness := coreTypes.BlockRandomnessResult{
- BlockHash: coreCommon.Hash{8, 8, 8},
- Position: coreTypes.Position{
- Round: 10,
- Height: 13,
- },
- Randomness: []byte{7, 7, 7, 7},
- }
-
- waitForRegister(pm, 1)
- pm.BroadcastRandomnessResult(&randomness)
- msg, err := p.app.ReadMsg()
- if err != nil {
- t.Errorf("%v: read error: %v", p.Peer, err)
- } else if msg.Code != RandomnessMsg {
- t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, RandomnessMsg)
- }
-
- var rs []*coreTypes.BlockRandomnessResult
- if err := msg.Decode(&rs); err != nil {
- t.Errorf("%v: %v", p.Peer, err)
- }
-
- if !reflect.DeepEqual(rs, []*coreTypes.BlockRandomnessResult{&randomness}) {
- t.Errorf("randomness mismatch")
- }
-}
-
func waitForRegister(pm *ProtocolManager, num int) {
for {
if pm.peers.Len() >= num {