aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dex/handler.go110
-rw-r--r--dex/nodetable.go73
-rw-r--r--dex/nodetable_test.go121
-rw-r--r--dex/peer.go86
-rw-r--r--dex/peer_test.go3
-rw-r--r--dex/protocol.go2
-rw-r--r--dex/protocol_test.go83
-rw-r--r--dex/sync.go93
-rw-r--r--p2p/dial.go13
9 files changed, 35 insertions, 549 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 45f58012c..deb959c45 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -35,11 +35,11 @@ package dex
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"fmt"
"math"
- "math/rand"
"sync"
"sync/atomic"
"time"
@@ -64,7 +64,6 @@ import (
"github.com/dexon-foundation/dexon/metrics"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/params"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -81,8 +80,6 @@ const (
finalizedBlockChanSize = 128
- recordChanSize = 10240
-
maxPullPeers = 3
maxPullVotePeers = 1
@@ -107,7 +104,6 @@ type ProtocolManager struct {
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
- nodeTable *nodeTable
gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
@@ -121,20 +117,17 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- recordsCh chan newRecordsEvent
- recordsSub event.Subscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
whitelist map[uint64]common.Hash
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- txsyncCh chan *txsync
- recordsyncCh chan *recordsync
- quitSync chan struct{}
- noMorePeers chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
// channels for peerSetLoop
chainHeadCh chan core.ChainHeadEvent
@@ -168,13 +161,11 @@ func NewProtocolManager(
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash,
isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
- tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
- nodeTable: tab,
gov: gov,
blockchain: blockchain,
cache: newCache(5120, dexDB.NewDatabase(chaindb)),
@@ -184,7 +175,6 @@ func NewProtocolManager(
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
- recordsyncCh: make(chan *recordsync),
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
receiveCoreMessage: 0,
@@ -285,7 +275,7 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
pm.srvr = srvr
- pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable)
+ pm.peers = newPeerSet(pm.gov, pm.srvr)
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
@@ -301,11 +291,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
go pm.finalizedBlockBroadcastLoop()
}
- // broadcast node records
- pm.recordsCh = make(chan newRecordsEvent, recordChanSize)
- pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh)
- go pm.recordBroadcastLoop()
-
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
@@ -314,8 +299,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
- go pm.recordsyncLoop()
-
}
func (pm *ProtocolManager) Stop() {
@@ -392,7 +375,6 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
- pm.syncNodeRecords(p)
// If we have any explicit whitelist block hashes, request them
for number := range pm.whitelist {
@@ -839,21 +821,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs)
pm.txpool.AddRemotes(txs)
- case msg.Code == RecordMsg:
- var records []*enr.Record
- if err := msg.Decode(&records); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- for i, record := range records {
- if record == nil {
- return errResp(ErrDecode, "node record %d is nil", i)
- }
- p.MarkNodeRecord(rlpHash(record))
- }
- pm.nodeTable.AddRecords(records)
-
// Block proposer-only messages.
-
case msg.Code == CoreBlockMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -1070,23 +1038,6 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastRecords will propagate node records to its peers.
-func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
- var recordset = make(map[*peer][]*enr.Record)
-
- for _, record := range records {
- peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record))
- for _, peer := range peers {
- recordset[peer] = append(recordset[peer], record)
- }
- log.Trace("Broadcast record", "recipients", len(peers))
- }
-
- for peer, records := range recordset {
- peer.AsyncSendNodeRecords(records)
- }
-}
-
// BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers.
func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) {
if len(block.Randomness) == 0 {
@@ -1271,36 +1222,6 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
}
}
-func (pm *ProtocolManager) recordBroadcastLoop() {
- r := rand.New(rand.NewSource(time.Now().Unix()))
- t := time.NewTimer(0)
- defer t.Stop()
-
- for {
- select {
- case event := <-pm.recordsCh:
- pm.BroadcastRecords(event.Records)
- pm.peers.Refresh()
-
- case <-t.C:
- record := pm.srvr.Self().Record()
- log.Debug("refresh our node record", "seq", record.Seq())
- pm.nodeTable.AddRecords([]*enr.Record{record})
-
- // Log current peers connection status.
- pm.peers.Status()
-
- // Reset timer.
- d := 1*time.Minute + time.Duration(r.Int63n(60))*time.Second
- t.Reset(d)
-
- // Err() channel will be closed when unsubscribing.
- case <-pm.recordsSub.Err():
- return
- }
- }
-}
-
func (pm *ProtocolManager) SetReceiveCoreMessage(enabled bool) {
if enabled {
atomic.StoreInt32(&pm.receiveCoreMessage, 1)
@@ -1333,6 +1254,19 @@ func (pm *ProtocolManager) peerSetLoop() {
resetCount = pm.gov.DKGResetCount(round)
}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ for ctx.Err() == nil {
+ select {
+ case <-time.After(time.Minute):
+ pm.peers.Status()
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
for {
select {
case event := <-pm.chainHeadCh:
diff --git a/dex/nodetable.go b/dex/nodetable.go
deleted file mode 100644
index cc1de160f..000000000
--- a/dex/nodetable.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package dex
-
-import (
- "sync"
-
- "github.com/dexon-foundation/dexon/event"
- "github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
-)
-
-type newRecordsEvent struct{ Records []*enr.Record }
-
-type nodeTable struct {
- mu sync.RWMutex
- entry map[enode.ID]*enode.Node
- feed event.Feed
-}
-
-func newNodeTable() *nodeTable {
- return &nodeTable{
- entry: make(map[enode.ID]*enode.Node),
- }
-}
-
-func (t *nodeTable) GetNode(id enode.ID) *enode.Node {
- t.mu.RLock()
- defer t.mu.RUnlock()
- return t.entry[id]
-}
-
-func (t *nodeTable) AddRecords(records []*enr.Record) {
- t.mu.Lock()
- defer t.mu.Unlock()
-
- var newRecords []*enr.Record
- for _, record := range records {
- node, err := enode.New(enode.ValidSchemes, record)
- if err != nil {
- log.Error("invalid node record", "err", err)
- return
- }
-
- if n, ok := t.entry[node.ID()]; ok && n.Seq() >= node.Seq() {
- log.Trace("Ignore new record, already exists", "id", node.ID().String(),
- "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP())
- continue
- }
-
- t.entry[node.ID()] = node
- newRecords = append(newRecords, record)
- log.Debug("Add new record to node table", "id", node.ID().String(),
- "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP())
- }
- if len(newRecords) > 0 {
- go t.feed.Send(newRecordsEvent{newRecords})
- }
-}
-
-func (t *nodeTable) Records() []*enr.Record {
- t.mu.RLock()
- defer t.mu.RUnlock()
- records := make([]*enr.Record, 0, len(t.entry))
- for _, node := range t.entry {
- records = append(records, node.Record())
- }
- return records
-}
-
-func (t *nodeTable) SubscribeNewRecordsEvent(
- ch chan<- newRecordsEvent) event.Subscription {
- return t.feed.Subscribe(ch)
-}
diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go
deleted file mode 100644
index 06078a0d8..000000000
--- a/dex/nodetable_test.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package dex
-
-import (
- "crypto/ecdsa"
- "net"
- "testing"
- "time"
-
- "github.com/dexon-foundation/dexon/common"
- "github.com/dexon-foundation/dexon/crypto"
- "github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
-)
-
-func TestNodeTable(t *testing.T) {
- table := newNodeTable()
- ch := make(chan newRecordsEvent)
- table.SubscribeNewRecordsEvent(ch)
-
- records1 := []*enr.Record{
- randomNode().Record(),
- randomNode().Record(),
- }
-
- records2 := []*enr.Record{
- randomNode().Record(),
- randomNode().Record(),
- }
-
- go table.AddRecords(records1)
-
- select {
- case newRecords := <-ch:
- m := map[common.Hash]struct{}{}
- for _, record := range newRecords.Records {
- m[rlpHash(record)] = struct{}{}
- }
-
- if len(m) != len(records1) {
- t.Errorf("len mismatch: got %d, want: %d",
- len(m), len(records1))
- }
-
- for _, record := range records1 {
- if _, ok := m[rlpHash(record)]; !ok {
- t.Errorf("expected record (%s) not exists", rlpHash(record))
- }
- }
- case <-time.After(1 * time.Second):
- t.Error("did not receive new records event within one second")
- }
-
- go table.AddRecords(records2)
- select {
- case newRecords := <-ch:
- m := map[common.Hash]struct{}{}
- for _, record := range newRecords.Records {
- m[rlpHash(record)] = struct{}{}
- }
-
- if len(m) != len(records2) {
- t.Errorf("len mismatch: got %d, want: %d",
- len(m), len(records2))
- }
-
- for _, record := range records2 {
- if _, ok := m[rlpHash(record)]; !ok {
- t.Errorf("expected record (%s) not exists", rlpHash(record))
- }
- }
- case <-time.After(1 * time.Second):
- t.Error("did not receive new records event within one second")
- }
-
- var records []*enr.Record
- records = append(records, records1...)
- records = append(records, records2...)
- allRecords := table.Records()
- if len(allRecords) != len(records) {
- t.Errorf("all metas num mismatch: got %d, want %d",
- len(records), len(allRecords))
- }
-
- for _, r := range records {
- n, err := enode.New(enode.V4ID{}, r)
- if err != nil {
- t.Errorf(err.Error())
- }
- if rlpHash(r) != rlpHash(table.GetNode(n.ID()).Record()) {
- t.Errorf("record (%s) mismatch", n.ID().String())
- }
- }
-}
-
-func randomNode() *enode.Node {
- var err error
- var privkey *ecdsa.PrivateKey
- for {
- privkey, err = crypto.GenerateKey()
- if err == nil {
- break
- }
- }
- var r enr.Record
- r.Set(enr.IP(net.IP{}))
- r.Set(enr.UDP(0))
- r.Set(enr.TCP(0))
- if err := enode.SignV4(&r, privkey); err != nil {
- panic(err)
- }
- node, err := enode.New(enode.V4ID{}, &r)
- if err != nil {
- panic(err)
-
- }
- return node
-}
-
-func randomID() enode.ID {
- return randomNode().ID()
-}
diff --git a/dex/peer.go b/dex/peer.go
index 1ade2820e..68576564f 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -51,7 +51,6 @@ import (
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -62,9 +61,8 @@ var (
)
const (
- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS)
- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
maxKnownDKGPrivateShares = 1024 // this related to DKG Size
@@ -73,8 +71,6 @@ const (
// contain a single transaction, or thousands.
maxQueuedTxs = 1024
- maxQueuedRecords = 512
-
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
@@ -143,12 +139,10 @@ type peer struct {
lastKnownAgreementPositionLock sync.RWMutex
lastKnownAgreementPosition coreTypes.Position // The position of latest agreement to be known by this peer
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownRecords mapset.Set // Set of node record known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownAgreements mapset.Set
knownDKGPrivateShares mapset.Set
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
queuedCoreBlocks chan []*coreTypes.Block
@@ -169,12 +163,10 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
version: version,
id: p.ID().String(),
knownTxs: mapset.NewSet(),
- knownRecords: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownAgreements: mapset.NewSet(),
knownDKGPrivateShares: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedRecords: make(chan []*enr.Record, maxQueuedRecords),
queuedProps: make(chan *types.Block, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
queuedCoreBlocks: make(chan []*coreTypes.Block, maxQueuedCoreBlocks),
@@ -190,7 +182,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node records broadcasts into the remote peer.
+// transaction broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
queuedVotes := make([]*coreTypes.Vote, 0, maxQueuedVotes)
@@ -212,12 +204,6 @@ func (p *peer) broadcast() {
queuedVotes = queuedVotes[:0]
}
select {
- case records := <-p.queuedRecords:
- if err := p.SendNodeRecords(records); err != nil {
- return
- }
- p.Log().Trace("Broadcast node records", "count", len(records))
-
case block := <-p.queuedProps:
if err := p.SendNewBlock(block); err != nil {
return
@@ -334,13 +320,6 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNodeRecord(hash common.Hash) {
- for p.knownRecords.Cardinality() >= maxKnownRecords {
- p.knownRecords.Pop()
- }
- p.knownRecords.Add(hash)
-}
-
func (p *peer) MarkAgreement(position coreTypes.Position) bool {
p.lastKnownAgreementPositionLock.Lock()
defer p.lastKnownAgreementPositionLock.Unlock()
@@ -393,29 +372,6 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNodeRecords sends the records to the peer and includes the hashes
-// in its records hash set for future reference.
-func (p *peer) SendNodeRecords(records []*enr.Record) error {
- for _, record := range records {
- p.knownRecords.Add(rlpHash(record))
- }
- return p.logSend(p2p.Send(p.rw, RecordMsg, records), RecordMsg)
-}
-
-// AsyncSendNodeRecord queues list of notary node records propagation to a
-// remote peer. If the peer's broadcast queue is full, the event is silently
-// dropped.
-func (p *peer) AsyncSendNodeRecords(records []*enr.Record) {
- select {
- case p.queuedRecords <- records:
- for _, record := range records {
- p.knownRecords.Add(rlpHash(record))
- }
- default:
- p.Log().Debug("Dropping node record propagation", "count", len(records))
- }
-}
-
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
@@ -708,7 +664,6 @@ type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
- tab *nodeTable
selfPK string
srvr p2pServer
@@ -721,12 +676,11 @@ type peerSet struct {
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
+func newPeerSet(gov governance, srvr p2pServer) *peerSet {
return &peerSet{
peers: make(map[string]*peer),
gov: gov,
srvr: srvr,
- tab: tab,
selfPK: hex.EncodeToString(crypto.FromECDSAPub(&srvr.GetPrivateKey().PublicKey)),
label2Nodes: make(map[peerLabel]map[string]*enode.Node),
directConn: make(map[peerLabel]struct{}),
@@ -842,20 +796,6 @@ func (ps *peerSet) PeersWithoutLabel(label peerLabel) []*peer {
return list
}
-// PeersWithoutNodeRecord retrieves a list of peers that do not have a
-// given record in their set of known hashes.
-func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownRecords.Contains(hash) {
- list = append(list, p)
- }
- }
- return list
-}
-
func (ps *peerSet) PeersWithoutAgreement(position coreTypes.Position) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
@@ -996,18 +936,6 @@ func (ps *peerSet) EnsureGroupConn() {
}
}
-func (ps *peerSet) Refresh() {
- ps.lock.Lock()
- defer ps.lock.Unlock()
- for id := range ps.allDirectPeers {
- if ps.peers[id] == nil {
- if node := ps.tab.GetNode(enode.HexID(id)); node != nil {
- ps.srvr.AddDirectPeer(node)
- }
- }
- }
-}
-
func (ps *peerSet) buildDirectConn(label peerLabel) {
ps.directConn[label] = struct{}{}
for id := range ps.label2Nodes[label] {
@@ -1048,11 +976,7 @@ func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
return
}
ps.allDirectPeers[id] = map[peerLabel]struct{}{label: {}}
-
- node := ps.tab.GetNode(enode.HexID(id))
- if node == nil {
- node = ps.label2Nodes[label][id]
- }
+ node := ps.label2Nodes[label][id]
ps.srvr.AddDirectPeer(node)
}
diff --git a/dex/peer_test.go b/dex/peer_test.go
index d6bc7e24c..18f6617d3 100644
--- a/dex/peer_test.go
+++ b/dex/peer_test.go
@@ -17,7 +17,6 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) {
}
server := newTestP2PServer(key)
self := server.Self()
- table := newNodeTable()
gov := &testGovernance{}
@@ -49,7 +48,7 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) {
return newTestNodeSet(m[round]), nil
}
- ps := newPeerSet(gov, server, table)
+ ps := newPeerSet(gov, server)
// build round 10
ps.BuildConnection(10)
diff --git a/dex/protocol.go b/dex/protocol.go
index 4da64b604..adfda3c6f 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -82,8 +82,6 @@ const (
ReceiptsMsg = 0x10
// Protocol messages belonging to dex/64
- RecordMsg = 0x11
-
CoreBlockMsg = 0x20
VoteMsg = 0x21
AgreementMsg = 0x22
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 51bd32c72..3ed93c061 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -36,7 +36,6 @@ import (
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -232,86 +231,6 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
}
}
-func TestRecvNodeRecords(t *testing.T) {
- pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
- p, _ := newTestPeer("peer", dex64, pm, true)
- defer pm.Stop()
- defer p.close()
-
- record := randomNode().Record()
-
- ch := make(chan newRecordsEvent)
- pm.nodeTable.SubscribeNewRecordsEvent(ch)
-
- if err := p2p.Send(p.app, RecordMsg, []interface{}{record}); err != nil {
- t.Fatalf("send error: %v", err)
- }
-
- select {
- case event := <-ch:
- records := event.Records
- if len(records) != 1 {
- t.Errorf("wrong number of new records: got %d, want 1", len(records))
- } else if rlpHash(records[0]) != rlpHash(record) {
- t.Errorf("added wrong records hash: got %v, want %v", rlpHash(records[0]), rlpHash(record))
- }
- case <-time.After(3 * time.Second):
- t.Errorf("no newRecordsEvent received within 3 seconds")
- }
-}
-
-func TestSendNodeRecords(t *testing.T) {
- pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
- defer pm.Stop()
-
- allrecords := make([]*enr.Record, 100)
- for i := 0; i < len(allrecords); i++ {
- allrecords[i] = randomNode().Record()
- }
-
- // Connect several peers. They should all receive the pending transactions.
- var wg sync.WaitGroup
- checkrecords := func(p *testPeer) {
- defer wg.Done()
- defer p.close()
- seen := make(map[common.Hash]bool)
- for _, record := range allrecords {
- seen[rlpHash(record)] = false
- }
- for n := 0; n < len(allrecords) && !t.Failed(); {
- var records []*enr.Record
- msg, err := p.app.ReadMsg()
- if err != nil {
- t.Errorf("%v: read error: %v", p.Peer, err)
- } else if msg.Code != RecordMsg {
- t.Errorf("%v: got code %d, want RecordMsg", p.Peer, msg.Code)
- }
- if err := msg.Decode(&records); err != nil {
- t.Errorf("%v: %v", p.Peer, err)
- }
- for _, record := range records {
- hash := rlpHash(record)
- seenrecord, want := seen[hash]
- if seenrecord {
- t.Errorf("%v: got record more than once: %x", p.Peer, hash)
- }
- if !want {
- t.Errorf("%v: got unexpected record: %x", p.Peer, hash)
- }
- seen[hash] = true
- n++
- }
- }
- }
- for i := 0; i < 3; i++ {
- p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
- wg.Add(1)
- go checkrecords(p)
- }
- pm.nodeTable.AddRecords(allrecords)
- wg.Wait()
-}
-
func TestRecvCoreBlocks(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
pm.SetReceiveCoreMessage(true)
@@ -357,7 +276,7 @@ func TestRecvCoreBlocks(t *testing.T) {
t.Errorf("block mismatch")
}
case <-time.After(3 * time.Second):
- t.Errorf("no newRecordsEvent received within 3 seconds")
+ t.Errorf("no core block received within 3 seconds")
}
}
diff --git a/dex/sync.go b/dex/sync.go
index 93bed87c4..84c161845 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -26,7 +26,6 @@ import (
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
)
const (
@@ -40,9 +39,6 @@ const (
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
-
- // This is the target number for the packs of records sent by recordsyncLoop.
- recordsyncPackNum = 1024
)
type txsync struct {
@@ -137,95 +133,6 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
-type recordsync struct {
- p *peer
- records []*enr.Record
-}
-
-// syncNodeRecords starts sending all node records to the given peer.
-func (pm *ProtocolManager) syncNodeRecords(p *peer) {
- records := pm.nodeTable.Records()
- p.Log().Debug("Sync node records", "num", len(records))
- if len(records) == 0 {
- return
- }
- select {
- case pm.recordsyncCh <- &recordsync{p, records}:
- case <-pm.quitSync:
- }
-}
-
-// recordsyncLoop takes care of the initial node record sync for each new
-// connection. When a new peer appears, we relay all currently node records.
-// In order to minimise egress bandwidth usage, we send
-// the records in small packs to one peer at a time.
-func (pm *ProtocolManager) recordsyncLoop() {
- var (
- pending = make(map[enode.ID]*recordsync)
- sending = false // whether a send is active
- pack = new(recordsync) // the pack that is being sent
- done = make(chan error, 1) // result of the send
- )
-
- // send starts a sending a pack of transactions from the sync.
- send := func(s *recordsync) {
- // Fill pack with node records up to the target num.
- var num int
- pack.p = s.p
- pack.records = pack.records[:0]
- for i := 0; i < len(s.records) && num < recordsyncPackNum; i++ {
- pack.records = append(pack.records, s.records[i])
- num += 1
- }
- // Remove the records that will be sent.
- s.records = s.records[:copy(s.records, s.records[len(pack.records):])]
- if len(s.records) == 0 {
- delete(pending, s.p.ID())
- }
- // Send the pack in the background.
- s.p.Log().Trace("Sending batch of records", "count", len(pack.records), "bytes", num)
- sending = true
- go func() { done <- pack.p.SendNodeRecords(pack.records) }()
- }
-
- // pick chooses the next pending sync.
- pick := func() *recordsync {
- if len(pending) == 0 {
- return nil
- }
- n := rand.Intn(len(pending)) + 1
- for _, s := range pending {
- if n--; n == 0 {
- return s
- }
- }
- return nil
- }
-
- for {
- select {
- case s := <-pm.recordsyncCh:
- pending[s.p.ID()] = s
- if !sending {
- send(s)
- }
- case err := <-done:
- sending = false
- // Stop tracking peers that cause send failures.
- if err != nil {
- pack.p.Log().Debug("Record send failed", "err", err)
- delete(pending, pack.p.ID())
- }
- // Schedule the next send.
- if s := pick(); s != nil {
- send(s)
- }
- case <-pm.quitSync:
- return
- }
- }
-}
-
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
diff --git a/p2p/dial.go b/p2p/dial.go
index 583f02f6b..8bb39f9e8 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -226,7 +226,9 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti
delete(s.direct, t.dest.ID())
case nil:
s.dialing[id] = t.flags
- newtasks = append(newtasks, t)
+ // New a task instance with no lastResolved, resolveDelay here,
+ // so that we can pass the resolve delay check.
+ newtasks = append(newtasks, &dialTask{flags: t.flags, dest: t.dest})
}
}
@@ -363,12 +365,9 @@ func (t *dialTask) resolve(srv *Server) bool {
resolved := srv.ntab.Resolve(t.dest)
t.lastResolved = time.Now()
if resolved == nil {
- // Only backoff delay if this is not direct connection.
- if t.flags&directDialedConn == 0 {
- t.resolveDelay *= 2
- if t.resolveDelay > maxResolveDelay {
- t.resolveDelay = maxResolveDelay
- }
+ t.resolveDelay *= 2
+ if t.resolveDelay > maxResolveDelay {
+ t.resolveDelay = maxResolveDelay
}
log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay)
return false