diff options
author | Sonic <sonic@dexon.org> | 2019-04-03 16:28:29 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-15 22:09:55 +0800 |
commit | c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c (patch) | |
tree | def84e1c48925e637ff47bb9c8ee382666e4f752 /dex/handler.go | |
parent | aff2c3533badc7415c223580c591a3274330185c (diff) | |
download | go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.tar go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.tar.gz go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.tar.bz2 go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.tar.lz go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.tar.xz go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.tar.zst go-tangerine-c597b2ff15aefcc73d55a0a3f8c8e0f6e18f083c.zip |
dex: remove node table (#330)
* dex: remove node table
Node table is not so useful, go back to rely on kademlia
* p2p: fix direct dial still have resolve delay
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 110 |
1 files changed, 22 insertions, 88 deletions
diff --git a/dex/handler.go b/dex/handler.go index 45f58012c..deb959c45 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -35,11 +35,11 @@ package dex import ( "bytes" + "context" "encoding/json" "errors" "fmt" "math" - "math/rand" "sync" "sync/atomic" "time" @@ -64,7 +64,6 @@ import ( "github.com/dexon-foundation/dexon/metrics" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/params" "github.com/dexon-foundation/dexon/rlp" ) @@ -81,8 +80,6 @@ const ( finalizedBlockChanSize = 128 - recordChanSize = 10240 - maxPullPeers = 3 maxPullVotePeers = 1 @@ -107,7 +104,6 @@ type ProtocolManager struct { acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool - nodeTable *nodeTable gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig @@ -121,20 +117,17 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - recordsCh chan newRecordsEvent - recordsSub event.Subscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription whitelist map[uint64]common.Hash // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - recordsyncCh chan *recordsync - quitSync chan struct{} - noMorePeers chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} // channels for peerSetLoop chainHeadCh chan core.ChainHeadEvent @@ -168,13 +161,11 @@ func NewProtocolManager( mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, whitelist map[uint64]common.Hash, isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { - tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ networkID: networkID, eventMux: mux, txpool: txpool, - nodeTable: tab, gov: gov, blockchain: blockchain, cache: newCache(5120, dexDB.NewDatabase(chaindb)), @@ -184,7 +175,6 @@ func NewProtocolManager( newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), - recordsyncCh: make(chan *recordsync), quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), receiveCoreMessage: 0, @@ -285,7 +275,7 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers pm.srvr = srvr - pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable) + pm.peers = newPeerSet(pm.gov, pm.srvr) // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) @@ -301,11 +291,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { go pm.finalizedBlockBroadcastLoop() } - // broadcast node records - pm.recordsCh = make(chan newRecordsEvent, recordChanSize) - pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh) - go pm.recordBroadcastLoop() - // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) @@ -314,8 +299,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() - go pm.recordsyncLoop() - } func (pm *ProtocolManager) Stop() { @@ -392,7 +375,6 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) - pm.syncNodeRecords(p) // If we have any explicit whitelist block hashes, request them for number := range pm.whitelist { @@ -839,21 +821,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs) pm.txpool.AddRemotes(txs) - case msg.Code == RecordMsg: - var records []*enr.Record - if err := msg.Decode(&records); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - for i, record := range records { - if record == nil { - return errResp(ErrDecode, "node record %d is nil", i) - } - p.MarkNodeRecord(rlpHash(record)) - } - pm.nodeTable.AddRecords(records) - // Block proposer-only messages. - case msg.Code == CoreBlockMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -1070,23 +1038,6 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastRecords will propagate node records to its peers. -func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { - var recordset = make(map[*peer][]*enr.Record) - - for _, record := range records { - peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record)) - for _, peer := range peers { - recordset[peer] = append(recordset[peer], record) - } - log.Trace("Broadcast record", "recipients", len(peers)) - } - - for peer, records := range recordset { - peer.AsyncSendNodeRecords(records) - } -} - // BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers. func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) { if len(block.Randomness) == 0 { @@ -1271,36 +1222,6 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { } } -func (pm *ProtocolManager) recordBroadcastLoop() { - r := rand.New(rand.NewSource(time.Now().Unix())) - t := time.NewTimer(0) - defer t.Stop() - - for { - select { - case event := <-pm.recordsCh: - pm.BroadcastRecords(event.Records) - pm.peers.Refresh() - - case <-t.C: - record := pm.srvr.Self().Record() - log.Debug("refresh our node record", "seq", record.Seq()) - pm.nodeTable.AddRecords([]*enr.Record{record}) - - // Log current peers connection status. - pm.peers.Status() - - // Reset timer. - d := 1*time.Minute + time.Duration(r.Int63n(60))*time.Second - t.Reset(d) - - // Err() channel will be closed when unsubscribing. - case <-pm.recordsSub.Err(): - return - } - } -} - func (pm *ProtocolManager) SetReceiveCoreMessage(enabled bool) { if enabled { atomic.StoreInt32(&pm.receiveCoreMessage, 1) @@ -1333,6 +1254,19 @@ func (pm *ProtocolManager) peerSetLoop() { resetCount = pm.gov.DKGResetCount(round) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for ctx.Err() == nil { + select { + case <-time.After(time.Minute): + pm.peers.Status() + case <-ctx.Done(): + return + } + } + }() + for { select { case event := <-pm.chainHeadCh: |