path: root/dex/handler.go
author    Sonic <sonic@dexon.org>    2019-04-03 16:28:29 +0800
committer Wei-Ning Huang <w@byzantine-lab.io>    2019-06-15 22:09:55 +0800
commit    c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c (patch)
tree      def84e1c48925e637ff47bb9c8ee382666e4f752 /dex/handler.go
parent    aff2c3533badc7415c223580c591a3274330185c (diff)
dex: remove node table (#330)
* dex: remove node table. The node table is not that useful; go back to relying on Kademlia.
* p2p: fix direct dials still incurring a resolve delay.
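For context on what the diff below removes: the handler no longer keeps a dedicated node table or exchanges ENR records with peers (RecordMsg handling, recordBroadcastLoop, BroadcastRecords and the recordsync plumbing are all dropped); locating peer endpoints falls back to the regular Kademlia discovery layer. A minimal sketch of the dialing pattern this relies on follows, assuming the fork keeps go-ethereum's p2p.Server.AddPeer API; connectDirect is a hypothetical helper for illustration, not code from this commit.

// Sketch only: rely on Kademlia discovery instead of a local node table.
// connectDirect is a hypothetical helper, not part of this commit.
package sketch

import (
	"github.com/dexon-foundation/dexon/p2p"
	"github.com/dexon-foundation/dexon/p2p/enode"
)

// connectDirect asks the p2p server to keep a connection to the given node.
// With the node table gone, missing endpoint information is resolved by the
// discovery (Kademlia) layer when the dial is attempted, rather than being
// filled in from ENR records exchanged via the removed RecordMsg.
func connectDirect(srv *p2p.Server, n *enode.Node) {
	srv.AddPeer(n)
}

The second part of the commit message ("fix direct dial resolve delay") touches the p2p package and is not visible in this file's diff, which is limited to dex/handler.go.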
Diffstat (limited to 'dex/handler.go')
-rw-r--r--  dex/handler.go  110
1 file changed, 22 insertions(+), 88 deletions(-)
diff --git a/dex/handler.go b/dex/handler.go
index 45f58012c..deb959c45 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -35,11 +35,11 @@ package dex
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"fmt"
"math"
- "math/rand"
"sync"
"sync/atomic"
"time"
@@ -64,7 +64,6 @@ import (
"github.com/dexon-foundation/dexon/metrics"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/params"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -81,8 +80,6 @@ const (
finalizedBlockChanSize = 128
- recordChanSize = 10240
-
maxPullPeers = 3
maxPullVotePeers = 1
@@ -107,7 +104,6 @@ type ProtocolManager struct {
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
- nodeTable *nodeTable
gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
@@ -121,20 +117,17 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- recordsCh chan newRecordsEvent
- recordsSub event.Subscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
whitelist map[uint64]common.Hash
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- txsyncCh chan *txsync
- recordsyncCh chan *recordsync
- quitSync chan struct{}
- noMorePeers chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
// channels for peerSetLoop
chainHeadCh chan core.ChainHeadEvent
@@ -168,13 +161,11 @@ func NewProtocolManager(
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash,
isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
- tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
- nodeTable: tab,
gov: gov,
blockchain: blockchain,
cache: newCache(5120, dexDB.NewDatabase(chaindb)),
@@ -184,7 +175,6 @@ func NewProtocolManager(
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
- recordsyncCh: make(chan *recordsync),
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
receiveCoreMessage: 0,
@@ -285,7 +275,7 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
pm.srvr = srvr
- pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable)
+ pm.peers = newPeerSet(pm.gov, pm.srvr)
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
@@ -301,11 +291,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
go pm.finalizedBlockBroadcastLoop()
}
- // broadcast node records
- pm.recordsCh = make(chan newRecordsEvent, recordChanSize)
- pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh)
- go pm.recordBroadcastLoop()
-
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
@@ -314,8 +299,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
- go pm.recordsyncLoop()
-
}
func (pm *ProtocolManager) Stop() {
@@ -392,7 +375,6 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
- pm.syncNodeRecords(p)
// If we have any explicit whitelist block hashes, request them
for number := range pm.whitelist {
@@ -839,21 +821,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs)
pm.txpool.AddRemotes(txs)
- case msg.Code == RecordMsg:
- var records []*enr.Record
- if err := msg.Decode(&records); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- for i, record := range records {
- if record == nil {
- return errResp(ErrDecode, "node record %d is nil", i)
- }
- p.MarkNodeRecord(rlpHash(record))
- }
- pm.nodeTable.AddRecords(records)
-
// Block proposer-only messages.
-
case msg.Code == CoreBlockMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -1070,23 +1038,6 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastRecords will propagate node records to its peers.
-func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
- var recordset = make(map[*peer][]*enr.Record)
-
- for _, record := range records {
- peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record))
- for _, peer := range peers {
- recordset[peer] = append(recordset[peer], record)
- }
- log.Trace("Broadcast record", "recipients", len(peers))
- }
-
- for peer, records := range recordset {
- peer.AsyncSendNodeRecords(records)
- }
-}
-
// BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers.
func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) {
if len(block.Randomness) == 0 {
@@ -1271,36 +1222,6 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
}
}
-func (pm *ProtocolManager) recordBroadcastLoop() {
- r := rand.New(rand.NewSource(time.Now().Unix()))
- t := time.NewTimer(0)
- defer t.Stop()
-
- for {
- select {
- case event := <-pm.recordsCh:
- pm.BroadcastRecords(event.Records)
- pm.peers.Refresh()
-
- case <-t.C:
- record := pm.srvr.Self().Record()
- log.Debug("refresh our node record", "seq", record.Seq())
- pm.nodeTable.AddRecords([]*enr.Record{record})
-
- // Log current peers connection status.
- pm.peers.Status()
-
- // Reset timer.
- d := 1*time.Minute + time.Duration(r.Int63n(60))*time.Second
- t.Reset(d)
-
- // Err() channel will be closed when unsubscribing.
- case <-pm.recordsSub.Err():
- return
- }
- }
-}
-
func (pm *ProtocolManager) SetReceiveCoreMessage(enabled bool) {
if enabled {
atomic.StoreInt32(&pm.receiveCoreMessage, 1)
@@ -1333,6 +1254,19 @@ func (pm *ProtocolManager) peerSetLoop() {
resetCount = pm.gov.DKGResetCount(round)
}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ for ctx.Err() == nil {
+ select {
+ case <-time.After(time.Minute):
+ pm.peers.Status()
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
for {
select {
case event := <-pm.chainHeadCh: