From 8732c918f7b5d3dd9ac45b6e00995f5d74bc8a47 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Wed, 27 Mar 2019 20:47:32 +0800 Subject: core: merge notarySet and DKGSet (#265) * vendor: sync to latest core * core: merge notarySet and dkgSet * dex: optimize network traffic for finalized block --- dex/cache.go | 88 +++++++++++++-------------- dex/cache_test.go | 165 +++++++++++++++++++++++++++++++-------------------- dex/handler.go | 120 ++++++++++++++++++------------------- dex/network.go | 23 +++---- dex/peer.go | 101 ++++++------------------------- dex/peer_test.go | 39 +----------- dex/protocol.go | 3 - dex/protocol_test.go | 105 ++++++++------------------------ 8 files changed, 248 insertions(+), 396 deletions(-) (limited to 'dex') diff --git a/dex/cache.go b/dex/cache.go index 5d4d20dd0..04030eaaf 100644 --- a/dex/cache.go +++ b/dex/cache.go @@ -44,23 +44,23 @@ func voteToKey(vote *coreTypes.Vote) voteKey { } type cache struct { - lock sync.RWMutex - blockCache map[coreCommon.Hash]*coreTypes.Block - voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote - randomnessCache map[coreCommon.Hash]*coreTypes.BlockRandomnessResult - votePosition []coreTypes.Position - db coreDb.Database - voteSize int - size int + lock sync.RWMutex + blockCache map[coreCommon.Hash]*coreTypes.Block + finalizedBlockCache map[coreTypes.Position]*coreTypes.Block + voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote + votePosition []coreTypes.Position + db coreDb.Database + voteSize int + size int } func newCache(size int, db coreDb.Database) *cache { return &cache{ - blockCache: make(map[coreCommon.Hash]*coreTypes.Block), - voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), - randomnessCache: make(map[coreCommon.Hash]*coreTypes.BlockRandomnessResult), - db: db, - size: size, + blockCache: make(map[coreCommon.Hash]*coreTypes.Block), + finalizedBlockCache: make(map[coreTypes.Position]*coreTypes.Block), + voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), + db: db, + size: size, } } @@ -110,6 +110,28 @@ func (c *cache) addBlock(block *coreTypes.Block) { c.blockCache[block.Hash] = block } +func (c *cache) addFinalizedBlock(block *coreTypes.Block) { + c.lock.Lock() + defer c.lock.Unlock() + block = block.Clone() + if len(c.blockCache) >= c.size { + // Randomly delete one entry. + for k := range c.blockCache { + delete(c.blockCache, k) + break + } + } + if len(c.finalizedBlockCache) >= c.size { + // Randomly delete one entry. + for k := range c.finalizedBlockCache { + delete(c.finalizedBlockCache, k) + break + } + } + c.blockCache[block.Hash] = block + c.finalizedBlockCache[block.Position] = block +} + func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block { c.lock.RLock() defer c.lock.RUnlock() @@ -122,48 +144,18 @@ func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block { if err != nil { continue } - // Blocks request from the cache do not need the finalization info. - block.Finalization = coreTypes.FinalizationResult{} cacheBlocks = append(cacheBlocks, &block) } } return cacheBlocks } -func (c *cache) addRandomness(rand *coreTypes.BlockRandomnessResult) { - c.lock.Lock() - defer c.lock.Unlock() - if len(c.randomnessCache) >= c.size { - // Randomly delete one entry. - for k := range c.randomnessCache { - delete(c.randomnessCache, k) - break - } - } - c.randomnessCache[rand.BlockHash] = rand -} - -func (c *cache) randomness(hashes coreCommon.Hashes) []*coreTypes.BlockRandomnessResult { +func (c *cache) finalizedBlock(pos coreTypes.Position) *coreTypes.Block { c.lock.RLock() defer c.lock.RUnlock() - cacheRandomnesss := make([]*coreTypes.BlockRandomnessResult, 0, len(hashes)) - for _, hash := range hashes { - if block, exist := c.randomnessCache[hash]; exist { - cacheRandomnesss = append(cacheRandomnesss, block) - } else { - block, err := c.db.GetBlock(hash) - if err != nil { - continue - } - if len(block.Finalization.Randomness) == 0 { - continue - } - cacheRandomnesss = append(cacheRandomnesss, &coreTypes.BlockRandomnessResult{ - BlockHash: block.Hash, - Position: block.Position, - Randomness: block.Finalization.Randomness, - }) - } + if block, exist := c.finalizedBlockCache[pos]; exist { + return block } - return cacheRandomnesss + // TODO(jimmy): get finalized block from db + return nil } diff --git a/dex/cache_test.go b/dex/cache_test.go index 536e015f0..22b1b9b26 100644 --- a/dex/cache_test.go +++ b/dex/cache_test.go @@ -19,6 +19,7 @@ package dex import ( "math/rand" + "reflect" "sort" "strings" "testing" @@ -205,91 +206,125 @@ func TestCacheBlock(t *testing.T) { } } -func randomBytes() []byte { - bytes := make([]byte, 32) - for i := range bytes { - bytes[i] = byte(rand.Int() % 256) - } - return bytes -} - -func TestCacheRandomness(t *testing.T) { +func TestCacheFinalizedBlock(t *testing.T) { db, err := coreDb.NewMemBackedDB() if err != nil { panic(err) } cache := newCache(3, db) - rand1 := &coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.NewRandomHash(), - Randomness: randomBytes(), + block1 := &coreTypes.Block{ + Position: coreTypes.Position{ + Height: 1, + }, + Hash: coreCommon.NewRandomHash(), + Finalization: coreTypes.FinalizationResult{ + Randomness: randomBytes(), + }, } - rand2 := &coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.NewRandomHash(), - Randomness: randomBytes(), + block2 := &coreTypes.Block{ + Position: coreTypes.Position{ + Height: 2, + }, + Hash: coreCommon.NewRandomHash(), + Finalization: coreTypes.FinalizationResult{ + Randomness: randomBytes(), + }, } - rand3 := &coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.NewRandomHash(), - Randomness: randomBytes(), + block3 := &coreTypes.Block{ + Position: coreTypes.Position{ + Height: 3, + }, + Hash: coreCommon.NewRandomHash(), + Finalization: coreTypes.FinalizationResult{ + Randomness: randomBytes(), + }, } - rand4 := &coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.NewRandomHash(), - Randomness: randomBytes(), + block4 := &coreTypes.Block{ + Position: coreTypes.Position{ + Height: 4, + }, + Hash: coreCommon.NewRandomHash(), + Finalization: coreTypes.FinalizationResult{ + Randomness: randomBytes(), + }, } - cache.addRandomness(rand1) - cache.addRandomness(rand2) - cache.addRandomness(rand3) + cache.addFinalizedBlock(block1) + cache.addFinalizedBlock(block2) + cache.addFinalizedBlock(block3) - hashes := coreCommon.Hashes{rand1.BlockHash, rand2.BlockHash, rand3.BlockHash, rand4.BlockHash} - hashMap := map[coreCommon.Hash]struct{}{ - rand1.BlockHash: {}, - rand2.BlockHash: {}, - rand3.BlockHash: {}, - } - rands := cache.randomness(hashes) - if len(rands) != 3 { - t.Errorf("fail to get rands: have %d, want 3", len(rands)) - } - for _, rand := range rands { - if _, exist := hashMap[rand.BlockHash]; !exist { - t.Errorf("get wrong rand: have %s, want %v", rand, hashMap) + hashes := coreCommon.Hashes{block1.Hash, block2.Hash, block3.Hash, block4.Hash} + for i := 0; i < 3; i++ { + pos := coreTypes.Position{ + Height: uint64(i + 1), + } + block := cache.finalizedBlock(pos) + if block.Hash != hashes[i] { + t.Errorf("failed to get block: have %s, want %s", block, hashes[i]) } } - cache.addRandomness(rand4) - - rands = cache.randomness(hashes) - hashMap[rand4.BlockHash] = struct{}{} - if len(rands) != 3 { - t.Errorf("fail to get rands: have %d, want 3", len(rands)) - } - hasNewRandomness := false - for _, rand := range rands { - if _, exist := hashMap[rand.BlockHash]; !exist { - t.Errorf("get wrong rand: have %s, want %v", rand, hashMap) - } - if rand.BlockHash.Equal(rand4.BlockHash) { - hasNewRandomness = true - } + cache.addFinalizedBlock(block4) + block := cache.finalizedBlock(block4.Position) + if block == nil { + t.Errorf("should have block %s in cache", block4) } - if !hasNewRandomness { - t.Errorf("expect rand %s in cache, have %v", rand4, rands) + if block.Hash != block4.Hash { + t.Errorf("failed to get block: have %s, want %s", block, block4) } - block := &coreTypes.Block{ - Hash: coreCommon.NewRandomHash(), - Finalization: coreTypes.FinalizationResult{ - Randomness: randomBytes(), + block5 := &coreTypes.Block{ + Position: coreTypes.Position{ + Height: 5, }, + Hash: coreCommon.NewRandomHash(), } - if err := db.PutBlock(*block); err != nil { - panic(err) + cache.addBlock(block5) + if block := cache.finalizedBlock(block5.Position); block != nil { + t.Errorf("unexpected block %s in cache", block) + } + blocks := cache.blocks(coreCommon.Hashes{block5.Hash}) + if len(blocks) != 1 { + t.Errorf("fail to get blocks: have %d, want 1", len(blocks)) + } else { + if !blocks[0].Hash.Equal(block5.Hash) { + t.Errorf("get wrong block: have %s, want %s", blocks[0], block5) + } + } + finalizedBlock5 := block5.Clone() + finalizedBlock5.Finalization.Randomness = randomBytes() + cache.addFinalizedBlock(finalizedBlock5) + block = cache.finalizedBlock(block5.Position) + if block == nil { + t.Errorf("expecting block %s in cache", finalizedBlock5) } - rands = cache.randomness(coreCommon.Hashes{block.Hash}) - if len(rands) != 1 { - t.Errorf("fail to get rands: have %d, want 1", len(rands)) + if !reflect.DeepEqual( + block.Finalization.Randomness, + finalizedBlock5.Finalization.Randomness) { + t.Errorf("mismatch randomness, have %s, want %s", + block.Finalization.Randomness, + finalizedBlock5.Finalization.Randomness) + } + blocks = cache.blocks(coreCommon.Hashes{block5.Hash}) + if len(blocks) != 1 { + t.Errorf("fail to get blocks: have %d, want 1", len(blocks)) } else { - if !rands[0].BlockHash.Equal(block.Hash) { - t.Errorf("get wrong rand: have %s, want %s", rands[0], block) + if !blocks[0].Hash.Equal(finalizedBlock5.Hash) { + t.Errorf("get wrong block: have %s, want %s", blocks[0], block5) + } + if !reflect.DeepEqual( + blocks[0].Finalization.Randomness, + finalizedBlock5.Finalization.Randomness) { + t.Errorf("mismatch randomness, have %s, want %s", + blocks[0].Finalization.Randomness, + finalizedBlock5.Finalization.Randomness) } } } + +func randomBytes() []byte { + bytes := make([]byte, 32) + for i := range bytes { + bytes[i] = byte(rand.Int() % 256) + } + return bytes +} diff --git a/dex/handler.go b/dex/handler.go index 20df41709..8971ad500 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -87,6 +87,9 @@ const ( maxPullVotePeers = 1 pullVoteRateLimit = 10 * time.Second + + maxAgreementResultBroadcast = 3 + maxFinalizedBlockBroadcast = 3 ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -888,19 +891,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkAgreement(rlpHash(agreement)) pm.receiveCh <- &agreement - case msg.Code == RandomnessMsg: - if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { - break - } - // Broadcast this to all peer - var randomnesses []*coreTypes.BlockRandomnessResult - if err := msg.Decode(&randomnesses); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - for _, randomness := range randomnesses { - p.MarkRandomness(rlpHash(randomness)) - pm.receiveCh <- randomness - } case msg.Code == DKGPrivateShareMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -949,20 +939,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&pos); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + if block := pm.cache.finalizedBlock(pos); block != nil { + log.Debug("Push finalized block as votes", "block", block) + return p.SendCoreBlocks([]*coreTypes.Block{block}) + } votes := pm.cache.votes(pos) log.Debug("Push votes", "votes", votes) return p.SendVotes(votes) - case msg.Code == PullRandomnessMsg: - if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { - break - } - var hashes coreCommon.Hashes - if err := msg.Decode(&hashes); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - randomnesses := pm.cache.randomness(hashes) - log.Debug("Push randomness", "randomness", randomnesses) - return p.SendRandomnesses(randomnesses) case msg.Code == GetGovStateMsg: var hash common.Hash if err := msg.Decode(&hash); err != nil { @@ -1098,6 +1081,31 @@ func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { } } +// BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers. +func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) { + if len(block.Finalization.Randomness) == 0 { + log.Warn("Ignore broadcast finalized block without randomness", "block", block) + return + } + pm.cache.addFinalizedBlock(block) + + // send to notary nodes first (direct) + label := peerLabel{ + set: notaryset, + round: block.Position.Round, + } + peers := pm.peers.PeersWithLabel(label) + count := maxFinalizedBlockBroadcast + for _, peer := range peers { + if count <= 0 { + break + } else { + count-- + peer.AsyncSendCoreBlocks([]*coreTypes.Block{block}) + } + } +} + // BroadcastCoreBlock broadcasts the core block to all its peers. func (pm *ProtocolManager) BroadcastCoreBlock(block *coreTypes.Block) { pm.cache.addBlock(block) @@ -1122,39 +1130,32 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { func (pm *ProtocolManager) BroadcastAgreementResult( agreement *coreTypes.AgreementResult) { - // send to dkg nodes first (direct) - label := peerLabel{ - set: dkgset, - round: agreement.Position.Round, - } - for _, peer := range pm.peers.PeersWithLabel(label) { - if !peer.knownAgreements.Contains(rlpHash(agreement)) { - peer.AsyncSendAgreement(agreement) - } + block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}) + if len(block) != 0 { + block[0].Finalization.Height = agreement.FinalizationHeight + block[0].Finalization.Randomness = agreement.Randomness + pm.cache.addFinalizedBlock(block[0]) } - for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { - peer.AsyncSendAgreement(agreement) - } -} - -func (pm *ProtocolManager) BroadcastRandomnessResult( - randomness *coreTypes.BlockRandomnessResult) { - pm.cache.addRandomness(randomness) // send to notary nodes first (direct) label := peerLabel{ set: notaryset, - round: randomness.Position.Round, + round: agreement.Position.Round, } - randomnesses := []*coreTypes.BlockRandomnessResult{randomness} - for _, peer := range pm.peers.PeersWithLabel(label) { - if !peer.knownRandomnesses.Contains(rlpHash(randomness)) { - peer.AsyncSendRandomnesses(randomnesses) + peers := pm.peers.PeersWithLabel(label) + count := maxAgreementResultBroadcast + agrHash := rlpHash(agreement) + for _, peer := range peers { + if count <= 0 { + peer.MarkAgreement(agrHash) + } else if !peer.knownAgreements.Contains(agrHash) { + count-- + peer.AsyncSendAgreement(agreement) } } - for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) { - peer.AsyncSendRandomnesses(randomnesses) + for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { + peer.AsyncSendAgreement(agreement) } } @@ -1177,7 +1178,7 @@ func (pm *ProtocolManager) SendDKGPrivateShare( func (pm *ProtocolManager) BroadcastDKGPrivateShare( privateShare *dkgTypes.PrivateShare) { - label := peerLabel{set: dkgset, round: privateShare.Round} + label := peerLabel{set: notaryset, round: privateShare.Round} for _, peer := range pm.peers.PeersWithLabel(label) { if !peer.knownDKGPrivateShares.Contains(rlpHash(privateShare)) { peer.AsyncSendDKGPrivateShare(privateShare) @@ -1187,7 +1188,7 @@ func (pm *ProtocolManager) BroadcastDKGPrivateShare( func (pm *ProtocolManager) BroadcastDKGPartialSignature( psig *dkgTypes.PartialSignature) { - label := peerLabel{set: dkgset, round: psig.Round} + label := peerLabel{set: notaryset, round: psig.Round} for _, peer := range pm.peers.PeersWithLabel(label) { peer.AsyncSendDKGPartialSignature(psig) } @@ -1218,17 +1219,6 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } -func (pm *ProtocolManager) BroadcastPullRandomness( - hashes coreCommon.Hashes) { - // TODO(jimmy-dexon): pull from dkg set only. - for idx, peer := range pm.peers.Peers() { - if idx >= maxPullPeers { - break - } - peer.AsyncSendPullRandomness(hashes) - } -} - func (pm *ProtocolManager) txBroadcastLoop() { queueSizeMax := common.StorageSize(100 * 1024) // 100 KB currentSize := common.StorageSize(0) @@ -1321,9 +1311,15 @@ func (pm *ProtocolManager) peerSetLoop() { for i := round; i <= dexCore.DKGDelayRound; i++ { pm.peers.BuildConnection(i) } + round = dexCore.DKGDelayRound } else { pm.peers.BuildConnection(round) } + CRSRound := pm.gov.CRSRound() + if CRSRound > round { + pm.peers.BuildConnection(CRSRound) + round = CRSRound + } for { select { @@ -1340,7 +1336,7 @@ func (pm *ProtocolManager) peerSetLoop() { } log.Debug("ProtocolManager: new round", "round", newRound) - if newRound == round { + if newRound <= round { break } diff --git a/dex/network.go b/dex/network.go index f36850e59..0e2d338c1 100644 --- a/dex/network.go +++ b/dex/network.go @@ -45,14 +45,6 @@ func (n *DexconNetwork) PullVotes(pos types.Position) { n.pm.BroadcastPullVotes(pos) } -// PullRandomness tries to pull randomness result from the DEXON network. -func (n *DexconNetwork) PullRandomness(hashes coreCommon.Hashes) { - if len(hashes) == 0 { - return - } - n.pm.BroadcastPullRandomness(hashes) -} - // BroadcastVote broadcasts vote to all nodes in DEXON network. func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { n.pm.BroadcastVote(vote) @@ -60,7 +52,11 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { // BroadcastBlock broadcasts block to all nodes in DEXON network. func (n *DexconNetwork) BroadcastBlock(block *types.Block) { - n.pm.BroadcastCoreBlock(block) + if block.IsFinalized() { + n.pm.BroadcastFinalizedBlock(block) + } else { + n.pm.BroadcastCoreBlock(block) + } } // SendDKGPrivateShare sends PrivateShare to a DKG participant. @@ -83,13 +79,8 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature( } // BroadcastAgreementResult broadcasts rand request to DKG set. -func (n *DexconNetwork) BroadcastAgreementResult(randRequest *types.AgreementResult) { - n.pm.BroadcastAgreementResult(randRequest) -} - -// BroadcastRandomnessResult broadcasts rand request to Notary set. -func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) { - n.pm.BroadcastRandomnessResult(randResult) +func (n *DexconNetwork) BroadcastAgreementResult(result *types.AgreementResult) { + n.pm.BroadcastAgreementResult(result) } // ReceiveChan returns a channel to receive messages from DEXON network. diff --git a/dex/peer.go b/dex/peer.go index 0fa1ac61d..0d23e630f 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -67,7 +67,6 @@ const ( maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) maxKnownAgreements = 10240 - maxKnownRandomnesses = 10240 maxKnownDKGPrivateShares = 1024 // this related to DKG Size // maxQueuedTxs is the maximum number of transaction lists to queue up before @@ -90,7 +89,6 @@ const ( maxQueuedCoreBlocks = 16 maxQueuedVotes = 128 maxQueuedAgreements = 16 - maxQueuedRandomnesses = 16 maxQueuedDKGPrivateShare = 16 maxQueuedDKGParitialSignature = 16 maxQueuedPullBlocks = 128 @@ -114,8 +112,7 @@ type PeerInfo struct { type setType uint32 const ( - dkgset = iota - notaryset + notaryset = iota ) type peerLabel struct { @@ -126,8 +123,6 @@ type peerLabel struct { func (p peerLabel) String() string { var t string switch p.set { - case dkgset: - t = fmt.Sprintf("DKGSet round: %d", p.round) case notaryset: t = fmt.Sprintf("NotarySet round: %d", p.round) } @@ -150,7 +145,6 @@ type peer struct { knownRecords mapset.Set // Set of node record known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer knownAgreements mapset.Set - knownRandomnesses mapset.Set knownDKGPrivateShares mapset.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer @@ -159,7 +153,6 @@ type peer struct { queuedCoreBlocks chan []*coreTypes.Block queuedVotes chan []*coreTypes.Vote queuedAgreements chan *coreTypes.AgreementResult - queuedRandomnesses chan []*coreTypes.BlockRandomnessResult queuedDKGPrivateShares chan *dkgTypes.PrivateShare queuedDKGPartialSignatures chan *dkgTypes.PartialSignature queuedPullBlocks chan coreCommon.Hashes @@ -178,7 +171,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { knownRecords: mapset.NewSet(), knownBlocks: mapset.NewSet(), knownAgreements: mapset.NewSet(), - knownRandomnesses: mapset.NewSet(), knownDKGPrivateShares: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedRecords: make(chan []*enr.Record, maxQueuedRecords), @@ -187,7 +179,6 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { queuedCoreBlocks: make(chan []*coreTypes.Block, maxQueuedCoreBlocks), queuedVotes: make(chan []*coreTypes.Vote, maxQueuedVotes), queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements), - queuedRandomnesses: make(chan []*coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare), queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), @@ -252,11 +243,6 @@ func (p *peer) broadcast() { return } p.Log().Trace("Broadcast agreement") - case randomnesses := <-p.queuedRandomnesses: - if err := p.SendRandomnesses(randomnesses); err != nil { - return - } - p.Log().Trace("Broadcast randomnesses", "count", len(randomnesses)) case privateShare := <-p.queuedDKGPrivateShares: if err := p.SendDKGPrivateShare(privateShare); err != nil { return @@ -277,11 +263,6 @@ func (p *peer) broadcast() { return } p.Log().Trace("Pulling Votes", "position", pos) - case hashes := <-p.queuedPullRandomness: - if err := p.SendPullRandomness(hashes); err != nil { - return - } - p.Log().Trace("Pulling Randomnesses", "hashes", hashes) case <-p.term: return case <-time.After(100 * time.Millisecond): @@ -366,13 +347,6 @@ func (p *peer) MarkAgreement(hash common.Hash) { p.knownAgreements.Add(hash) } -func (p *peer) MarkRandomness(hash common.Hash) { - for p.knownRandomnesses.Cardinality() >= maxKnownRandomnesses { - p.knownRandomnesses.Pop() - } - p.knownRandomnesses.Add(hash) -} - func (p *peer) MarkDKGPrivateShares(hash common.Hash) { for p.knownDKGPrivateShares.Cardinality() >= maxKnownDKGPrivateShares { p.knownDKGPrivateShares.Pop() @@ -513,24 +487,6 @@ func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { } } -func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) error { - for _, randomness := range randomnesses { - p.knownRandomnesses.Add(rlpHash(randomness)) - } - return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg) -} - -func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) { - select { - case p.queuedRandomnesses <- randomnesses: - for _, randomness := range randomnesses { - p.knownRandomnesses.Add(rlpHash(randomness)) - } - default: - p.Log().Debug("Dropping randomness result") - } -} - func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error { p.knownDKGPrivateShares.Add(rlpHash(privateShare)) return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg) @@ -581,18 +537,6 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { } } -func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error { - return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg) -} - -func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) { - select { - case p.queuedPullRandomness <- hashes: - default: - p.Log().Debug("Dropping Pull Randomness") - } -} - // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error { return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg) @@ -871,38 +815,43 @@ func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer { return list } -// PeersWithoutNodeRecord retrieves a list of peers that do not have a -// given record in their set of known hashes. -func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutLabel(label peerLabel) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.knownRecords.Contains(hash) { + length := len(ps.peers) - len(ps.label2Nodes[label]) + if length <= 0 { + return []*peer{} + } + list := make([]*peer, 0, len(ps.peers)-len(ps.label2Nodes[label])) + peersWithLabel := ps.label2Nodes[label] + for id, p := range ps.peers { + if _, exist := peersWithLabel[id]; !exist { list = append(list, p) } } return list } -func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { +// PeersWithoutNodeRecord retrieves a list of peers that do not have a +// given record in their set of known hashes. +func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownAgreements.Contains(hash) { + if !p.knownRecords.Contains(hash) { list = append(list, p) } } return list } -func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownRandomnesses.Contains(hash) { + if !p.knownAgreements.Contains(hash) { list = append(list, p) } } @@ -956,23 +905,6 @@ func (ps *peerSet) BuildConnection(round uint64) { log.Info("Build connection", "round", round) - dkgLabel := peerLabel{set: dkgset, round: round} - if _, ok := ps.label2Nodes[dkgLabel]; !ok { - dkgPKs, err := ps.gov.DKGSet(round) - if err != nil { - log.Error("get DKG set fail", "round", round, "err", err) - } - - nodes := ps.pksToNodes(dkgPKs) - ps.label2Nodes[dkgLabel] = nodes - - if _, exists := nodes[ps.srvr.Self().ID().String()]; exists { - ps.buildDirectConn(dkgLabel) - } else { - ps.buildGroupConn(dkgLabel) - } - } - notaryLabel := peerLabel{set: notaryset, round: round} if _, ok := ps.label2Nodes[notaryLabel]; !ok { notaryPKs, err := ps.gov.NotarySet(round) @@ -990,6 +922,7 @@ func (ps *peerSet) BuildConnection(round uint64) { ps.buildGroupConn(notaryLabel) } } + } func (ps *peerSet) ForgetConnection(round uint64) { diff --git a/dex/peer_test.go b/dex/peer_test.go index 76a28b1ef..d6bc7e24c 100644 --- a/dex/peer_test.go +++ b/dex/peer_test.go @@ -62,31 +62,16 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) { nodes[1].ID().String(): nodes[1], nodes[2].ID().String(): nodes[2], }, - {set: dkgset, round: 10}: { - self.ID().String(): self, - nodes[1].ID().String(): nodes[1], - nodes[3].ID().String(): nodes[3], - }, {set: notaryset, round: 11}: { self.ID().String(): self, nodes[1].ID().String(): nodes[1], nodes[5].ID().String(): nodes[5], }, - {set: dkgset, round: 11}: { - nodes[1].ID().String(): nodes[1], - nodes[2].ID().String(): nodes[2], - nodes[5].ID().String(): nodes[5], - }, {set: notaryset, round: 12}: { self.ID().String(): self, nodes[3].ID().String(): nodes[3], nodes[5].ID().String(): nodes[5], }, - {set: dkgset, round: 12}: { - self.ID().String(): self, - nodes[3].ID().String(): nodes[3], - nodes[5].ID().String(): nodes[5], - }, } if !reflect.DeepEqual(ps.label2Nodes, expectedlabel2Nodes) { @@ -97,28 +82,12 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) { {set: notaryset, round: 10}: {}, {set: notaryset, round: 11}: {}, {set: notaryset, round: 12}: {}, - {set: dkgset, round: 10}: {}, - {set: dkgset, round: 12}: {}, } if !reflect.DeepEqual(ps.directConn, expectedDirectConn) { t.Errorf("direct conn not match") } - expectedGroupConn := []peerLabel{ - {set: dkgset, round: 11}, - } - - if len(ps.groupConnPeers) != len(expectedGroupConn) { - t.Errorf("group conn peers not match") - } - - for _, l := range expectedGroupConn { - if len(ps.groupConnPeers[l]) == 0 { - t.Errorf("group conn peers is 0") - } - } - expectedAllDirect := make(map[string]map[peerLabel]struct{}) for l := range ps.directConn { @@ -152,11 +121,6 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) { nodes[3].ID().String(): nodes[3], nodes[5].ID().String(): nodes[5], }, - {set: dkgset, round: 12}: { - self.ID().String(): self, - nodes[3].ID().String(): nodes[3], - nodes[5].ID().String(): nodes[5], - }, } if !reflect.DeepEqual(ps.label2Nodes, expectedlabel2Nodes) { @@ -165,14 +129,13 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) { expectedDirectConn = map[peerLabel]struct{}{ {set: notaryset, round: 12}: {}, - {set: dkgset, round: 12}: {}, } if !reflect.DeepEqual(ps.directConn, expectedDirectConn) { t.Error("direct conn not match") } - expectedGroupConn = []peerLabel{} + expectedGroupConn := []peerLabel{} if len(ps.groupConnPeers) != len(expectedGroupConn) { t.Errorf("group conn peers not match") diff --git a/dex/protocol.go b/dex/protocol.go index 287bf0883..2bcb57506 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -92,7 +92,6 @@ const ( DKGPartialSignatureMsg = 0x25 PullBlocksMsg = 0x26 PullVotesMsg = 0x27 - PullRandomnessMsg = 0x28 GetGovStateMsg = 0x29 GovStateMsg = 0x2a @@ -157,8 +156,6 @@ type governance interface { CRSRound() uint64 NotarySet(uint64) (map[string]struct{}, error) - - DKGSet(uint64) (map[string]struct{}, error) } type dexconApp interface { diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 517df97d9..7e0e1a9a4 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -438,6 +438,10 @@ func TestRecvVotes(t *testing.T) { Height: 13, }, }, + PartialSignature: dkg.PartialSignature{ + Type: "456", + Signature: []byte("psig"), + }, Signature: coreCrypto.Signature{ Type: "123", Signature: []byte("sig"), @@ -453,7 +457,7 @@ func TestRecvVotes(t *testing.T) { select { case msg := <-ch: rvote := msg.(*coreTypes.Vote) - if rlpHash(rvote) != rlpHash(vote) { + if !reflect.DeepEqual(rvote, &vote) { t.Errorf("vote mismatch") } case <-time.After(1 * time.Second): @@ -474,6 +478,10 @@ func TestSendVotes(t *testing.T) { Height: 13, }, }, + PartialSignature: dkg.PartialSignature{ + Type: "456", + Signature: []byte("psig"), + }, Signature: coreCrypto.Signature{ Type: "123", Signature: []byte("sig"), @@ -531,10 +539,6 @@ func TestSendVotes(t *testing.T) { label: &peerLabel{set: notaryset, round: 11}, isReceiver: false, }, - { - label: &peerLabel{set: dkgset, round: 10}, - isReceiver: false, - }, } pm.peers.label2Nodes = make(map[peerLabel]map[string]*enode.Node) @@ -669,6 +673,10 @@ func TestRecvAgreement(t *testing.T) { Height: 13, }, }, + PartialSignature: dkg.PartialSignature{ + Type: "456", + Signature: []byte("psig"), + }, Signature: coreCrypto.Signature{ Type: "123", Signature: []byte("sig"), @@ -676,9 +684,10 @@ func TestRecvAgreement(t *testing.T) { } agreement := coreTypes.AgreementResult{ - BlockHash: coreCommon.Hash{9, 9, 9}, - Position: vote.Position, - Votes: []coreTypes.Vote{vote}, + BlockHash: coreCommon.Hash{9, 9, 9}, + Position: vote.Position, + Votes: []coreTypes.Vote{vote}, + Randomness: []byte{9, 4, 8, 7}, } if err := p2p.Send(p.app, AgreementMsg, &agreement); err != nil { @@ -714,6 +723,10 @@ func TestSendAgreement(t *testing.T) { Height: 13, }, }, + PartialSignature: dkg.PartialSignature{ + Type: "456", + Signature: []byte("psig"), + }, Signature: coreCrypto.Signature{ Type: "123", Signature: []byte("sig"), @@ -721,9 +734,10 @@ func TestSendAgreement(t *testing.T) { } agreement := coreTypes.AgreementResult{ - BlockHash: coreCommon.Hash{9, 9, 9}, - Position: vote.Position, - Votes: []coreTypes.Vote{vote}, + BlockHash: coreCommon.Hash{9, 9, 9}, + Position: vote.Position, + Votes: []coreTypes.Vote{vote}, + Randomness: []byte{9, 4, 8, 7}, } waitForRegister(pm, 1) @@ -745,75 +759,6 @@ func TestSendAgreement(t *testing.T) { } } -func TestRecvRandomnesses(t *testing.T) { - pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) - pm.SetReceiveCoreMessage(true) - - p, _ := newTestPeer("peer", dex64, pm, true) - defer pm.Stop() - defer p.close() - - randomness := coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.Hash{8, 8, 8}, - Position: coreTypes.Position{ - Round: 10, - Height: 13, - }, - Randomness: []byte{7, 7, 7, 7}, - } - - if err := p2p.Send(p.app, RandomnessMsg, []*coreTypes.BlockRandomnessResult{&randomness}); err != nil { - t.Fatalf("send error: %v", err) - } - - ch := pm.ReceiveChan() - select { - case msg := <-ch: - r := msg.(*coreTypes.BlockRandomnessResult) - if !reflect.DeepEqual(r, &randomness) { - t.Errorf("randomness mismatch") - } - case <-time.After(1 * time.Second): - t.Errorf("no randomness received within 1 seconds") - } -} - -func TestSendRandomnesses(t *testing.T) { - pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) - pm.SetReceiveCoreMessage(true) - - p, _ := newTestPeer("peer", dex64, pm, true) - defer pm.Stop() - defer p.close() - - randomness := coreTypes.BlockRandomnessResult{ - BlockHash: coreCommon.Hash{8, 8, 8}, - Position: coreTypes.Position{ - Round: 10, - Height: 13, - }, - Randomness: []byte{7, 7, 7, 7}, - } - - waitForRegister(pm, 1) - pm.BroadcastRandomnessResult(&randomness) - msg, err := p.app.ReadMsg() - if err != nil { - t.Errorf("%v: read error: %v", p.Peer, err) - } else if msg.Code != RandomnessMsg { - t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, RandomnessMsg) - } - - var rs []*coreTypes.BlockRandomnessResult - if err := msg.Decode(&rs); err != nil { - t.Errorf("%v: %v", p.Peer, err) - } - - if !reflect.DeepEqual(rs, []*coreTypes.BlockRandomnessResult{&randomness}) { - t.Errorf("randomness mismatch") - } -} - func waitForRegister(pm *ProtocolManager, num int) { for { if pm.peers.Len() >= num { -- cgit v1.2.3