aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2016-11-30 13:02:08 +0800
committerZsolt Felfoldi <zsfelfoldi@gmail.com>2016-12-10 16:53:08 +0800
commitaf8a742d00f9d47b832f6f2d50a8e1c89bbf8441 (patch)
tree92fca6d9750797a9c5f1fc712e2de1181be60390 /les/handler.go
parente67500aa15a2f51a96f0ae91ab3af898b81d82f2 (diff)
downloadgo-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar
go-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.gz
go-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.bz2
go-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.lz
go-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.xz
go-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.tar.zst
go-tangerine-af8a742d00f9d47b832f6f2d50a8e1c89bbf8441.zip
les: improved header fetcher and server statistics
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go53
1 files changed, 22 insertions, 31 deletions
diff --git a/les/handler.go b/les/handler.go
index f1a8bc62c..4535aaeb9 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -24,10 +24,8 @@ import (
"math/big"
"net"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -60,7 +58,7 @@ const (
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request
- disableClientRemovePeer = true
+ disableClientRemovePeer = false
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry
+ peer := manager.newPeer(int(version), networkId, p, rw)
if manager.serverPool != nil {
addr := p.RemoteAddr().(*net.TCPAddr)
- entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port))
+ entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
if entry == nil {
return fmt.Errorf("unwanted connection")
}
}
- peer := manager.newPeer(int(version), networkId, p, rw)
peer.poolEntry = entry
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
- start := mclock.Now()
err := manager.handle(peer)
if entry != nil {
- connTime := time.Duration(mclock.Now() - start)
- stopped := false
- select {
- case <-manager.quitSync:
- stopped = true
- default:
- }
- //fmt.Println("connTime", peer.id, connTime, stopped, err)
- quality := float64(1)
- setQuality := true
- if connTime < time.Minute*10 {
- quality = 0
- if stopped {
- setQuality = false
- }
- }
- manager.serverPool.disconnect(entry, quality, setQuality)
+ manager.serverPool.disconnect(entry)
}
return err
case <-manager.quitSync:
if entry != nil {
- manager.serverPool.disconnect(entry, 0, false)
+ manager.serverPool.disconnect(entry)
}
return p2p.DiscQuitting
}
@@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
- manager.fetcher = newLightFetcher(manager)
}
if odr != nil {
@@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
if pm.lightSync {
pm.downloader.UnregisterPeer(id)
- pm.odr.UnregisterPeer(peer)
if pm.txrelay != nil {
pm.txrelay.removePeer(id)
}
+ if pm.fetcher != nil {
+ pm.fetcher.removePeer(peer)
+ }
}
if err := pm.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err)
@@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
// start sync handler
- if srvr != nil {
+ if srvr != nil { // srvr is nil during testing
pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
+ pm.odr.serverPool = pm.serverPool
+ pm.fetcher = newLightFetcher(pm)
}
go pm.syncer()
} else {
@@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
return err
}
- pm.odr.RegisterPeer(p)
if pm.txrelay != nil {
pm.txrelay.addPeer(p)
}
- pm.fetcher.notify(p, nil)
+ p.lock.Lock()
+ head := p.headInfo
+ p.lock.Unlock()
+ if pm.fetcher != nil {
+ pm.fetcher.addPeer(p)
+ pm.fetcher.announce(p, head)
+ }
if p.poolEntry != nil {
pm.serverPool.registered(p.poolEntry)
@@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
- pm.fetcher.notify(p, &req)
+ if pm.fetcher != nil {
+ go pm.fetcher.announce(p, &req)
+ }
case GetBlockHeadersMsg:
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.GotReply(resp.ReqID, resp.BV)
- if pm.fetcher.requestedID(resp.ReqID) {
+ if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)