From 3e617f3cd6c303652147f45ec9c85a5bb7769348 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 17 Nov 2016 15:54:24 +0100 Subject: les: implement light server pool --- les/handler.go | 96 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 51 insertions(+), 45 deletions(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index 83d73666f..2e6952d2f 100644 --- a/les/handler.go +++ b/les/handler.go @@ -22,10 +22,12 @@ import ( "errors" "fmt" "math/big" + "net" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -101,10 +103,7 @@ type ProtocolManager struct { chainDb ethdb.Database odr *LesOdr server *LesServer - - topicDisc *discv5.Network - lesTopic discv5.Topic - p2pServer *p2p.Server + serverPool *serverPool downloader *downloader.Downloader fetcher *lightFetcher @@ -157,13 +156,46 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + var entry *poolEntry + if manager.serverPool != nil { + addr := p.RemoteAddr().(*net.TCPAddr) + entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port)) + if entry == nil { + return fmt.Errorf("unwanted connection") + } + } peer := manager.newPeer(int(version), networkId, p, rw) + peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - return manager.handle(peer) + start := mclock.Now() + err := manager.handle(peer) + if entry != nil { + connTime := time.Duration(mclock.Now() - start) + stopped := false + select { + case <-manager.quitSync: + stopped = true + default: + } + //fmt.Println("connTime", peer.id, connTime, stopped, err) + quality := float64(1) + setQuality := true + if connTime < time.Minute*10 { + quality = 0 + if stopped { + setQuality = false + } + } + manager.serverPool.disconnect(entry, quality, setQuality) + } + return err case <-manager.quitSync: + if entry != nil { + manager.serverPool.disconnect(entry, 0, false) + } return p2p.DiscQuitting } }, @@ -236,54 +268,24 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) findServers() { - if pm.p2pServer == nil || pm.topicDisc == nil { - return - } - glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic)) - enodes := make(chan string, 100) - stop := make(chan struct{}) - go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes) - go func() { - added := make(map[string]bool) - for { - select { - case enode := <-enodes: - if !added[enode] { - glog.V(logger.Info).Infoln("Found LES server:", enode) - added[enode] = true - if node, err := discover.ParseNode(enode); err == nil { - pm.p2pServer.AddPeer(node) - } - } - case <-stop: - return - } - } - }() - select { - case <-time.After(time.Second * 20): - case <-pm.quitSync: - } - close(stop) -} - func (pm *ProtocolManager) Start(srvr *p2p.Server) { - pm.p2pServer = srvr + var topicDisc *discv5.Network if srvr != nil { - pm.topicDisc = srvr.DiscV5 + topicDisc = srvr.DiscV5 } - pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) + lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { // start sync handler - go pm.findServers() + if srvr != nil { + pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) + } go pm.syncer() } else { - if pm.topicDisc != nil { + if topicDisc != nil { go func() { - glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic)) - pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync) - glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic)) + glog.V(logger.Debug).Infoln("Starting registering topic", string(lesTopic)) + topicDisc.RegisterTopic(lesTopic, pm.quitSync) + glog.V(logger.Debug).Infoln("Stopped registering topic", string(lesTopic)) }() } go func() { @@ -373,6 +375,10 @@ func (pm *ProtocolManager) handle(p *peer) error { } pm.fetcher.notify(p, nil) + + if p.poolEntry != nil { + pm.serverPool.registered(p.poolEntry) + } } stop := make(chan struct{}) -- cgit v1.2.3 From e67500aa15a2f51a96f0ae91ab3af898b81d82f2 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Tue, 29 Nov 2016 23:55:35 +0100 Subject: les: fixed light fetcher request ID matching --- les/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index 2e6952d2f..f1a8bc62c 100644 --- a/les/handler.go +++ b/les/handler.go @@ -559,7 +559,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.fcServer.GotReply(resp.ReqID, resp.BV) if pm.fetcher.requestedID(resp.ReqID) { - pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers) + pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) } else { err := pm.downloader.DeliverHeaders(p.id, resp.Headers) if err != nil { -- cgit v1.2.3 From af8a742d00f9d47b832f6f2d50a8e1c89bbf8441 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 30 Nov 2016 06:02:08 +0100 Subject: les: improved header fetcher and server statistics --- les/handler.go | 53 ++++++++++++++++++++++------------------------------- 1 file changed, 22 insertions(+), 31 deletions(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index f1a8bc62c..4535aaeb9 100644 --- a/les/handler.go +++ b/les/handler.go @@ -24,10 +24,8 @@ import ( "math/big" "net" "sync" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -60,7 +58,7 @@ const ( MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request MaxTxSend = 64 // Amount of transactions to be send per request - disableClientRemovePeer = true + disableClientRemovePeer = false ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { var entry *poolEntry + peer := manager.newPeer(int(version), networkId, p, rw) if manager.serverPool != nil { addr := p.RemoteAddr().(*net.TCPAddr) - entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port)) + entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port)) if entry == nil { return fmt.Errorf("unwanted connection") } } - peer := manager.newPeer(int(version), networkId, p, rw) peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - start := mclock.Now() err := manager.handle(peer) if entry != nil { - connTime := time.Duration(mclock.Now() - start) - stopped := false - select { - case <-manager.quitSync: - stopped = true - default: - } - //fmt.Println("connTime", peer.id, connTime, stopped, err) - quality := float64(1) - setQuality := true - if connTime < time.Minute*10 { - quality = 0 - if stopped { - setQuality = false - } - } - manager.serverPool.disconnect(entry, quality, setQuality) + manager.serverPool.disconnect(entry) } return err case <-manager.quitSync: if entry != nil { - manager.serverPool.disconnect(entry, 0, false) + manager.serverPool.disconnect(entry) } return p2p.DiscQuitting } @@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) - manager.fetcher = newLightFetcher(manager) } if odr != nil { @@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) { glog.V(logger.Debug).Infof("LES: unregister peer %v", id) if pm.lightSync { pm.downloader.UnregisterPeer(id) - pm.odr.UnregisterPeer(peer) if pm.txrelay != nil { pm.txrelay.removePeer(id) } + if pm.fetcher != nil { + pm.fetcher.removePeer(peer) + } } if err := pm.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Removal failed:", err) @@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) { lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { // start sync handler - if srvr != nil { + if srvr != nil { // srvr is nil during testing pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) + pm.odr.serverPool = pm.serverPool + pm.fetcher = newLightFetcher(pm) } go pm.syncer() } else { @@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error { requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { return err } - pm.odr.RegisterPeer(p) if pm.txrelay != nil { pm.txrelay.addPeer(p) } - pm.fetcher.notify(p, nil) + p.lock.Lock() + head := p.headInfo + p.lock.Unlock() + if pm.fetcher != nil { + pm.fetcher.addPeer(p) + pm.fetcher.announce(p, head) + } if p.poolEntry != nil { pm.serverPool.registered(p.poolEntry) @@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "%v: %v", msg, err) } glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) - pm.fetcher.notify(p, &req) + if pm.fetcher != nil { + go pm.fetcher.announce(p, &req) + } case GetBlockHeadersMsg: glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id) @@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.GotReply(resp.ReqID, resp.BV) - if pm.fetcher.requestedID(resp.ReqID) { + if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) { pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) } else { err := pm.downloader.DeliverHeaders(p.id, resp.Headers) -- cgit v1.2.3 From c8130df1d9dcc504244a49cbb12aa4c2848e5de2 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Tue, 6 Dec 2016 06:16:53 +0100 Subject: les: using random request IDs --- les/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index 4535aaeb9..3fc80e19a 100644 --- a/les/handler.go +++ b/les/handler.go @@ -338,13 +338,13 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Debug).Infof("LES: register peer %v", p.id) if pm.lightSync { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := pm.odr.getNextReqID() + reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { - reqID := pm.odr.getNextReqID() + reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) -- cgit v1.2.3 From c57c54ce96628aeb6345776310123a80593f0143 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Fri, 9 Dec 2016 09:03:22 +0100 Subject: eth, les: defer starting LES service until ETH initial sync is finished --- les/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index 3fc80e19a..048f30217 100644 --- a/les/handler.go +++ b/les/handler.go @@ -267,9 +267,9 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) { } else { if topicDisc != nil { go func() { - glog.V(logger.Debug).Infoln("Starting registering topic", string(lesTopic)) + glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic)) topicDisc.RegisterTopic(lesTopic, pm.quitSync) - glog.V(logger.Debug).Infoln("Stopped registering topic", string(lesTopic)) + glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic)) }() } go func() { -- cgit v1.2.3