aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go36
1 files changed, 25 insertions, 11 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 6c6449340..cee719ddb 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,6 +45,10 @@ import (
const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
)
var (
@@ -78,7 +82,8 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
- txSub *event.TypeMuxSubscription
+ txCh chan core.TxPreEvent
+ txSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@@ -94,7 +99,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, maxPeers int, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
@@ -103,7 +108,6 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
blockchain: blockchain,
chaindb: chaindb,
chainconfig: config,
- maxPeers: maxPeers,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
@@ -198,10 +202,14 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) Start() {
+func (pm *ProtocolManager) Start(maxPeers int) {
+ pm.maxPeers = maxPeers
+
// broadcast transactions
- pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+ pm.txCh = make(chan core.TxPreEvent, txChanSize)
+ pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
@@ -601,7 +609,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Schedule all the unknown hashes for retrieval
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
- if !pm.blockchain.HasBlock(block.Hash) {
+ if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
@@ -688,9 +696,10 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
peer.SendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ return
}
// Otherwise if the block is indeed in out own chain, announce it
- if pm.blockchain.HasBlock(hash) {
+ if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
@@ -723,10 +732,15 @@ func (self *ProtocolManager) minedBroadcastLoop() {
}
func (self *ProtocolManager) txBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.txSub.Chan() {
- event := obj.Data.(core.TxPreEvent)
- self.BroadcastTx(event.Tx.Hash(), event.Tx)
+ for {
+ select {
+ case event := <-self.txCh:
+ self.BroadcastTx(event.Tx.Hash(), event.Tx)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-self.txSub.Err():
+ return
+ }
}
}