diff options
-rw-r--r-- | dex/handler.go | 110 | ||||
-rw-r--r-- | dex/nodetable.go | 73 | ||||
-rw-r--r-- | dex/nodetable_test.go | 121 | ||||
-rw-r--r-- | dex/peer.go | 86 | ||||
-rw-r--r-- | dex/peer_test.go | 3 | ||||
-rw-r--r-- | dex/protocol.go | 2 | ||||
-rw-r--r-- | dex/protocol_test.go | 83 | ||||
-rw-r--r-- | dex/sync.go | 93 | ||||
-rw-r--r-- | p2p/dial.go | 13 |
9 files changed, 35 insertions, 549 deletions
diff --git a/dex/handler.go b/dex/handler.go index 45f58012c..deb959c45 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -35,11 +35,11 @@ package dex import ( "bytes" + "context" "encoding/json" "errors" "fmt" "math" - "math/rand" "sync" "sync/atomic" "time" @@ -64,7 +64,6 @@ import ( "github.com/dexon-foundation/dexon/metrics" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/params" "github.com/dexon-foundation/dexon/rlp" ) @@ -81,8 +80,6 @@ const ( finalizedBlockChanSize = 128 - recordChanSize = 10240 - maxPullPeers = 3 maxPullVotePeers = 1 @@ -107,7 +104,6 @@ type ProtocolManager struct { acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool - nodeTable *nodeTable gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig @@ -121,20 +117,17 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - recordsCh chan newRecordsEvent - recordsSub event.Subscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription whitelist map[uint64]common.Hash // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - recordsyncCh chan *recordsync - quitSync chan struct{} - noMorePeers chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} // channels for peerSetLoop chainHeadCh chan core.ChainHeadEvent @@ -168,13 +161,11 @@ func NewProtocolManager( mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash, isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { - tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ networkID: networkID, eventMux: mux, txpool: txpool, - nodeTable: tab, gov: gov, blockchain: blockchain, cache: newCache(5120, dexDB.NewDatabase(chaindb)), @@ -184,7 +175,6 @@ func NewProtocolManager( newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), - recordsyncCh: make(chan *recordsync), quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), receiveCoreMessage: 0, @@ -285,7 +275,7 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers pm.srvr = srvr - pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable) + pm.peers = newPeerSet(pm.gov, pm.srvr) // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) @@ -301,11 +291,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { go pm.finalizedBlockBroadcastLoop() } - // broadcast node records - pm.recordsCh = make(chan newRecordsEvent, recordChanSize) - pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh) - go pm.recordBroadcastLoop() - // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) @@ -314,8 +299,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() - go pm.recordsyncLoop() - } func (pm *ProtocolManager) Stop() { @@ -392,7 +375,6 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) - pm.syncNodeRecords(p) // If we have any explicit whitelist block hashes, request them for number := range pm.whitelist { @@ -839,21 +821,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs) pm.txpool.AddRemotes(txs) - case msg.Code == RecordMsg: - var records []*enr.Record - if err := msg.Decode(&records); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - for i, record := range records { - if record == nil { - return errResp(ErrDecode, "node record %d is nil", i) - } - p.MarkNodeRecord(rlpHash(record)) - } - pm.nodeTable.AddRecords(records) - // Block proposer-only messages. - case msg.Code == CoreBlockMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -1070,23 +1038,6 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastRecords will propagate node records to its peers. -func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { - var recordset = make(map[*peer][]*enr.Record) - - for _, record := range records { - peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record)) - for _, peer := range peers { - recordset[peer] = append(recordset[peer], record) - } - log.Trace("Broadcast record", "recipients", len(peers)) - } - - for peer, records := range recordset { - peer.AsyncSendNodeRecords(records) - } -} - // BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers. func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) { if len(block.Randomness) == 0 { @@ -1271,36 +1222,6 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { } } -func (pm *ProtocolManager) recordBroadcastLoop() { - r := rand.New(rand.NewSource(time.Now().Unix())) - t := time.NewTimer(0) - defer t.Stop() - - for { - select { - case event := <-pm.recordsCh: - pm.BroadcastRecords(event.Records) - pm.peers.Refresh() - - case <-t.C: - record := pm.srvr.Self().Record() - log.Debug("refresh our node record", "seq", record.Seq()) - pm.nodeTable.AddRecords([]*enr.Record{record}) - - // Log current peers connection status. - pm.peers.Status() - - // Reset timer. - d := 1*time.Minute + time.Duration(r.Int63n(60))*time.Second - t.Reset(d) - - // Err() channel will be closed when unsubscribing. - case <-pm.recordsSub.Err(): - return - } - } -} - func (pm *ProtocolManager) SetReceiveCoreMessage(enabled bool) { if enabled { atomic.StoreInt32(&pm.receiveCoreMessage, 1) @@ -1333,6 +1254,19 @@ func (pm *ProtocolManager) peerSetLoop() { resetCount = pm.gov.DKGResetCount(round) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for ctx.Err() == nil { + select { + case <-time.After(time.Minute): + pm.peers.Status() + case <-ctx.Done(): + return + } + } + }() + for { select { case event := <-pm.chainHeadCh: diff --git a/dex/nodetable.go b/dex/nodetable.go deleted file mode 100644 index cc1de160f..000000000 --- a/dex/nodetable.go +++ /dev/null @@ -1,73 +0,0 @@ -package dex - -import ( - "sync" - - "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" -) - -type newRecordsEvent struct{ Records []*enr.Record } - -type nodeTable struct { - mu sync.RWMutex - entry map[enode.ID]*enode.Node - feed event.Feed -} - -func newNodeTable() *nodeTable { - return &nodeTable{ - entry: make(map[enode.ID]*enode.Node), - } -} - -func (t *nodeTable) GetNode(id enode.ID) *enode.Node { - t.mu.RLock() - defer t.mu.RUnlock() - return t.entry[id] -} - -func (t *nodeTable) AddRecords(records []*enr.Record) { - t.mu.Lock() - defer t.mu.Unlock() - - var newRecords []*enr.Record - for _, record := range records { - node, err := enode.New(enode.ValidSchemes, record) - if err != nil { - log.Error("invalid node record", "err", err) - return - } - - if n, ok := t.entry[node.ID()]; ok && n.Seq() >= node.Seq() { - log.Trace("Ignore new record, already exists", "id", node.ID().String(), - "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP()) - continue - } - - t.entry[node.ID()] = node - newRecords = append(newRecords, record) - log.Debug("Add new record to node table", "id", node.ID().String(), - "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP()) - } - if len(newRecords) > 0 { - go t.feed.Send(newRecordsEvent{newRecords}) - } -} - -func (t *nodeTable) Records() []*enr.Record { - t.mu.RLock() - defer t.mu.RUnlock() - records := make([]*enr.Record, 0, len(t.entry)) - for _, node := range t.entry { - records = append(records, node.Record()) - } - return records -} - -func (t *nodeTable) SubscribeNewRecordsEvent( - ch chan<- newRecordsEvent) event.Subscription { - return t.feed.Subscribe(ch) -} diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go deleted file mode 100644 index 06078a0d8..000000000 --- a/dex/nodetable_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package dex - -import ( - "crypto/ecdsa" - "net" - "testing" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" -) - -func TestNodeTable(t *testing.T) { - table := newNodeTable() - ch := make(chan newRecordsEvent) - table.SubscribeNewRecordsEvent(ch) - - records1 := []*enr.Record{ - randomNode().Record(), - randomNode().Record(), - } - - records2 := []*enr.Record{ - randomNode().Record(), - randomNode().Record(), - } - - go table.AddRecords(records1) - - select { - case newRecords := <-ch: - m := map[common.Hash]struct{}{} - for _, record := range newRecords.Records { - m[rlpHash(record)] = struct{}{} - } - - if len(m) != len(records1) { - t.Errorf("len mismatch: got %d, want: %d", - len(m), len(records1)) - } - - for _, record := range records1 { - if _, ok := m[rlpHash(record)]; !ok { - t.Errorf("expected record (%s) not exists", rlpHash(record)) - } - } - case <-time.After(1 * time.Second): - t.Error("did not receive new records event within one second") - } - - go table.AddRecords(records2) - select { - case newRecords := <-ch: - m := map[common.Hash]struct{}{} - for _, record := range newRecords.Records { - m[rlpHash(record)] = struct{}{} - } - - if len(m) != len(records2) { - t.Errorf("len mismatch: got %d, want: %d", - len(m), len(records2)) - } - - for _, record := range records2 { - if _, ok := m[rlpHash(record)]; !ok { - t.Errorf("expected record (%s) not exists", rlpHash(record)) - } - } - case <-time.After(1 * time.Second): - t.Error("did not receive new records event within one second") - } - - var records []*enr.Record - records = append(records, records1...) - records = append(records, records2...) - allRecords := table.Records() - if len(allRecords) != len(records) { - t.Errorf("all metas num mismatch: got %d, want %d", - len(records), len(allRecords)) - } - - for _, r := range records { - n, err := enode.New(enode.V4ID{}, r) - if err != nil { - t.Errorf(err.Error()) - } - if rlpHash(r) != rlpHash(table.GetNode(n.ID()).Record()) { - t.Errorf("record (%s) mismatch", n.ID().String()) - } - } -} - -func randomNode() *enode.Node { - var err error - var privkey *ecdsa.PrivateKey - for { - privkey, err = crypto.GenerateKey() - if err == nil { - break - } - } - var r enr.Record - r.Set(enr.IP(net.IP{})) - r.Set(enr.UDP(0)) - r.Set(enr.TCP(0)) - if err := enode.SignV4(&r, privkey); err != nil { - panic(err) - } - node, err := enode.New(enode.V4ID{}, &r) - if err != nil { - panic(err) - - } - return node -} - -func randomID() enode.ID { - return randomNode().ID() -} diff --git a/dex/peer.go b/dex/peer.go index 1ade2820e..68576564f 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -51,7 +51,6 @@ import ( "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/rlp" ) @@ -62,9 +61,8 @@ var ( ) const ( - maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) - maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS) - maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) maxKnownDKGPrivateShares = 1024 // this related to DKG Size @@ -73,8 +71,6 @@ const ( // contain a single transaction, or thousands. maxQueuedTxs = 1024 - maxQueuedRecords = 512 - // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few // that might cover uncles should be enough. @@ -143,12 +139,10 @@ type peer struct { lastKnownAgreementPositionLock sync.RWMutex lastKnownAgreementPosition coreTypes.Position // The position of latest agreement to be known by this peer knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownRecords mapset.Set // Set of node record known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer knownAgreements mapset.Set knownDKGPrivateShares mapset.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer queuedProps chan *types.Block // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer queuedCoreBlocks chan []*coreTypes.Block @@ -169,12 +163,10 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { version: version, id: p.ID().String(), knownTxs: mapset.NewSet(), - knownRecords: mapset.NewSet(), knownBlocks: mapset.NewSet(), knownAgreements: mapset.NewSet(), knownDKGPrivateShares: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedRecords: make(chan []*enr.Record, maxQueuedRecords), queuedProps: make(chan *types.Block, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), queuedCoreBlocks: make(chan []*coreTypes.Block, maxQueuedCoreBlocks), @@ -190,7 +182,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { } // broadcast is a write loop that multiplexes block propagations, announcements, -// transaction and notary node records broadcasts into the remote peer. +// transaction broadcasts into the remote peer. // The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { queuedVotes := make([]*coreTypes.Vote, 0, maxQueuedVotes) @@ -212,12 +204,6 @@ func (p *peer) broadcast() { queuedVotes = queuedVotes[:0] } select { - case records := <-p.queuedRecords: - if err := p.SendNodeRecords(records); err != nil { - return - } - p.Log().Trace("Broadcast node records", "count", len(records)) - case block := <-p.queuedProps: if err := p.SendNewBlock(block); err != nil { return @@ -334,13 +320,6 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -func (p *peer) MarkNodeRecord(hash common.Hash) { - for p.knownRecords.Cardinality() >= maxKnownRecords { - p.knownRecords.Pop() - } - p.knownRecords.Add(hash) -} - func (p *peer) MarkAgreement(position coreTypes.Position) bool { p.lastKnownAgreementPositionLock.Lock() defer p.lastKnownAgreementPositionLock.Unlock() @@ -393,29 +372,6 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } -// SendNodeRecords sends the records to the peer and includes the hashes -// in its records hash set for future reference. -func (p *peer) SendNodeRecords(records []*enr.Record) error { - for _, record := range records { - p.knownRecords.Add(rlpHash(record)) - } - return p.logSend(p2p.Send(p.rw, RecordMsg, records), RecordMsg) -} - -// AsyncSendNodeRecord queues list of notary node records propagation to a -// remote peer. If the peer's broadcast queue is full, the event is silently -// dropped. -func (p *peer) AsyncSendNodeRecords(records []*enr.Record) { - select { - case p.queuedRecords <- records: - for _, record := range records { - p.knownRecords.Add(rlpHash(record)) - } - default: - p.Log().Debug("Dropping node record propagation", "count", len(records)) - } -} - // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { @@ -708,7 +664,6 @@ type peerSet struct { peers map[string]*peer lock sync.RWMutex closed bool - tab *nodeTable selfPK string srvr p2pServer @@ -721,12 +676,11 @@ type peerSet struct { } // newPeerSet creates a new peer set to track the active participants. -func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { +func newPeerSet(gov governance, srvr p2pServer) *peerSet { return &peerSet{ peers: make(map[string]*peer), gov: gov, srvr: srvr, - tab: tab, selfPK: hex.EncodeToString(crypto.FromECDSAPub(&srvr.GetPrivateKey().PublicKey)), label2Nodes: make(map[peerLabel]map[string]*enode.Node), directConn: make(map[peerLabel]struct{}), @@ -842,20 +796,6 @@ func (ps *peerSet) PeersWithoutLabel(label peerLabel) []*peer { return list } -// PeersWithoutNodeRecord retrieves a list of peers that do not have a -// given record in their set of known hashes. -func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.knownRecords.Contains(hash) { - list = append(list, p) - } - } - return list -} - func (ps *peerSet) PeersWithoutAgreement(position coreTypes.Position) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -996,18 +936,6 @@ func (ps *peerSet) EnsureGroupConn() { } } -func (ps *peerSet) Refresh() { - ps.lock.Lock() - defer ps.lock.Unlock() - for id := range ps.allDirectPeers { - if ps.peers[id] == nil { - if node := ps.tab.GetNode(enode.HexID(id)); node != nil { - ps.srvr.AddDirectPeer(node) - } - } - } -} - func (ps *peerSet) buildDirectConn(label peerLabel) { ps.directConn[label] = struct{}{} for id := range ps.label2Nodes[label] { @@ -1048,11 +976,7 @@ func (ps *peerSet) addDirectPeer(id string, label peerLabel) { return } ps.allDirectPeers[id] = map[peerLabel]struct{}{label: {}} - - node := ps.tab.GetNode(enode.HexID(id)) - if node == nil { - node = ps.label2Nodes[label][id] - } + node := ps.label2Nodes[label][id] ps.srvr.AddDirectPeer(node) } diff --git a/dex/peer_test.go b/dex/peer_test.go index d6bc7e24c..18f6617d3 100644 --- a/dex/peer_test.go +++ b/dex/peer_test.go @@ -17,7 +17,6 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) { } server := newTestP2PServer(key) self := server.Self() - table := newNodeTable() gov := &testGovernance{} @@ -49,7 +48,7 @@ func TestPeerSetBuildAndForgetConn(t *testing.T) { return newTestNodeSet(m[round]), nil } - ps := newPeerSet(gov, server, table) + ps := newPeerSet(gov, server) // build round 10 ps.BuildConnection(10) diff --git a/dex/protocol.go b/dex/protocol.go index 4da64b604..adfda3c6f 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -82,8 +82,6 @@ const ( ReceiptsMsg = 0x10 // Protocol messages belonging to dex/64 - RecordMsg = 0x11 - CoreBlockMsg = 0x20 VoteMsg = 0x21 AgreementMsg = 0x22 diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 51bd32c72..3ed93c061 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -36,7 +36,6 @@ import ( "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/rlp" ) @@ -232,86 +231,6 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { } } -func TestRecvNodeRecords(t *testing.T) { - pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) - p, _ := newTestPeer("peer", dex64, pm, true) - defer pm.Stop() - defer p.close() - - record := randomNode().Record() - - ch := make(chan newRecordsEvent) - pm.nodeTable.SubscribeNewRecordsEvent(ch) - - if err := p2p.Send(p.app, RecordMsg, []interface{}{record}); err != nil { - t.Fatalf("send error: %v", err) - } - - select { - case event := <-ch: - records := event.Records - if len(records) != 1 { - t.Errorf("wrong number of new records: got %d, want 1", len(records)) - } else if rlpHash(records[0]) != rlpHash(record) { - t.Errorf("added wrong records hash: got %v, want %v", rlpHash(records[0]), rlpHash(record)) - } - case <-time.After(3 * time.Second): - t.Errorf("no newRecordsEvent received within 3 seconds") - } -} - -func TestSendNodeRecords(t *testing.T) { - pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) - defer pm.Stop() - - allrecords := make([]*enr.Record, 100) - for i := 0; i < len(allrecords); i++ { - allrecords[i] = randomNode().Record() - } - - // Connect several peers. They should all receive the pending transactions. - var wg sync.WaitGroup - checkrecords := func(p *testPeer) { - defer wg.Done() - defer p.close() - seen := make(map[common.Hash]bool) - for _, record := range allrecords { - seen[rlpHash(record)] = false - } - for n := 0; n < len(allrecords) && !t.Failed(); { - var records []*enr.Record - msg, err := p.app.ReadMsg() - if err != nil { - t.Errorf("%v: read error: %v", p.Peer, err) - } else if msg.Code != RecordMsg { - t.Errorf("%v: got code %d, want RecordMsg", p.Peer, msg.Code) - } - if err := msg.Decode(&records); err != nil { - t.Errorf("%v: %v", p.Peer, err) - } - for _, record := range records { - hash := rlpHash(record) - seenrecord, want := seen[hash] - if seenrecord { - t.Errorf("%v: got record more than once: %x", p.Peer, hash) - } - if !want { - t.Errorf("%v: got unexpected record: %x", p.Peer, hash) - } - seen[hash] = true - n++ - } - } - } - for i := 0; i < 3; i++ { - p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true) - wg.Add(1) - go checkrecords(p) - } - pm.nodeTable.AddRecords(allrecords) - wg.Wait() -} - func TestRecvCoreBlocks(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) pm.SetReceiveCoreMessage(true) @@ -357,7 +276,7 @@ func TestRecvCoreBlocks(t *testing.T) { t.Errorf("block mismatch") } case <-time.After(3 * time.Second): - t.Errorf("no newRecordsEvent received within 3 seconds") + t.Errorf("no core block received within 3 seconds") } } diff --git a/dex/sync.go b/dex/sync.go index 93bed87c4..84c161845 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -26,7 +26,6 @@ import ( "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" ) const ( @@ -40,9 +39,6 @@ const ( // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 - - // This is the target number for the packs of records sent by recordsyncLoop. - recordsyncPackNum = 1024 ) type txsync struct { @@ -137,95 +133,6 @@ func (pm *ProtocolManager) txsyncLoop() { } } -type recordsync struct { - p *peer - records []*enr.Record -} - -// syncNodeRecords starts sending all node records to the given peer. -func (pm *ProtocolManager) syncNodeRecords(p *peer) { - records := pm.nodeTable.Records() - p.Log().Debug("Sync node records", "num", len(records)) - if len(records) == 0 { - return - } - select { - case pm.recordsyncCh <- &recordsync{p, records}: - case <-pm.quitSync: - } -} - -// recordsyncLoop takes care of the initial node record sync for each new -// connection. When a new peer appears, we relay all currently node records. -// In order to minimise egress bandwidth usage, we send -// the records in small packs to one peer at a time. -func (pm *ProtocolManager) recordsyncLoop() { - var ( - pending = make(map[enode.ID]*recordsync) - sending = false // whether a send is active - pack = new(recordsync) // the pack that is being sent - done = make(chan error, 1) // result of the send - ) - - // send starts a sending a pack of transactions from the sync. - send := func(s *recordsync) { - // Fill pack with node records up to the target num. - var num int - pack.p = s.p - pack.records = pack.records[:0] - for i := 0; i < len(s.records) && num < recordsyncPackNum; i++ { - pack.records = append(pack.records, s.records[i]) - num += 1 - } - // Remove the records that will be sent. - s.records = s.records[:copy(s.records, s.records[len(pack.records):])] - if len(s.records) == 0 { - delete(pending, s.p.ID()) - } - // Send the pack in the background. - s.p.Log().Trace("Sending batch of records", "count", len(pack.records), "bytes", num) - sending = true - go func() { done <- pack.p.SendNodeRecords(pack.records) }() - } - - // pick chooses the next pending sync. - pick := func() *recordsync { - if len(pending) == 0 { - return nil - } - n := rand.Intn(len(pending)) + 1 - for _, s := range pending { - if n--; n == 0 { - return s - } - } - return nil - } - - for { - select { - case s := <-pm.recordsyncCh: - pending[s.p.ID()] = s - if !sending { - send(s) - } - case err := <-done: - sending = false - // Stop tracking peers that cause send failures. - if err != nil { - pack.p.Log().Debug("Record send failed", "err", err) - delete(pending, pack.p.ID()) - } - // Schedule the next send. - if s := pick(); s != nil { - send(s) - } - case <-pm.quitSync: - return - } - } -} - // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { diff --git a/p2p/dial.go b/p2p/dial.go index 583f02f6b..8bb39f9e8 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -226,7 +226,9 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti delete(s.direct, t.dest.ID()) case nil: s.dialing[id] = t.flags - newtasks = append(newtasks, t) + // New a task instance with no lastResolved, resolveDelay here, + // so that we can pass the resolve delay check. + newtasks = append(newtasks, &dialTask{flags: t.flags, dest: t.dest}) } } @@ -363,12 +365,9 @@ func (t *dialTask) resolve(srv *Server) bool { resolved := srv.ntab.Resolve(t.dest) t.lastResolved = time.Now() if resolved == nil { - // Only backoff delay if this is not direct connection. - if t.flags&directDialedConn == 0 { - t.resolveDelay *= 2 - if t.resolveDelay > maxResolveDelay { - t.resolveDelay = maxResolveDelay - } + t.resolveDelay *= 2 + if t.resolveDelay > maxResolveDelay { + t.resolveDelay = maxResolveDelay } log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay) return false |