diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 108 |
1 files changed, 42 insertions, 66 deletions
diff --git a/dex/handler.go b/dex/handler.go index 8a60c2a56..66a0fd7fd 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -61,6 +61,7 @@ import ( "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/params" "github.com/dexon-foundation/dexon/rlp" ) @@ -75,7 +76,7 @@ const ( finalizedBlockChanSize = 128 - metaChanSize = 10240 + recordChanSize = 10240 maxPullPeers = 3 ) @@ -108,18 +109,18 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - metasCh chan newMetasEvent - metasSub event.Subscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + recordsCh chan newRecordsEvent + recordsSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - metasyncCh chan *metasync - quitSync chan struct{} - noMorePeers chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + recordsyncCh chan *recordsync + quitSync chan struct{} + noMorePeers chan struct{} // channels for peerSetLoop chainHeadCh chan core.ChainHeadEvent @@ -163,7 +164,7 @@ func NewProtocolManager( newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), - metasyncCh: make(chan *metasync), + recordsyncCh: make(chan *recordsync), quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), isBlockProposer: isBlockProposer, @@ -272,10 +273,10 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { go pm.finalizedBlockBroadcastLoop() } - // broadcast node metas - pm.metasCh = make(chan newMetasEvent, metaChanSize) - pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) - go pm.metaBroadcastLoop() + // broadcast node records + pm.recordsCh = make(chan newRecordsEvent, recordChanSize) + pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh) + go pm.recordBroadcastLoop() // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) @@ -285,37 +286,12 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() - go pm.metasyncLoop() + go pm.recordsyncLoop() } -func (pm *ProtocolManager) addSelfMeta() { - pm.nodeTable.Add([]*NodeMeta{pm.makeSelfNodeMeta()}) -} - -func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta { - self := pm.srvr.Self() - meta := &NodeMeta{ - ID: self.ID(), - IP: self.IP(), - UDP: uint(self.UDP()), - TCP: uint(self.TCP()), - Timestamp: uint64(time.Now().Unix()), - } - - h := rlpHash([]interface{}{ - meta.ID, - meta.IP, - meta.UDP, - meta.TCP, - meta.Timestamp, - }) - sig, err := crypto.Sign(h[:], pm.srvr.GetPrivateKey()) - if err != nil { - panic(err) - } - meta.Sig = sig - return meta +func (pm *ProtocolManager) addSelfRecord() { + pm.nodeTable.AddRecords([]*enr.Record{pm.srvr.Self().Record()}) } func (pm *ProtocolManager) Stop() { @@ -388,7 +364,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) - pm.syncNodeMetas(p) + pm.syncNodeRecords(p) // main loop. handle incoming messages. for { @@ -784,18 +760,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs) pm.txpool.AddRemotes(txs) - case msg.Code == MetaMsg: - var metas []*NodeMeta - if err := msg.Decode(&metas); err != nil { + case msg.Code == RecordMsg: + var records []*enr.Record + if err := msg.Decode(&records); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - for i, meta := range metas { - if meta == nil { - return errResp(ErrDecode, "node meta %d is nil", i) + for i, record := range records { + if record == nil { + return errResp(ErrDecode, "node record %d is nil", i) } - p.MarkNodeMeta(meta.Hash()) + p.MarkNodeRecord(rlpHash(record)) } - pm.nodeTable.Add(metas) + pm.nodeTable.AddRecords(records) // Block proposer-only messages. @@ -979,20 +955,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastMetas will propagate node metas to its peers. -func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { - var metaset = make(map[*peer][]*NodeMeta) +// BroadcastRecords will propagate node records to its peers. +func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { + var recordset = make(map[*peer][]*enr.Record) - for _, meta := range metas { - peers := pm.peers.PeersWithoutNodeMeta(meta.Hash()) + for _, record := range records { + peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record)) for _, peer := range peers { - metaset[peer] = append(metaset[peer], meta) + recordset[peer] = append(recordset[peer], record) } - log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers)) + log.Trace("Broadcast record", "recipients", len(peers)) } - for peer, metas := range metaset { - peer.AsyncSendNodeMetas(metas) + for peer, records := range recordset { + peer.AsyncSendNodeRecords(records) } } @@ -1178,14 +1154,14 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { } } -func (pm *ProtocolManager) metaBroadcastLoop() { +func (pm *ProtocolManager) recordBroadcastLoop() { for { select { - case event := <-pm.metasCh: - pm.BroadcastMetas(event.Metas) + case event := <-pm.recordsCh: + pm.BroadcastRecords(event.Records) // Err() channel will be closed when unsubscribing. - case <-pm.metasSub.Err(): + case <-pm.recordsSub.Err(): return } } |