diff options
Diffstat (limited to 'dex')
-rw-r--r-- | dex/cache.go | 29 | ||||
-rw-r--r-- | dex/cache_test.go | 14 | ||||
-rw-r--r-- | dex/handler.go | 31 | ||||
-rw-r--r-- | dex/peer.go | 60 | ||||
-rw-r--r-- | dex/protocol_test.go | 79 |
5 files changed, 156 insertions, 57 deletions
diff --git a/dex/cache.go b/dex/cache.go index 951657fae..dbd4b7b5d 100644 --- a/dex/cache.go +++ b/dex/cache.go @@ -95,9 +95,30 @@ func (c *cache) votes(pos coreTypes.Position) []*coreTypes.Vote { return votes } +func (c *cache) addBlocks(blocks []*coreTypes.Block) { + c.lock.Lock() + defer c.lock.Unlock() + for _, b := range blocks { + if b.IsFinalized() { + c.addFinalizedBlockNoLock(b) + } else { + c.addBlockNoLock(b) + } + } +} + func (c *cache) addBlock(block *coreTypes.Block) { c.lock.Lock() defer c.lock.Unlock() + c.addBlockNoLock(block) +} + +func (c *cache) addBlockNoLock(block *coreTypes.Block) { + // Avoid polluting cache by non-finalized blocks when we've received some + // finalized block from the same position. + if _, exist := c.finalizedBlockCache[block.Position]; exist { + return + } block = block.Clone() if len(c.blockCache) >= c.size { // Randomly delete one entry. @@ -112,6 +133,10 @@ func (c *cache) addBlock(block *coreTypes.Block) { func (c *cache) addFinalizedBlock(block *coreTypes.Block) { c.lock.Lock() defer c.lock.Unlock() + c.addFinalizedBlockNoLock(block) +} + +func (c *cache) addFinalizedBlockNoLock(block *coreTypes.Block) { block = block.Clone() if len(c.blockCache) >= c.size { // Randomly delete one entry. @@ -131,14 +156,14 @@ func (c *cache) addFinalizedBlock(block *coreTypes.Block) { c.finalizedBlockCache[block.Position] = block } -func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block { +func (c *cache) blocks(hashes coreCommon.Hashes, includeDB bool) []*coreTypes.Block { c.lock.RLock() defer c.lock.RUnlock() cacheBlocks := make([]*coreTypes.Block, 0, len(hashes)) for _, hash := range hashes { if block, exist := c.blockCache[hash]; exist { cacheBlocks = append(cacheBlocks, block) - } else { + } else if includeDB { block, err := c.db.GetBlock(hash) if err != nil { continue diff --git a/dex/cache_test.go b/dex/cache_test.go index b06effafb..04bca06ef 100644 --- a/dex/cache_test.go +++ b/dex/cache_test.go @@ -160,7 +160,7 @@ func TestCacheBlock(t *testing.T) { block2.Hash: {}, block3.Hash: {}, } - blocks := cache.blocks(hashes) + blocks := cache.blocks(hashes, true) if len(blocks) != 3 { t.Errorf("fail to get blocks: have %d, want 3", len(blocks)) } @@ -172,7 +172,7 @@ func TestCacheBlock(t *testing.T) { cache.addBlock(block4) - blocks = cache.blocks(hashes) + blocks = cache.blocks(hashes, true) hashMap[block4.Hash] = struct{}{} if len(blocks) != 3 { t.Errorf("fail to get blocks: have %d, want 3", len(blocks)) @@ -196,7 +196,7 @@ func TestCacheBlock(t *testing.T) { if err := db.PutBlock(*block5); err != nil { panic(err) } - blocks = cache.blocks(coreCommon.Hashes{block5.Hash}) + blocks = cache.blocks(coreCommon.Hashes{block5.Hash}, true) if len(blocks) != 1 { t.Errorf("fail to get blocks: have %d, want 1", len(blocks)) } else { @@ -204,6 +204,10 @@ func TestCacheBlock(t *testing.T) { t.Errorf("get wrong block: have %s, want %s", blocks[0], block5) } } + blocks = cache.blocks(coreCommon.Hashes{block5.Hash}, false) + if len(blocks) != 0 { + t.Errorf("unexpected length of blocks: have %d, want 0", len(blocks)) + } } func TestCacheFinalizedBlock(t *testing.T) { @@ -274,7 +278,7 @@ func TestCacheFinalizedBlock(t *testing.T) { if block := cache.finalizedBlock(block5.Position); block != nil { t.Errorf("unexpected block %s in cache", block) } - blocks := cache.blocks(coreCommon.Hashes{block5.Hash}) + blocks := cache.blocks(coreCommon.Hashes{block5.Hash}, true) if len(blocks) != 1 { t.Errorf("fail to get blocks: have %d, want 1", len(blocks)) } else { @@ -296,7 +300,7 @@ func TestCacheFinalizedBlock(t *testing.T) { block.Randomness, finalizedBlock5.Randomness) } - blocks = cache.blocks(coreCommon.Hashes{block5.Hash}) + blocks = cache.blocks(coreCommon.Hashes{block5.Hash}, true) if len(blocks) != 1 { t.Errorf("fail to get blocks: have %d, want 1", len(blocks)) } else { diff --git a/dex/handler.go b/dex/handler.go index 61e382610..45f58012c 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -862,8 +862,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&blocks); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + pm.cache.addBlocks(blocks) for _, block := range blocks { - pm.cache.addBlock(block) pm.receiveCh <- block } case msg.Code == VoteMsg: @@ -889,7 +889,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&agreement); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - p.MarkAgreement(rlpHash(agreement)) + p.MarkAgreement(agreement.Position) + // Update randomness field for blocks in cache. + block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}, false) + if len(block) != 0 { + block[0].Randomness = agreement.Randomness + pm.cache.addFinalizedBlock(block[0]) + } pm.receiveCh <- &agreement case msg.Code == DKGPrivateShareMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -920,7 +926,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&hashes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - blocks := pm.cache.blocks(hashes) + blocks := pm.cache.blocks(hashes, true) log.Debug("Push blocks", "blocks", blocks) return p.SendCoreBlocks(blocks) case msg.Code == PullVotesMsg: @@ -1109,7 +1115,12 @@ func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) { // BroadcastCoreBlock broadcasts the core block to all its peers. func (pm *ProtocolManager) BroadcastCoreBlock(block *coreTypes.Block) { pm.cache.addBlock(block) - for _, peer := range pm.peers.Peers() { + // send to notary nodes only. + label := peerLabel{ + set: notaryset, + round: block.Position.Round, + } + for _, peer := range pm.peers.PeersWithLabel(label) { peer.AsyncSendCoreBlocks([]*coreTypes.Block{block}) } } @@ -1130,7 +1141,7 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { func (pm *ProtocolManager) BroadcastAgreementResult( agreement *coreTypes.AgreementResult) { - block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}) + block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}, false) if len(block) != 0 { block[0].Randomness = agreement.Randomness pm.cache.addFinalizedBlock(block[0]) @@ -1143,17 +1154,17 @@ func (pm *ProtocolManager) BroadcastAgreementResult( } peers := pm.peers.PeersWithLabel(label) count := maxAgreementResultBroadcast - agrHash := rlpHash(agreement) for _, peer := range peers { - if count <= 0 { - peer.MarkAgreement(agrHash) - } else if !peer.knownAgreements.Contains(agrHash) { + if peer.MarkAgreement(agreement.Position) { + if count <= 0 { + continue + } count-- peer.AsyncSendAgreement(agreement) } } - for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { + for _, peer := range pm.peers.PeersWithoutAgreement(agreement.Position) { peer.AsyncSendAgreement(agreement) } } diff --git a/dex/peer.go b/dex/peer.go index d0e717233..1ade2820e 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -66,7 +66,6 @@ const ( maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - maxKnownAgreements = 10240 maxKnownDKGPrivateShares = 1024 // this related to DKG Size // maxQueuedTxs is the maximum number of transaction lists to queue up before @@ -141,24 +140,26 @@ type peer struct { number uint64 lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownRecords mapset.Set // Set of node record known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownAgreements mapset.Set - knownDKGPrivateShares mapset.Set - queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer - queuedProps chan *types.Block // Queue of blocks to broadcast to the peer - queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedCoreBlocks chan []*coreTypes.Block - queuedVotes chan []*coreTypes.Vote - queuedAgreements chan *coreTypes.AgreementResult - queuedDKGPrivateShares chan *dkgTypes.PrivateShare - queuedDKGPartialSignatures chan *dkgTypes.PartialSignature - queuedPullBlocks chan coreCommon.Hashes - queuedPullVotes chan coreTypes.Position - queuedPullRandomness chan coreCommon.Hashes - term chan struct{} // Termination channel to stop the broadcaster + lastKnownAgreementPositionLock sync.RWMutex + lastKnownAgreementPosition coreTypes.Position // The position of latest agreement to be known by this peer + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownRecords mapset.Set // Set of node record known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownAgreements mapset.Set + knownDKGPrivateShares mapset.Set + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer + queuedProps chan *types.Block // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedCoreBlocks chan []*coreTypes.Block + queuedVotes chan []*coreTypes.Vote + queuedAgreements chan *coreTypes.AgreementResult + queuedDKGPrivateShares chan *dkgTypes.PrivateShare + queuedDKGPartialSignatures chan *dkgTypes.PartialSignature + queuedPullBlocks chan coreCommon.Hashes + queuedPullVotes chan coreTypes.Position + queuedPullRandomness chan coreCommon.Hashes + term chan struct{} // Termination channel to stop the broadcaster } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -340,11 +341,14 @@ func (p *peer) MarkNodeRecord(hash common.Hash) { p.knownRecords.Add(hash) } -func (p *peer) MarkAgreement(hash common.Hash) { - for p.knownAgreements.Cardinality() >= maxKnownAgreements { - p.knownAgreements.Pop() +func (p *peer) MarkAgreement(position coreTypes.Position) bool { + p.lastKnownAgreementPositionLock.Lock() + defer p.lastKnownAgreementPositionLock.Unlock() + if position.Newer(p.lastKnownAgreementPosition) { + p.lastKnownAgreementPosition = position + return true } - p.knownAgreements.Add(hash) + return false } func (p *peer) MarkDKGPrivateShares(hash common.Hash) { @@ -354,6 +358,12 @@ func (p *peer) MarkDKGPrivateShares(hash common.Hash) { p.knownDKGPrivateShares.Add(hash) } +func (p *peer) isAgreementKnown(position coreTypes.Position) bool { + p.lastKnownAgreementPositionLock.RLock() + defer p.lastKnownAgreementPositionLock.RUnlock() + return !p.lastKnownAgreementPosition.Older(position) +} + func (p *peer) logSend(err error, code uint64) error { if err != nil { p.Log().Error("Failed to send peer message", "code", code, "err", err) @@ -846,12 +856,12 @@ func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { return list } -func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutAgreement(position coreTypes.Position) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownAgreements.Contains(hash) { + if !p.isAgreementKnown(position) { list = append(list, p) } } diff --git a/dex/protocol_test.go b/dex/protocol_test.go index d6bebc18a..51bd32c72 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -364,10 +364,7 @@ func TestRecvCoreBlocks(t *testing.T) { func TestSendCoreBlocks(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) pm.SetReceiveCoreMessage(true) - - p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() - defer p.close() block := coreTypes.Block{ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, @@ -394,23 +391,75 @@ func TestSendCoreBlocks(t *testing.T) { }, } - waitForRegister(pm, 1) - pm.BroadcastCoreBlock(&block) - msg, err := p.app.ReadMsg() - if err != nil { - t.Errorf("%v: read error: %v", p.Peer, err) - } else if msg.Code != CoreBlockMsg { - t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, CoreBlockMsg) + var wg sync.WaitGroup + checkBlock := func(p *testPeer, isReceiver bool) { + defer wg.Done() + defer p.close() + if !isReceiver { + go func() { + time.Sleep(100 * time.Millisecond) + p.close() + }() + } + + msg, err := p.app.ReadMsg() + if !isReceiver { + if err != p2p.ErrPipeClosed { + t.Errorf("err mismatch: got %v, want %v (not receiver peer)", + err, p2p.ErrPipeClosed) + } + return + } + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != CoreBlockMsg { + t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, CoreBlockMsg) + } + + var bs []*coreTypes.Block + if err := msg.Decode(&bs); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + + if !reflect.DeepEqual(bs, []*coreTypes.Block{&block}) { + t.Errorf("block mismatch") + } } - var bs []*coreTypes.Block - if err := msg.Decode(&bs); err != nil { - t.Errorf("%v: %v", p.Peer, err) + testPeers := []struct { + label *peerLabel + isReceiver bool + }{ + { + label: &peerLabel{set: notaryset, round: 12}, + isReceiver: true, + }, + { + label: nil, + isReceiver: false, + }, + { + label: &peerLabel{set: notaryset, round: 11}, + isReceiver: false, + }, } - if !reflect.DeepEqual(bs, []*coreTypes.Block{&block}) { - t.Errorf("block mismatch") + pm.peers.label2Nodes = make(map[peerLabel]map[string]*enode.Node) + for i, tt := range testPeers { + p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true) + if tt.label != nil { + if pm.peers.label2Nodes[*tt.label] == nil { + pm.peers.label2Nodes[*tt.label] = make(map[string]*enode.Node) + } + pm.peers.label2Nodes[*tt.label][p.ID().String()] = p.Node() + pm.peers.addDirectPeer(p.ID().String(), *tt.label) + } + wg.Add(1) + go checkBlock(p, tt.isReceiver) } + waitForRegister(pm, len(testPeers)) + pm.BroadcastCoreBlock(&block) + wg.Wait() } func TestRecvVotes(t *testing.T) { |