aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go70
1 files changed, 59 insertions, 11 deletions
diff --git a/les/handler.go b/les/handler.go
index 4271da8b8..fbb9e9906 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -27,6 +27,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -39,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
@@ -102,6 +102,7 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ reqDist *requestDistributor
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -127,7 +128,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
@@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
}
+ manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
+ m := make(map[distPeer]struct{})
+ peers := manager.peers.AllPeers()
+ for _, peer := range peers {
+ m[peer] = struct{}{}
+ }
+ return m
+ }, manager.quitSync)
if odr != nil {
odr.removePeer = removePeer
+ odr.reqDist = manager.reqDist
}
/*validator := func(block *types.Block, parent *types.Block) error {
@@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error {
if pm.lightSync {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
- cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
- return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
- cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
- return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
}
if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
@@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
if deliverMsg != nil {
- return pm.odr.Deliver(p, deliverMsg)
+ err := pm.odr.Deliver(p, deliverMsg)
+ if err != nil {
+ p.responseErrors++
+ if p.responseErrors > maxResponseErrors {
+ return err
+ }
+ }
}
return nil
}