aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go37
1 files changed, 32 insertions, 5 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 5753a7cd8..51167c9cb 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -72,6 +72,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ finalizedBlockChanSize = 128
+
metaChanSize = 10240
maxPullPeers = 3
@@ -133,6 +135,10 @@ type ProtocolManager struct {
// Dexcon
isBlockProposer bool
+ app dexconApp
+
+ finalizedBlockCh chan core.NewFinalizedBlockEvent
+ finalizedBlockSub event.Subscription
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -141,7 +147,7 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- isBlockProposer bool, gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
@@ -160,6 +166,7 @@ func NewProtocolManager(
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
isBlockProposer: isBlockProposer,
+ app: app,
}
// Figure out whether to allow fast sync or not
@@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ if pm.isBlockProposer {
+ // broadcast finalized blocks
+ pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent,
+ finalizedBlockChanSize)
+ pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent(
+ pm.finalizedBlockCh)
+ go pm.finalizedBlockBroadcastLoop()
+ }
+
// broadcast node metas
pm.metasCh = make(chan newMetasEvent, metaChanSize)
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
@@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
+func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
+ for {
+ select {
+ case event := <-pm.finalizedBlockCh:
+ pm.BroadcastBlock(event.Block, true)
+ pm.BroadcastBlock(event.Block, false)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.finalizedBlockSub.Err():
+ return
+ }
+ }
+}
+
func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
@@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case event := <-pm.chainHeadCh:
- pm.BroadcastBlock(event.Block, true) // First propagate block to peers
- pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
-
+ case <-pm.chainHeadCh:
if !pm.isBlockProposer {
break
}