From af8a742d00f9d47b832f6f2d50a8e1c89bbf8441 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 30 Nov 2016 06:02:08 +0100 Subject: les: improved header fetcher and server statistics --- les/handler.go | 53 ++++++++++++++++++++++------------------------------- 1 file changed, 22 insertions(+), 31 deletions(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index f1a8bc62c..4535aaeb9 100644 --- a/les/handler.go +++ b/les/handler.go @@ -24,10 +24,8 @@ import ( "math/big" "net" "sync" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -60,7 +58,7 @@ const ( MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request MaxTxSend = 64 // Amount of transactions to be send per request - disableClientRemovePeer = true + disableClientRemovePeer = false ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { var entry *poolEntry + peer := manager.newPeer(int(version), networkId, p, rw) if manager.serverPool != nil { addr := p.RemoteAddr().(*net.TCPAddr) - entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port)) + entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port)) if entry == nil { return fmt.Errorf("unwanted connection") } } - peer := manager.newPeer(int(version), networkId, p, rw) peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - start := mclock.Now() err := manager.handle(peer) if entry != nil { - connTime := time.Duration(mclock.Now() - start) - stopped := false - select { - case <-manager.quitSync: - stopped = true - default: - } - //fmt.Println("connTime", peer.id, connTime, stopped, err) - quality := float64(1) - setQuality := true - if connTime < time.Minute*10 { - quality = 0 - if stopped { - setQuality = false - } - } - manager.serverPool.disconnect(entry, quality, setQuality) + manager.serverPool.disconnect(entry) } return err case <-manager.quitSync: if entry != nil { - manager.serverPool.disconnect(entry, 0, false) + manager.serverPool.disconnect(entry) } return p2p.DiscQuitting } @@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) - manager.fetcher = newLightFetcher(manager) } if odr != nil { @@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) { glog.V(logger.Debug).Infof("LES: unregister peer %v", id) if pm.lightSync { pm.downloader.UnregisterPeer(id) - pm.odr.UnregisterPeer(peer) if pm.txrelay != nil { pm.txrelay.removePeer(id) } + if pm.fetcher != nil { + pm.fetcher.removePeer(peer) + } } if err := pm.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Removal failed:", err) @@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) { lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { // start sync handler - if srvr != nil { + if srvr != nil { // srvr is nil during testing pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) + pm.odr.serverPool = pm.serverPool + pm.fetcher = newLightFetcher(pm) } go pm.syncer() } else { @@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error { requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { return err } - pm.odr.RegisterPeer(p) if pm.txrelay != nil { pm.txrelay.addPeer(p) } - pm.fetcher.notify(p, nil) + p.lock.Lock() + head := p.headInfo + p.lock.Unlock() + if pm.fetcher != nil { + pm.fetcher.addPeer(p) + pm.fetcher.announce(p, head) + } if p.poolEntry != nil { pm.serverPool.registered(p.poolEntry) @@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "%v: %v", msg, err) } glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) - pm.fetcher.notify(p, &req) + if pm.fetcher != nil { + go pm.fetcher.announce(p, &req) + } case GetBlockHeadersMsg: glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id) @@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.GotReply(resp.ReqID, resp.BV) - if pm.fetcher.requestedID(resp.ReqID) { + if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) { pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) } else { err := pm.downloader.DeliverHeaders(p.id, resp.Headers) -- cgit v1.2.3