aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/cache.go29
-rw-r--r--dex/cache_test.go14
-rw-r--r--dex/handler.go31
-rw-r--r--dex/peer.go60
-rw-r--r--dex/protocol_test.go79
5 files changed, 156 insertions, 57 deletions
diff --git a/dex/cache.go b/dex/cache.go
index 951657fae..dbd4b7b5d 100644
--- a/dex/cache.go
+++ b/dex/cache.go
@@ -95,9 +95,30 @@ func (c *cache) votes(pos coreTypes.Position) []*coreTypes.Vote {
return votes
}
+func (c *cache) addBlocks(blocks []*coreTypes.Block) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ for _, b := range blocks {
+ if b.IsFinalized() {
+ c.addFinalizedBlockNoLock(b)
+ } else {
+ c.addBlockNoLock(b)
+ }
+ }
+}
+
func (c *cache) addBlock(block *coreTypes.Block) {
c.lock.Lock()
defer c.lock.Unlock()
+ c.addBlockNoLock(block)
+}
+
+func (c *cache) addBlockNoLock(block *coreTypes.Block) {
+ // Avoid polluting cache by non-finalized blocks when we've received some
+ // finalized block from the same position.
+ if _, exist := c.finalizedBlockCache[block.Position]; exist {
+ return
+ }
block = block.Clone()
if len(c.blockCache) >= c.size {
// Randomly delete one entry.
@@ -112,6 +133,10 @@ func (c *cache) addBlock(block *coreTypes.Block) {
func (c *cache) addFinalizedBlock(block *coreTypes.Block) {
c.lock.Lock()
defer c.lock.Unlock()
+ c.addFinalizedBlockNoLock(block)
+}
+
+func (c *cache) addFinalizedBlockNoLock(block *coreTypes.Block) {
block = block.Clone()
if len(c.blockCache) >= c.size {
// Randomly delete one entry.
@@ -131,14 +156,14 @@ func (c *cache) addFinalizedBlock(block *coreTypes.Block) {
c.finalizedBlockCache[block.Position] = block
}
-func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block {
+func (c *cache) blocks(hashes coreCommon.Hashes, includeDB bool) []*coreTypes.Block {
c.lock.RLock()
defer c.lock.RUnlock()
cacheBlocks := make([]*coreTypes.Block, 0, len(hashes))
for _, hash := range hashes {
if block, exist := c.blockCache[hash]; exist {
cacheBlocks = append(cacheBlocks, block)
- } else {
+ } else if includeDB {
block, err := c.db.GetBlock(hash)
if err != nil {
continue
diff --git a/dex/cache_test.go b/dex/cache_test.go
index b06effafb..04bca06ef 100644
--- a/dex/cache_test.go
+++ b/dex/cache_test.go
@@ -160,7 +160,7 @@ func TestCacheBlock(t *testing.T) {
block2.Hash: {},
block3.Hash: {},
}
- blocks := cache.blocks(hashes)
+ blocks := cache.blocks(hashes, true)
if len(blocks) != 3 {
t.Errorf("fail to get blocks: have %d, want 3", len(blocks))
}
@@ -172,7 +172,7 @@ func TestCacheBlock(t *testing.T) {
cache.addBlock(block4)
- blocks = cache.blocks(hashes)
+ blocks = cache.blocks(hashes, true)
hashMap[block4.Hash] = struct{}{}
if len(blocks) != 3 {
t.Errorf("fail to get blocks: have %d, want 3", len(blocks))
@@ -196,7 +196,7 @@ func TestCacheBlock(t *testing.T) {
if err := db.PutBlock(*block5); err != nil {
panic(err)
}
- blocks = cache.blocks(coreCommon.Hashes{block5.Hash})
+ blocks = cache.blocks(coreCommon.Hashes{block5.Hash}, true)
if len(blocks) != 1 {
t.Errorf("fail to get blocks: have %d, want 1", len(blocks))
} else {
@@ -204,6 +204,10 @@ func TestCacheBlock(t *testing.T) {
t.Errorf("get wrong block: have %s, want %s", blocks[0], block5)
}
}
+ blocks = cache.blocks(coreCommon.Hashes{block5.Hash}, false)
+ if len(blocks) != 0 {
+ t.Errorf("unexpected length of blocks: have %d, want 0", len(blocks))
+ }
}
func TestCacheFinalizedBlock(t *testing.T) {
@@ -274,7 +278,7 @@ func TestCacheFinalizedBlock(t *testing.T) {
if block := cache.finalizedBlock(block5.Position); block != nil {
t.Errorf("unexpected block %s in cache", block)
}
- blocks := cache.blocks(coreCommon.Hashes{block5.Hash})
+ blocks := cache.blocks(coreCommon.Hashes{block5.Hash}, true)
if len(blocks) != 1 {
t.Errorf("fail to get blocks: have %d, want 1", len(blocks))
} else {
@@ -296,7 +300,7 @@ func TestCacheFinalizedBlock(t *testing.T) {
block.Randomness,
finalizedBlock5.Randomness)
}
- blocks = cache.blocks(coreCommon.Hashes{block5.Hash})
+ blocks = cache.blocks(coreCommon.Hashes{block5.Hash}, true)
if len(blocks) != 1 {
t.Errorf("fail to get blocks: have %d, want 1", len(blocks))
} else {
diff --git a/dex/handler.go b/dex/handler.go
index 61e382610..45f58012c 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -862,8 +862,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&blocks); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ pm.cache.addBlocks(blocks)
for _, block := range blocks {
- pm.cache.addBlock(block)
pm.receiveCh <- block
}
case msg.Code == VoteMsg:
@@ -889,7 +889,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&agreement); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- p.MarkAgreement(rlpHash(agreement))
+ p.MarkAgreement(agreement.Position)
+ // Update randomness field for blocks in cache.
+ block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}, false)
+ if len(block) != 0 {
+ block[0].Randomness = agreement.Randomness
+ pm.cache.addFinalizedBlock(block[0])
+ }
pm.receiveCh <- &agreement
case msg.Code == DKGPrivateShareMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
@@ -920,7 +926,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&hashes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- blocks := pm.cache.blocks(hashes)
+ blocks := pm.cache.blocks(hashes, true)
log.Debug("Push blocks", "blocks", blocks)
return p.SendCoreBlocks(blocks)
case msg.Code == PullVotesMsg:
@@ -1109,7 +1115,12 @@ func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) {
// BroadcastCoreBlock broadcasts the core block to all its peers.
func (pm *ProtocolManager) BroadcastCoreBlock(block *coreTypes.Block) {
pm.cache.addBlock(block)
- for _, peer := range pm.peers.Peers() {
+ // send to notary nodes only.
+ label := peerLabel{
+ set: notaryset,
+ round: block.Position.Round,
+ }
+ for _, peer := range pm.peers.PeersWithLabel(label) {
peer.AsyncSendCoreBlocks([]*coreTypes.Block{block})
}
}
@@ -1130,7 +1141,7 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
func (pm *ProtocolManager) BroadcastAgreementResult(
agreement *coreTypes.AgreementResult) {
- block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash})
+ block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash}, false)
if len(block) != 0 {
block[0].Randomness = agreement.Randomness
pm.cache.addFinalizedBlock(block[0])
@@ -1143,17 +1154,17 @@ func (pm *ProtocolManager) BroadcastAgreementResult(
}
peers := pm.peers.PeersWithLabel(label)
count := maxAgreementResultBroadcast
- agrHash := rlpHash(agreement)
for _, peer := range peers {
- if count <= 0 {
- peer.MarkAgreement(agrHash)
- } else if !peer.knownAgreements.Contains(agrHash) {
+ if peer.MarkAgreement(agreement.Position) {
+ if count <= 0 {
+ continue
+ }
count--
peer.AsyncSendAgreement(agreement)
}
}
- for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
+ for _, peer := range pm.peers.PeersWithoutAgreement(agreement.Position) {
peer.AsyncSendAgreement(agreement)
}
}
diff --git a/dex/peer.go b/dex/peer.go
index d0e717233..1ade2820e 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -66,7 +66,6 @@ const (
maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownAgreements = 10240
maxKnownDKGPrivateShares = 1024 // this related to DKG Size
// maxQueuedTxs is the maximum number of transaction lists to queue up before
@@ -141,24 +140,26 @@ type peer struct {
number uint64
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownRecords mapset.Set // Set of node record known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownAgreements mapset.Set
- knownDKGPrivateShares mapset.Set
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
- queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedCoreBlocks chan []*coreTypes.Block
- queuedVotes chan []*coreTypes.Vote
- queuedAgreements chan *coreTypes.AgreementResult
- queuedDKGPrivateShares chan *dkgTypes.PrivateShare
- queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
- queuedPullBlocks chan coreCommon.Hashes
- queuedPullVotes chan coreTypes.Position
- queuedPullRandomness chan coreCommon.Hashes
- term chan struct{} // Termination channel to stop the broadcaster
+ lastKnownAgreementPositionLock sync.RWMutex
+ lastKnownAgreementPosition coreTypes.Position // The position of latest agreement to be known by this peer
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownRecords mapset.Set // Set of node record known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownAgreements mapset.Set
+ knownDKGPrivateShares mapset.Set
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
+ queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedCoreBlocks chan []*coreTypes.Block
+ queuedVotes chan []*coreTypes.Vote
+ queuedAgreements chan *coreTypes.AgreementResult
+ queuedDKGPrivateShares chan *dkgTypes.PrivateShare
+ queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
+ queuedPullBlocks chan coreCommon.Hashes
+ queuedPullVotes chan coreTypes.Position
+ queuedPullRandomness chan coreCommon.Hashes
+ term chan struct{} // Termination channel to stop the broadcaster
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@@ -340,11 +341,14 @@ func (p *peer) MarkNodeRecord(hash common.Hash) {
p.knownRecords.Add(hash)
}
-func (p *peer) MarkAgreement(hash common.Hash) {
- for p.knownAgreements.Cardinality() >= maxKnownAgreements {
- p.knownAgreements.Pop()
+func (p *peer) MarkAgreement(position coreTypes.Position) bool {
+ p.lastKnownAgreementPositionLock.Lock()
+ defer p.lastKnownAgreementPositionLock.Unlock()
+ if position.Newer(p.lastKnownAgreementPosition) {
+ p.lastKnownAgreementPosition = position
+ return true
}
- p.knownAgreements.Add(hash)
+ return false
}
func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
@@ -354,6 +358,12 @@ func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
p.knownDKGPrivateShares.Add(hash)
}
+func (p *peer) isAgreementKnown(position coreTypes.Position) bool {
+ p.lastKnownAgreementPositionLock.RLock()
+ defer p.lastKnownAgreementPositionLock.RUnlock()
+ return !p.lastKnownAgreementPosition.Older(position)
+}
+
func (p *peer) logSend(err error, code uint64) error {
if err != nil {
p.Log().Error("Failed to send peer message", "code", code, "err", err)
@@ -846,12 +856,12 @@ func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
return list
}
-func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
+func (ps *peerSet) PeersWithoutAgreement(position coreTypes.Position) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownAgreements.Contains(hash) {
+ if !p.isAgreementKnown(position) {
list = append(list, p)
}
}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index d6bebc18a..51bd32c72 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -364,10 +364,7 @@ func TestRecvCoreBlocks(t *testing.T) {
func TestSendCoreBlocks(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
pm.SetReceiveCoreMessage(true)
-
- p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
- defer p.close()
block := coreTypes.Block{
ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
@@ -394,23 +391,75 @@ func TestSendCoreBlocks(t *testing.T) {
},
}
- waitForRegister(pm, 1)
- pm.BroadcastCoreBlock(&block)
- msg, err := p.app.ReadMsg()
- if err != nil {
- t.Errorf("%v: read error: %v", p.Peer, err)
- } else if msg.Code != CoreBlockMsg {
- t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, CoreBlockMsg)
+ var wg sync.WaitGroup
+ checkBlock := func(p *testPeer, isReceiver bool) {
+ defer wg.Done()
+ defer p.close()
+ if !isReceiver {
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ p.close()
+ }()
+ }
+
+ msg, err := p.app.ReadMsg()
+ if !isReceiver {
+ if err != p2p.ErrPipeClosed {
+ t.Errorf("err mismatch: got %v, want %v (not receiver peer)",
+ err, p2p.ErrPipeClosed)
+ }
+ return
+ }
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != CoreBlockMsg {
+ t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, CoreBlockMsg)
+ }
+
+ var bs []*coreTypes.Block
+ if err := msg.Decode(&bs); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+
+ if !reflect.DeepEqual(bs, []*coreTypes.Block{&block}) {
+ t.Errorf("block mismatch")
+ }
}
- var bs []*coreTypes.Block
- if err := msg.Decode(&bs); err != nil {
- t.Errorf("%v: %v", p.Peer, err)
+ testPeers := []struct {
+ label *peerLabel
+ isReceiver bool
+ }{
+ {
+ label: &peerLabel{set: notaryset, round: 12},
+ isReceiver: true,
+ },
+ {
+ label: nil,
+ isReceiver: false,
+ },
+ {
+ label: &peerLabel{set: notaryset, round: 11},
+ isReceiver: false,
+ },
}
- if !reflect.DeepEqual(bs, []*coreTypes.Block{&block}) {
- t.Errorf("block mismatch")
+ pm.peers.label2Nodes = make(map[peerLabel]map[string]*enode.Node)
+ for i, tt := range testPeers {
+ p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
+ if tt.label != nil {
+ if pm.peers.label2Nodes[*tt.label] == nil {
+ pm.peers.label2Nodes[*tt.label] = make(map[string]*enode.Node)
+ }
+ pm.peers.label2Nodes[*tt.label][p.ID().String()] = p.Node()
+ pm.peers.addDirectPeer(p.ID().String(), *tt.label)
+ }
+ wg.Add(1)
+ go checkBlock(p, tt.isReceiver)
}
+ waitForRegister(pm, len(testPeers))
+ pm.BroadcastCoreBlock(&block)
+ wg.Wait()
}
func TestRecvVotes(t *testing.T) {