diff options
author | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2016-11-17 22:54:24 +0800 |
---|---|---|
committer | Zsolt Felfoldi <zsfelfoldi@gmail.com> | 2016-12-08 20:38:15 +0800 |
commit | 3e617f3cd6c303652147f45ec9c85a5bb7769348 (patch) | |
tree | 3280f0ecd95f2d9f109537b96e7ea82f43deafcd /les/handler.go | |
parent | 0fe35b907addf1c066cb4d7c717bb23f9f2e7be4 (diff) | |
download | go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.gz go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.bz2 go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.lz go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.xz go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.zst go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.zip |
les: implement light server pool
Diffstat (limited to 'les/handler.go')
-rw-r--r-- | les/handler.go | 96 |
1 files changed, 51 insertions, 45 deletions
diff --git a/les/handler.go b/les/handler.go index 83d73666f..2e6952d2f 100644 --- a/les/handler.go +++ b/les/handler.go @@ -22,10 +22,12 @@ import ( "errors" "fmt" "math/big" + "net" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -101,10 +103,7 @@ type ProtocolManager struct { chainDb ethdb.Database odr *LesOdr server *LesServer - - topicDisc *discv5.Network - lesTopic discv5.Topic - p2pServer *p2p.Server + serverPool *serverPool downloader *downloader.Downloader fetcher *lightFetcher @@ -157,13 +156,46 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + var entry *poolEntry + if manager.serverPool != nil { + addr := p.RemoteAddr().(*net.TCPAddr) + entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port)) + if entry == nil { + return fmt.Errorf("unwanted connection") + } + } peer := manager.newPeer(int(version), networkId, p, rw) + peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - return manager.handle(peer) + start := mclock.Now() + err := manager.handle(peer) + if entry != nil { + connTime := time.Duration(mclock.Now() - start) + stopped := false + select { + case <-manager.quitSync: + stopped = true + default: + } + //fmt.Println("connTime", peer.id, connTime, stopped, err) + quality := float64(1) + setQuality := true + if connTime < time.Minute*10 { + quality = 0 + if stopped { + setQuality = false + } + } + manager.serverPool.disconnect(entry, quality, setQuality) + } + return err case <-manager.quitSync: + if entry != nil { + manager.serverPool.disconnect(entry, 0, false) + } return p2p.DiscQuitting } }, @@ -236,54 +268,24 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) findServers() { - if pm.p2pServer == nil || pm.topicDisc == nil { - return - } - glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic)) - enodes := make(chan string, 100) - stop := make(chan struct{}) - go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes) - go func() { - added := make(map[string]bool) - for { - select { - case enode := <-enodes: - if !added[enode] { - glog.V(logger.Info).Infoln("Found LES server:", enode) - added[enode] = true - if node, err := discover.ParseNode(enode); err == nil { - pm.p2pServer.AddPeer(node) - } - } - case <-stop: - return - } - } - }() - select { - case <-time.After(time.Second * 20): - case <-pm.quitSync: - } - close(stop) -} - func (pm *ProtocolManager) Start(srvr *p2p.Server) { - pm.p2pServer = srvr + var topicDisc *discv5.Network if srvr != nil { - pm.topicDisc = srvr.DiscV5 + topicDisc = srvr.DiscV5 } - pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) + lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { // start sync handler - go pm.findServers() + if srvr != nil { + pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) + } go pm.syncer() } else { - if pm.topicDisc != nil { + if topicDisc != nil { go func() { - glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic)) - pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync) - glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic)) + glog.V(logger.Debug).Infoln("Starting registering topic", string(lesTopic)) + topicDisc.RegisterTopic(lesTopic, pm.quitSync) + glog.V(logger.Debug).Infoln("Stopped registering topic", string(lesTopic)) }() } go func() { @@ -373,6 +375,10 @@ func (pm *ProtocolManager) handle(p *peer) error { } pm.fetcher.notify(p, nil) + + if p.poolEntry != nil { + pm.serverPool.registered(p.poolEntry) + } } stop := make(chan struct{}) |