aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go108
1 files changed, 42 insertions, 66 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 8a60c2a56..66a0fd7fd 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -61,6 +61,7 @@ import (
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/params"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -75,7 +76,7 @@ const (
finalizedBlockChanSize = 128
- metaChanSize = 10240
+ recordChanSize = 10240
maxPullPeers = 3
)
@@ -108,18 +109,18 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- metasCh chan newMetasEvent
- metasSub event.Subscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ recordsCh chan newRecordsEvent
+ recordsSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- txsyncCh chan *txsync
- metasyncCh chan *metasync
- quitSync chan struct{}
- noMorePeers chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ recordsyncCh chan *recordsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
// channels for peerSetLoop
chainHeadCh chan core.ChainHeadEvent
@@ -163,7 +164,7 @@ func NewProtocolManager(
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
- metasyncCh: make(chan *metasync),
+ recordsyncCh: make(chan *recordsync),
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
isBlockProposer: isBlockProposer,
@@ -272,10 +273,10 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
go pm.finalizedBlockBroadcastLoop()
}
- // broadcast node metas
- pm.metasCh = make(chan newMetasEvent, metaChanSize)
- pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
- go pm.metaBroadcastLoop()
+ // broadcast node records
+ pm.recordsCh = make(chan newRecordsEvent, recordChanSize)
+ pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh)
+ go pm.recordBroadcastLoop()
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
@@ -285,37 +286,12 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
- go pm.metasyncLoop()
+ go pm.recordsyncLoop()
}
-func (pm *ProtocolManager) addSelfMeta() {
- pm.nodeTable.Add([]*NodeMeta{pm.makeSelfNodeMeta()})
-}
-
-func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta {
- self := pm.srvr.Self()
- meta := &NodeMeta{
- ID: self.ID(),
- IP: self.IP(),
- UDP: uint(self.UDP()),
- TCP: uint(self.TCP()),
- Timestamp: uint64(time.Now().Unix()),
- }
-
- h := rlpHash([]interface{}{
- meta.ID,
- meta.IP,
- meta.UDP,
- meta.TCP,
- meta.Timestamp,
- })
- sig, err := crypto.Sign(h[:], pm.srvr.GetPrivateKey())
- if err != nil {
- panic(err)
- }
- meta.Sig = sig
- return meta
+func (pm *ProtocolManager) addSelfRecord() {
+ pm.nodeTable.AddRecords([]*enr.Record{pm.srvr.Self().Record()})
}
func (pm *ProtocolManager) Stop() {
@@ -388,7 +364,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
- pm.syncNodeMetas(p)
+ pm.syncNodeRecords(p)
// main loop. handle incoming messages.
for {
@@ -784,18 +760,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs)
pm.txpool.AddRemotes(txs)
- case msg.Code == MetaMsg:
- var metas []*NodeMeta
- if err := msg.Decode(&metas); err != nil {
+ case msg.Code == RecordMsg:
+ var records []*enr.Record
+ if err := msg.Decode(&records); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- for i, meta := range metas {
- if meta == nil {
- return errResp(ErrDecode, "node meta %d is nil", i)
+ for i, record := range records {
+ if record == nil {
+ return errResp(ErrDecode, "node record %d is nil", i)
}
- p.MarkNodeMeta(meta.Hash())
+ p.MarkNodeRecord(rlpHash(record))
}
- pm.nodeTable.Add(metas)
+ pm.nodeTable.AddRecords(records)
// Block proposer-only messages.
@@ -979,20 +955,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastMetas will propagate node metas to its peers.
-func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
- var metaset = make(map[*peer][]*NodeMeta)
+// BroadcastRecords will propagate node records to its peers.
+func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
+ var recordset = make(map[*peer][]*enr.Record)
- for _, meta := range metas {
- peers := pm.peers.PeersWithoutNodeMeta(meta.Hash())
+ for _, record := range records {
+ peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record))
for _, peer := range peers {
- metaset[peer] = append(metaset[peer], meta)
+ recordset[peer] = append(recordset[peer], record)
}
- log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers))
+ log.Trace("Broadcast record", "recipients", len(peers))
}
- for peer, metas := range metaset {
- peer.AsyncSendNodeMetas(metas)
+ for peer, records := range recordset {
+ peer.AsyncSendNodeRecords(records)
}
}
@@ -1178,14 +1154,14 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
}
}
-func (pm *ProtocolManager) metaBroadcastLoop() {
+func (pm *ProtocolManager) recordBroadcastLoop() {
for {
select {
- case event := <-pm.metasCh:
- pm.BroadcastMetas(event.Metas)
+ case event := <-pm.recordsCh:
+ pm.BroadcastRecords(event.Records)
// Err() channel will be closed when unsubscribing.
- case <-pm.metasSub.Err():
+ case <-pm.recordsSub.Err():
return
}
}