aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2016-11-17 22:54:24 +0800
committerZsolt Felfoldi <zsfelfoldi@gmail.com>2016-12-08 20:38:15 +0800
commit3e617f3cd6c303652147f45ec9c85a5bb7769348 (patch)
tree3280f0ecd95f2d9f109537b96e7ea82f43deafcd /les/handler.go
parent0fe35b907addf1c066cb4d7c717bb23f9f2e7be4 (diff)
downloadgo-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar
go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.gz
go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.bz2
go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.lz
go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.xz
go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.tar.zst
go-tangerine-3e617f3cd6c303652147f45ec9c85a5bb7769348.zip
les: implement light server pool
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go96
1 files changed, 51 insertions, 45 deletions
diff --git a/les/handler.go b/les/handler.go
index 83d73666f..2e6952d2f 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -22,10 +22,12 @@ import (
"errors"
"fmt"
"math/big"
+ "net"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -101,10 +103,7 @@ type ProtocolManager struct {
chainDb ethdb.Database
odr *LesOdr
server *LesServer
-
- topicDisc *discv5.Network
- lesTopic discv5.Topic
- p2pServer *p2p.Server
+ serverPool *serverPool
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -157,13 +156,46 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ var entry *poolEntry
+ if manager.serverPool != nil {
+ addr := p.RemoteAddr().(*net.TCPAddr)
+ entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port))
+ if entry == nil {
+ return fmt.Errorf("unwanted connection")
+ }
+ }
peer := manager.newPeer(int(version), networkId, p, rw)
+ peer.poolEntry = entry
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
- return manager.handle(peer)
+ start := mclock.Now()
+ err := manager.handle(peer)
+ if entry != nil {
+ connTime := time.Duration(mclock.Now() - start)
+ stopped := false
+ select {
+ case <-manager.quitSync:
+ stopped = true
+ default:
+ }
+ //fmt.Println("connTime", peer.id, connTime, stopped, err)
+ quality := float64(1)
+ setQuality := true
+ if connTime < time.Minute*10 {
+ quality = 0
+ if stopped {
+ setQuality = false
+ }
+ }
+ manager.serverPool.disconnect(entry, quality, setQuality)
+ }
+ return err
case <-manager.quitSync:
+ if entry != nil {
+ manager.serverPool.disconnect(entry, 0, false)
+ }
return p2p.DiscQuitting
}
},
@@ -236,54 +268,24 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) findServers() {
- if pm.p2pServer == nil || pm.topicDisc == nil {
- return
- }
- glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
- enodes := make(chan string, 100)
- stop := make(chan struct{})
- go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
- go func() {
- added := make(map[string]bool)
- for {
- select {
- case enode := <-enodes:
- if !added[enode] {
- glog.V(logger.Info).Infoln("Found LES server:", enode)
- added[enode] = true
- if node, err := discover.ParseNode(enode); err == nil {
- pm.p2pServer.AddPeer(node)
- }
- }
- case <-stop:
- return
- }
- }
- }()
- select {
- case <-time.After(time.Second * 20):
- case <-pm.quitSync:
- }
- close(stop)
-}
-
func (pm *ProtocolManager) Start(srvr *p2p.Server) {
- pm.p2pServer = srvr
+ var topicDisc *discv5.Network
if srvr != nil {
- pm.topicDisc = srvr.DiscV5
+ topicDisc = srvr.DiscV5
}
- pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
+ lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
// start sync handler
- go pm.findServers()
+ if srvr != nil {
+ pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
+ }
go pm.syncer()
} else {
- if pm.topicDisc != nil {
+ if topicDisc != nil {
go func() {
- glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
- pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
- glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
+ glog.V(logger.Debug).Infoln("Starting registering topic", string(lesTopic))
+ topicDisc.RegisterTopic(lesTopic, pm.quitSync)
+ glog.V(logger.Debug).Infoln("Stopped registering topic", string(lesTopic))
}()
}
go func() {
@@ -373,6 +375,10 @@ func (pm *ProtocolManager) handle(p *peer) error {
}
pm.fetcher.notify(p, nil)
+
+ if p.poolEntry != nil {
+ pm.serverPool.registered(p.poolEntry)
+ }
}
stop := make(chan struct{})